# Copyright 2021 The KubeEdge Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Optional

from sedna.common.log import LOGGER
from sedna.core.multi_edge_inference.components \
    import BaseService, FileOperations
from sedna.core.multi_edge_inference.plugins \
    import PLUGIN, PluggableModel, PluggableNetworkService
from sedna.core.multi_edge_inference.plugins.registered \
    import Feature_Extraction_I, ReID_Server
class ReID(BaseService, FileOperations):
    """
    Cloud-side re-identification (ReID) service of MultiEdgeInference.

    In MultiEdgeInference, the ReID component is deployed in the cloud
    and is used to identify a target by comparing its features with the
    ones generated by the Feature Extraction component.

    Parameters
    ----------
    consumer_topics : List, optional
        Leave empty. ``None`` (the default) is treated as an empty list.
    producer_topics : List, optional
        Leave empty. ``None`` (the default) is treated as an empty list.
    plugins : List, optional
        A list of PluggableNetworkService. It can be left empty
        as the ReID component is already preconfigured
        to connect to the correct network services. Any plugin given
        here is appended after the two preconfigured ones.
    models : List, optional
        A list of PluggableModel. The term "model" is used loosely here:
        the ReID doesn't really use an AI model but rather a wrapper
        for the ReID functions.
    timeout : int
        It sets a timeout condition to terminate the main fetch loop
        after the specified amount of seconds has passed since we
        received the last frame.
    asynchronous : bool
        If True, the AI processing will be decoupled from the data
        acquisition step. If False, the processing will be sequential.
        In general, set it to True when ingesting a stream (e.g., RTSP)
        and to False when reading from disk (e.g., a video file).

    Examples
    --------
    model = ReIDWorker()  # A class implementing the PluggableModel
                          # abstract class (example in
                          # pedestrian_tracking/reid/worker.py)
    self.job = ReID(models=[model], asynchronous=False)

    Notes
    -----
    For the parameters described above, only 'models' has to be defined,
    while for others the default value will work in most cases.
    """

    def __init__(
        self,
        consumer_topics: Optional[List] = None,
        producer_topics: Optional[List] = None,
        plugins: Optional[List[PluggableNetworkService]] = None,
        models: Optional[List[PluggableModel]] = None,
        timeout: int = 10,
        asynchronous: bool = True
    ):
        # None sentinels replace the former `[]` defaults so that separate
        # instances never end up sharing (and mutating) the same list
        # object. Lists passed in explicitly are forwarded untouched.
        consumer_topics = [] if consumer_topics is None else consumer_topics
        producer_topics = [] if producer_topics is None else producer_topics
        plugins = [] if plugins is None else plugins
        models = [] if models is None else models

        # Set before the base-class initialisation runs, exactly as the
        # original code did.
        self.models = models

        # The two preconfigured network services always come first;
        # caller-supplied plugins follow them.
        merged_plugins = [
            ReID_Server(wrapper=self),
            Feature_Extraction_I(),
            *plugins,
        ]

        super().__init__(
            consumer_topics,
            producer_topics,
            merged_plugins,
            models,
            timeout,
            asynchronous)

    def _post_init(self):
        """Run the parent post-initialisation step, then refresh the
        models by calling ``update_operational_mode`` with ``None``."""
        super()._post_init()
        self.update_operational_mode(None)

    def process_data(self, ai, data, **kwargs):
        """
        Run ``ai.predict`` on ``data``.

        Parameters
        ----------
        ai
            Object exposing ``predict``; presumably one of the
            PluggableModel instances in ``self.models`` (TODO confirm
            against the caller).
        data
            Payload forwarded unchanged to ``ai.predict``.
        **kwargs
            Accepted for interface compatibility; not used here.

        Returns
        -------
        None
            The prediction result is intentionally discarded.
        """
        ai.predict(data)
        # NOTE: persisting the prediction is currently disabled. The
        # original (commented-out) code was:
        #     if result != []:
        #         self.write_to_disk(result, folder='/data/processed/')

    def update_operational_mode(self, status):
        """
        Refresh the target features of every model in ``self.models``.

        For each model, ``update_plugin(status)`` is called, its return
        value is passed to ``get_target_features``, and the resulting
        features are handed to the model's ``update_target``.

        Any exception raised while updating a model is logged through
        LOGGER and not re-raised, so one failing model does not prevent
        the remaining ones from being updated.

        Parameters
        ----------
        status
            Value forwarded unchanged to each model's ``update_plugin``.
        """
        for ai in self.models:
            try:
                ldata = ai.update_plugin(status)
                target_list = self.get_target_features(ldata)
                ai.update_target(target_list)
            except Exception as ex:
                # Deliberately best-effort: report the failure and move on.
                # Adjacent string literals are used instead of a backslash
                # continuation inside the literal, which would leak the
                # source indentation into the log message.
                LOGGER.error(
                    "Unable to update AI parameters/configuration for "
                    f"service {ai.__class__.__name__}. [{ex}]"
                )

    def get_target_features(self, ldata):
        """
        Obtain target features through the feature-extraction plugin.

        Looks up the plugin registered as ``PLUGIN.FEATURE_EXTRACTION_I``
        and delegates to its ``plugin_api.get_target_features``.

        Parameters
        ----------
        ldata
            Value forwarded unchanged to the plugin API (in this class it
            is whatever a model's ``update_plugin`` returned).

        Returns
        -------
        The value returned by the plugin API, unmodified.
        """
        feature_extraction_plugin = \
            self.get_plugin(PLUGIN.FEATURE_EXTRACTION_I)
        return feature_extraction_plugin.plugin_api.get_target_features(
            ldata)