Source code for lib.sedna.algorithms.unseen_task_detection.unseen_sample_recognition.unseen_sample_detection

# Copyright 2023 The KubeEdge Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import time
import json
import requests
import threading

from sedna.common.file_ops import FileOps
from sedna.common.class_factory import ClassFactory, ClassType

__all__ = ('UnseenSampleDetection', )


@ClassFactory.register(ClassType.UTD)
[docs]class UnseenSampleDetection(threading.Thread): ''' Divide inference samples into seen samples and unseen samples Parameters ---------- task_index: str or dict knowledge base index which includes indexes of tasks, samples and etc. ''' def __init__(self, edge_knowledge_management, **kwargs): self.get_environ_varia() self.unseen_save_url = edge_knowledge_management.local_unseen_save_url self.check_time = 1 self.current_status = "False" self.current_sample_num = 0 super(UnseenSampleDetection, self).__init__()
[docs] def run(self): while True: self.get_environ_varia() time.sleep(self.check_time) try: check_request = requests.post( url=self.status_service_ip ) status_dict = json.loads(check_request.text) if status_dict["if_fall"] == "False": continue elif status_dict["if_fall"] == "True" and \ self.current_status == "False": self.current_status = "True" samples = os.listdir(self.local_image_url) samples.sort(reverse=True) if len(samples) > 0: start_idx, end_idx = self.get_index( samples, status_dict["time_stamp"]) for sample in samples[start_idx:end_idx]: local_sample_url = FileOps.join_path( self.local_image_url, sample) dest_sample_url = FileOps.join_path( self.unseen_save_url, sample) FileOps.upload(local_sample_url, dest_sample_url, clean=False) else: continue except Exception as e: continue
[docs] def get_index(self, samples, timestamp): time_stamp = int(timestamp) for i in range(len(samples)): if int(samples[i].split(".")[0]) <= time_stamp: start_idx = i end_idx = min(i + 10, len(samples) - 1) return start_idx, end_idx else: continue return 0, len(samples) - 1
[docs] def get_environ_varia(self): try: self.status_service_ip = os.environ["STATUS_IP"] except Exception: self.status_service_ip = "127.0.0.1" self.query_url = "http://" + self.status_service_ip + \ ":8000/robot_status/query/" try: self.local_image_url = os.environ["IMAGE_TOPIC_URL"] except Exception: self.local_image_url = "/tmp/"