# Copyright 2023 The KubeEdge Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import os
from sedna.core.base import JobBase
from sedna.common.file_ops import FileOps
from sedna.common.constant import K8sResourceKind, K8sResourceKindStatus, \
KBResourceConstant
from sedna.common.config import Context, BaseConfig
from sedna.common.class_factory import ClassType, ClassFactory
from sedna.algorithms.seen_task_learning.seen_task_learning \
import SeenTaskLearning
from sedna.algorithms.unseen_task_processing import UnseenTaskProcessing
from sedna.algorithms.unseen_task_detection.unseen_sample_recognition.\
unseen_sample_detection import UnseenSampleDetection
from sedna.service.client import KBClient
from sedna.core.lifelong_learning.knowledge_management.\
cloud_knowledge_management import CloudKnowledgeManagement
from sedna.core.lifelong_learning.knowledge_management.\
edge_knowledge_management import EdgeKnowledgeManagement
class LifelongLearning(JobBase):
    """
    Lifelong Learning (LL) is an advanced machine learning (ML) paradigm that
    learns continuously, accumulates the knowledge learned in the past, and
    uses/adapts it to help future learning and problem solving.

    Sedna provides the related interfaces for application development.

    Parameters
    ----------
    seen_estimator : Instance
        An instance with the high-level API that greatly simplifies
        machine learning programming. Estimators encapsulate training,
        evaluation, prediction, and exporting for your model.
    unseen_estimator : Instance
        An instance with the high-level API that greatly simplifies mechanism
        model learning programming. Estimators encapsulate training,
        evaluation, prediction, and exporting for your mechanism model.
    task_definition : Dict
        Divide multiple tasks based on data,
        see `task_definition.task_definition` for more detail.
    task_relationship_discovery : Dict
        Discover relationships between all tasks, see
        `task_relationship_discovery.task_relationship_discovery`
        for more detail.
    task_allocation : Dict
        Mining seen tasks of inference sample,
        see `task_allocation.task_allocation` for more detail.
    task_remodeling : Dict
        Remodeling tasks based on their relationships,
        see `task_remodeling.task_remodeling` for more detail.
    inference_integrate : Dict
        Integrate the inference results of all related
        tasks, see `inference_integrate.inference_integrate` for more detail.
    task_update_decision : Dict
        Task update strategy making algorithms,
        see `knowledge_management.task_update_decision.task_update_decision`
        for more detail.
    unseen_task_allocation : Dict
        Mining unseen tasks of inference sample,
        see `unseen_task_processing.unseen_task_allocation.
        unseen_task_allocation` for more detail.
    unseen_sample_recognition : Dict
        Dividing inference samples into seen tasks and unseen tasks,
        see `unseen_task_processing.unseen_sample_recognition.
        unseen_sample_recognition` for more detail.
    unseen_sample_re_recognition : Dict
        Dividing unseen training samples into seen tasks and unseen tasks,
        see `unseen_task_processing.unseen_sample_re_recognition.
        unseen_sample_re_recognition` for more detail.

    Examples
    --------
    >>> estimator = XGBClassifier(objective="binary:logistic")
    >>> unseen_estimator = None
    >>> task_definition = {
            "method": "TaskDefinitionByDataAttr",
            "param": {"attribute": ["season", "city"]}
        }
    >>> task_relationship_discovery = {
            "method": "DefaultTaskRelationDiscover", "param": {}
        }
    >>> task_mining = {
            "method": "TaskMiningByDataAttr",
            "param": {"attribute": ["season", "city"]}
        }
    >>> task_remodeling = None
    >>> inference_integrate = {
            "method": "DefaultInferenceIntegrate", "param": {}
        }
    >>> task_update_decision = {
            "method": "UpdateStrategyDefault", "param": {}
        }
    >>> unseen_task_allocation = {
            "method": "UnseenTaskAllocationDefault", "param": {}
        }
    >>> unseen_sample_recognition = {
            "method": "SampleRegonitionDefault", "param": {}
        }
    >>> unseen_sample_re_recognition = {
            "method": "SampleReRegonitionDefault", "param": {}
        }
    >>> ll_jobs = LifelongLearning(
            estimator,
            unseen_estimator=None,
            task_definition=None,
            task_relationship_discovery=None,
            task_allocation=None,
            task_remodeling=None,
            inference_integrate=None,
            task_update_decision=None,
            unseen_task_allocation=None,
            unseen_sample_recognition=None,
            unseen_sample_re_recognition=None,
        )
    """

    def __init__(self,
                 seen_estimator,
                 unseen_estimator=None,
                 task_definition=None,
                 task_relationship_discovery=None,
                 task_allocation=None,
                 task_remodeling=None,
                 inference_integrate=None,
                 task_update_decision=None,
                 unseen_task_allocation=None,
                 unseen_sample_recognition=None,
                 unseen_sample_re_recognition=None
                 ):
        # Multi-task learning pipeline built around the user-supplied
        # estimator for samples belonging to known (seen) tasks.
        e = SeenTaskLearning(
            estimator=seen_estimator,
            task_definition=task_definition,
            task_relationship_discovery=task_relationship_discovery,
            seen_task_allocation=task_allocation,
            task_remodeling=task_remodeling,
            inference_integrate=inference_integrate
        )
        # Pipeline for samples that fall outside the known tasks.
        u = UnseenTaskProcessing(
            unseen_estimator,
            unseen_task_allocation=unseen_task_allocation)

        self.unseen_sample_re_recognition = unseen_sample_re_recognition or {
            "method": "SampleReRegonitionDefault"
        }
        self.unseen_sample_re_recognition_param = e._parse_param(
            self.unseen_sample_re_recognition.get("param", {}))

        self.task_update_decision = task_update_decision or {
            "method": "UpdateStrategyDefault"
        }
        self.task_update_decision_param = e._parse_param(
            self.task_update_decision.get("param", {})
        )

        config = dict(
            ll_kb_server=Context.get_parameters("KB_SERVER"),
            output_url=Context.get_parameters(
                "OUTPUT_URL",
                "/tmp"),
            cloud_output_url=Context.get_parameters(
                "OUTPUT_URL",
                "/tmp"),
            task_index=KBResourceConstant.KB_INDEX_NAME.value)

        self.cloud_knowledge_management = \
            CloudKnowledgeManagement(
                config,
                seen_estimator=e,
                unseen_estimator=u)

        self.edge_knowledge_management = \
            EdgeKnowledgeManagement(
                config,
                seen_estimator=e,
                unseen_estimator=u)

        # Edge inference services are started lazily on first `inference`.
        self.start_inference_service = False

        # Resolve the KB index name to an absolute path under the output url.
        task_index = FileOps.join_path(config['output_url'],
                                       KBResourceConstant.KB_INDEX_NAME.value)
        config['task_index'] = task_index

        super(LifelongLearning, self).__init__(
            estimator=e, config=config
        )

        self.unseen_sample_recognition = unseen_sample_recognition or {
            "method": "SampleRegonitionDefault"
        }
        self.unseen_sample_recognition_param = e._parse_param(
            self.unseen_sample_recognition.get("param", {}))

        # Both are initialized by `_start_inference_service` on first use.
        self.recognize_unseen_samples = None
        self.unseen_sample_detection = None

        self.job_kind = K8sResourceKind.LIFELONG_JOB.value
        self.kb_server = KBClient(kbserver=self.config.ll_kb_server)
[docs] def train(self, train_data,
valid_data=None,
post_process=None,
**kwargs):
"""
fit for update the knowledge based on training data.
Parameters
----------
train_data : BaseDataSource
Train data, see `sedna.datasources.BaseDataSource`
for more detail.
valid_data : BaseDataSource
Valid data, BaseDataSource or None.
post_process : function
function or a registered method, callback after `estimator` train.
kwargs : Dict
parameters for `estimator` training, Like:
`early_stopping_rounds` in Xgboost.XGBClassifier
Returns
-------
train_history : object
"""
is_completed_initilization = \
str(Context.get_parameters("HAS_COMPLETED_INITIAL_TRAINING",
"false")).lower()
if is_completed_initilization == "true":
return self.update(train_data,
valid_data=valid_data,
post_process=post_process,
**kwargs)
callback_func = None
if post_process is not None:
callback_func = ClassFactory.get_cls(
ClassType.CALLBACK, post_process)
res, seen_task_index = \
self.cloud_knowledge_management.seen_estimator.train(
train_data=train_data,
valid_data=valid_data,
**kwargs
) # todo: Distinguishing incremental update and fully overwrite
unseen_res, unseen_task_index = \
self.cloud_knowledge_management.unseen_estimator.train()
task_index = dict(
seen_task=seen_task_index,
unseen_task=unseen_task_index)
task_index_url = FileOps.dump(
task_index, self.cloud_knowledge_management.local_task_index_url)
task_index = self.cloud_knowledge_management.update_kb(task_index_url)
res.update(unseen_res)
task_info_res = \
self.cloud_knowledge_management.seen_estimator.model_info(
task_index,
relpath=self.config.data_path_prefix)
self.report_task_info(
None, K8sResourceKindStatus.COMPLETED.value, task_info_res)
self.log.info(f"Lifelong learning Train task Finished, "
f"KB index save in {task_index}")
return callback_func(self.estimator, res) if callback_func else res
[docs] def update(self, train_data, valid_data=None, post_process=None, **kwargs):
"""
fit for update the knowledge based on incremental data.
Parameters
----------
train_data : BaseDataSource
Train data, see `sedna.datasources.BaseDataSource`
for more detail.
valid_data : BaseDataSource
Valid data, BaseDataSource or None.
post_process : function
function or a registered method, callback after `estimator` train.
kwargs : Dict
parameters for `estimator` training, Like:
`early_stopping_rounds` in Xgboost.XGBClassifier
Returns
-------
train_history : object
"""
callback_func = None
if post_process is not None:
callback_func = ClassFactory.get_cls(
ClassType.CALLBACK, post_process)
task_index_url = Context.get_parameters(
"CLOUD_KB_INDEX", self.cloud_knowledge_management.task_index)
task_index_url = FileOps.join_path(
BaseConfig.data_path_prefix, task_index_url)
index_url = self.cloud_knowledge_management.local_task_index_url
FileOps.download(task_index_url, index_url)
self.log.info(
f"Download last task index from {task_index_url} to {index_url}")
unseen_sample_re_recognition = ClassFactory.get_cls(
ClassType.UTD, self.unseen_sample_re_recognition["method"])(
index_url, **self.unseen_sample_re_recognition_param)
seen_samples, unseen_samples = \
unseen_sample_re_recognition(train_data)
seen_samples.x = np.concatenate(
(seen_samples.x, unseen_samples.x), axis=0)
seen_samples.y = np.concatenate(
(seen_samples.y, unseen_samples.y), axis=0)
task_update_decision = ClassFactory.get_cls(
ClassType.KM, self.task_update_decision["method"])(
index_url, **self.task_update_decision_param)
tasks, task_update_strategies = task_update_decision(
seen_samples, task_type="seen_task")
seen_task_index = \
self.cloud_knowledge_management.seen_estimator.update(
tasks, task_update_strategies, task_index=index_url)
tasks, task_update_strategies = task_update_decision(
unseen_samples, task_type="unseen_task")
unseen_task_index = \
self.cloud_knowledge_management.unseen_estimator.update(
tasks, task_update_strategies, task_index=index_url)
task_index = {
"seen_task": seen_task_index,
"unseen_task": unseen_task_index,
}
task_index = self.cloud_knowledge_management.update_kb(task_index)
task_info_res = \
self.cloud_knowledge_management.seen_estimator.model_info(
task_index,
relpath=self.config.data_path_prefix)
self.report_task_info(
None, K8sResourceKindStatus.COMPLETED.value, task_info_res)
self.log.info(f"Lifelong learning Update task Finished, "
f"KB index save in {task_index}")
return callback_func(self.estimator,
task_index) if callback_func else task_index
[docs] def evaluate(self, data, post_process=None, **kwargs):
"""
evaluated the performance of each task from training, filter tasks
based on the defined rules.
Parameters
----------
data : BaseDataSource
valid data, see `sedna.datasources.BaseDataSource`
for more detail.
kwargs: Dict
parameters for `estimator` evaluate, Like:
`ntree_limit` in Xgboost.XGBClassifier
"""
callback_func = None
if callable(post_process):
callback_func = post_process
elif post_process is not None:
callback_func = ClassFactory.get_cls(
ClassType.CALLBACK, post_process)
task_index_url = Context.get_parameters(
"MODEL_URLS", self.cloud_knowledge_management.task_index)
index_url = self.cloud_knowledge_management.local_task_index_url
if not os.path.exists(index_url):
self.log.info(
f"Download kb index from {task_index_url} to {index_url}")
FileOps.download(task_index_url, index_url)
res, index_file = self._task_evaluation(
data, task_index=index_url, **kwargs)
self.log.info("Task evaluation finishes.")
FileOps.upload(index_file, self.cloud_knowledge_management.task_index)
self.log.info(
f"Upload kb index from {index_file} to \
{self.cloud_knowledge_management.task_index}")
task_info_res = self.estimator.model_info(
self.cloud_knowledge_management.task_index, result=res,
relpath=self.config.data_path_prefix)
self.report_task_info(
None,
K8sResourceKindStatus.COMPLETED.value,
task_info_res,
kind="eval")
return callback_func(res) if callback_func else res
[docs] def inference(self,
data=None,
post_process=None,
unseen_sample_postprocess=None,
**kwargs):
"""
predict the result for input data based on training knowledge.
Parameters
----------
data : BaseDataSource
inference sample, see `sedna.datasources.BaseDataSource` for
more detail.
post_process: function
function or a registered method, effected after `estimator`
prediction, like: label transform.
unseen_sample_postprocess: function
function or a registered method, effected when unseen samples
need to be saved
kwargs: Dict
parameters for `estimator` predict, Like:
`ntree_limit` in Xgboost.XGBClassifier
Returns
-------
"""
res, tasks, is_unseen_task = None, [], False
index_url = self.edge_knowledge_management.task_index
current_version = self.edge_knowledge_management.current_index_version
lastest_version = self.edge_knowledge_management.lastest_index_version
if not FileOps.exists(index_url) or (current_version and
lastest_version and
current_version !=
lastest_version):
task_index_url = Context.get_parameters(
"MODEL_URLS", self.cloud_knowledge_management.task_index)
self.log.info(
f"Download kb index from {task_index_url} to {index_url}")
FileOps.download(task_index_url, index_url)
self.log.info(f"Deploying tasks to the edge.")
self.edge_knowledge_management.update_kb(index_url)
if not self.start_inference_service:
self._start_inference_service()
self.start_inference_service = True
seen_samples, unseen_samples = self.recognize_unseen_samples(
data, **kwargs)
if unseen_samples.x is not None and unseen_samples.num_examples() > 0:
self.edge_knowledge_management.log.info(
f"Unseen task is detected.")
unseen_res, unseen_tasks = \
self.edge_knowledge_management.unseen_estimator.predict(
unseen_samples,
task_index=self.edge_knowledge_management.task_index)
self.edge_knowledge_management.save_unseen_samples(
unseen_samples, post_process=unseen_sample_postprocess)
res = unseen_res
tasks.extend(unseen_tasks)
if data.num_examples() == 1:
is_unseen_task = True
else:
image_names = list(map(lambda x: x[0], unseen_samples.x))
is_unseen_task = dict(
zip(image_names, [True] * unseen_samples.num_examples()))
if seen_samples.x is not None and seen_samples.num_examples() > 0:
seen_res, seen_tasks = \
self.edge_knowledge_management.seen_estimator.predict(
data=seen_samples, post_process=post_process,
task_index=self.edge_knowledge_management.task_index,
task_type="seen_task",
**kwargs
)
res = np.concatenate((res, seen_res)) if res else seen_res
tasks.extend(seen_tasks)
if data.num_examples() > 1:
image_names = list(map(lambda x: x[0], seen_samples.x))
is_unseen_dict = dict(
zip(image_names, [False] * seen_samples.num_examples()))
if isinstance(is_unseen_task, bool):
is_unseen_task = is_unseen_dict
else:
is_unseen_task.update(is_unseen_dict)
return res, is_unseen_task, tasks
def _start_inference_service(self):
if not self.unseen_sample_detection:
self.unseen_sample_detection = UnseenSampleDetection(
self.edge_knowledge_management
)
self.unseen_sample_detection.start()
if not callable(self.recognize_unseen_samples):
self.recognize_unseen_samples = \
ClassFactory.get_cls(
ClassType.UTD,
self.unseen_sample_recognition["method"])(
self.edge_knowledge_management.task_index,
estimator=self.edge_knowledge_management.
seen_estimator.estimator.base_model,
**self.unseen_sample_recognition_param)
if not self.edge_knowledge_management.pinned_service_start:
self.edge_knowledge_management.start_services()
def _task_evaluation(self, data, **kwargs):
res, tasks_detail = \
self.cloud_knowledge_management.seen_estimator.evaluate(
data=data,
**kwargs)
drop_task = self.cloud_knowledge_management.evaluate_tasks(
tasks_detail,
**kwargs)
index_file = self.kb_server.update_task_status(drop_task, new_status=0)
if not index_file:
self.log.error(f"KB update Fail !")
index_file = str(
self.cloud_knowledge_management.local_task_index_url)
else:
self.log.info(f"Deploy {index_file} to the edge.")
return res, index_file