Source code for lib.sedna.common.file_ops

# Copyright 2021 The KubeEdge Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""FileOps class."""

import os
import re

import joblib
import codecs
import pickle
import shutil
import hashlib
import tempfile
from urllib.parse import urlparse

from .utils import singleton


@singleton
def _create_minio_client():
    import minio

    _url = os.getenv("S3_ENDPOINT_URL", "http://s3.amazonaws.com")
    if not (_url.startswith("http://") or _url.startswith("https://")):
        _url = f"https://{_url}"
    url = urlparse(_url)
    use_ssl = url.scheme == 'https' if url.scheme else True

    s3 = minio.Minio(
        url.netloc,
        access_key=os.getenv("ACCESS_KEY_ID", ""),
        secret_key=os.getenv("SECRET_ACCESS_KEY", ""),
        secure=use_ssl
    )
    return s3
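
# The client above is configured entirely from environment variables; an
# illustrative setup (example values only, not defaults shipped with Sedna)
# could look like:
#   S3_ENDPOINT_URL=https://play.min.io
#   ACCESS_KEY_ID=minio-access-key
#   SECRET_ACCESS_KEY=minio-secret-key
# An endpoint without a scheme is treated as https:// (TLS enabled); an
# explicit http:// endpoint disables TLS.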


class FileOps:
    """
    A collection of class methods for handling local and remote files
    and folders.
    """

    _GCS_PREFIX = "gs://"
    _S3_PREFIX = "s3://"
    _LOCAL_PREFIX = "file://"
    _URI_RE = "https?://(.+)/(.+)"
    _HTTP_PREFIX = "http(s)://"
    _HEADERS_SUFFIX = "-headers"

    @classmethod
    def make_dir(cls, *args):
        """Make a new local directory.

        :param args: fragments of the str path joined into the directory
            to make.
        """
        _path = cls.join_path(*args)
        if not os.path.isdir(_path):
            os.makedirs(_path, exist_ok=True)

    @classmethod
    def get_file_hash(cls, filepath):
        """Return the MD5 hex digest of `filepath`, or an empty string
        if it is not a file.
        """
        md5_hash = hashlib.md5()
        if not (filepath and os.path.isfile(filepath)):
            return ""
        with open(filepath, "rb") as a_file:
            md5_hash.update(a_file.read())
        return md5_hash.hexdigest()

    @classmethod
    def clean_folder(cls, target, clean=True):
        """Clean the target directories.

        Each path is created if it does not exist; if `clean` is True, an
        existing path is deleted and re-created empty.

        :param target: list of str paths to clean.
        :type target: list
        :param clean: clear the target if it exists.
        :type clean: bool
        """
        if isinstance(target, str):
            target = [target]
        for path in set(target):
            args = str(path).split(os.path.sep)
            if len(args) < 2:
                continue
            if not args[0]:
                args[0] = os.path.sep
            _path = cls.join_path(*args)
            if clean:
                cls.delete(_path)
            if os.path.isfile(_path):
                _path = cls.join_path(*args[:len(args) - 1])
            os.makedirs(_path, exist_ok=True)
        return target

    @classmethod
    def delete(cls, path):
        try:
            if os.path.isdir(path):
                shutil.rmtree(path)
            if os.path.isfile(path):
                os.remove(path)
        except Exception:
            pass

    @classmethod
    def make_base_dir(cls, *args):
        """Make the base (parent) directory of a file path.

        :param args: fragments of the str path whose parent directory
            should be created.
        """
        _file = cls.join_path(*args)
        if os.path.isfile(_file):
            return
        _path, _ = os.path.split(_file)
        if not os.path.isdir(_path):
            os.makedirs(_path, exist_ok=True)

    @classmethod
    def join_path(cls, *args):
        """Join a list of paths and return the result.

        :param args: list of str paths to be joined.
        :return: joined path.
        :rtype: str
        """
        if len(args) == 1:
            return args[0]
        is_root = os.path.sep if str(args[0]).startswith(os.path.sep) else ""
        args = list(map(lambda x: x.lstrip(os.path.sep), args))
        args[0] = f"{is_root}{args[0]}"
        # local path
        if ":" not in args[0]:
            args = tuple(args)
            return os.path.join(*args)
        # http or s3 path
        tail = os.path.join(*args[1:])
        return os.path.join(args[0], tail)
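
    # Illustrative results of join_path on a POSIX host (example paths,
    # not values used by Sedna itself):
    #   FileOps.join_path("/tmp", "models", "model.pb")
    #       -> "/tmp/models/model.pb"
    #   FileOps.join_path("s3://bucket", "models/model.pb")
    #       -> "s3://bucket/models/model.pb"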

    @classmethod
    def remove_path_prefix(cls, org_str: str, prefix: str):
        """Remove a prefix from a path, e.g. for converting a path in the
        container to the corresponding path on the host.
        """
        if not prefix:
            return org_str
        p = prefix[:-1] if prefix.endswith(os.path.sep) else prefix
        if org_str.startswith(p):
            out_str = org_str.replace(p, '', 1)
            return out_str
        else:
            return org_str

    @classmethod
    def dump_pickle(cls, obj, filename):
        """Dump an object to a file using pickle.

        :param object obj: target object.
        :param str filename: target pickle file path.
        """
        if not os.path.isfile(filename):
            cls.make_base_dir(filename)
        with open(filename, "wb") as f:
            pickle.dump(obj, f)

    @classmethod
    def load_pickle(cls, filename):
        """Load a pickle file and return the object.

        :param str filename: target pickle file path.
        :return: the loaded object, or None if the file does not exist.
        :rtype: object or None
        """
        filename = cls.download(filename)
        if not os.path.isfile(filename):
            return None
        with open(filename, "rb") as f:
            return pickle.load(f)
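
    # Round-trip sketch for the two pickle helpers above (the path is a
    # hypothetical example):
    #   FileOps.dump_pickle({"k": 1}, "/tmp/example/obj.pkl")
    #   FileOps.load_pickle("/tmp/example/obj.pkl")   # -> {"k": 1}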

    @classmethod
    def copy_folder(cls, src, dst):
        """Copy a folder from source to destination.

        :param str src: source path.
        :param str dst: destination path.
        """
        if dst is None or dst == "" or (not os.path.isdir(src)):
            return
        if not os.path.exists(dst):
            shutil.copytree(src, dst)
        else:
            if os.path.samefile(src, dst):
                return
            for files in os.listdir(src):
                name = os.path.join(src, files)
                back_name = os.path.join(dst, files)
                if os.path.isfile(name):
                    shutil.copy(name, back_name)
                else:
                    if not os.path.isdir(back_name):
                        shutil.copytree(name, back_name)
                    else:
                        cls.copy_folder(name, back_name)

    @classmethod
    def copy_file(cls, src, dst):
        """Copy a file from source to destination.

        :param str src: source path.
        :param str dst: destination path.
        """
        if not dst:
            return
        if os.path.isfile(src):
            if os.path.isfile(dst) and os.path.samefile(src, dst):
                return
            if os.path.isdir(dst):
                basename = os.path.basename(src)
                dst = os.path.join(dst, basename)
            parent_dir = os.path.dirname(dst)
            cls.clean_folder([parent_dir], clean=False)
            shutil.copy(src, dst)
        elif os.path.isdir(src):
            cls.clean_folder([dst], clean=False)
            cls.copy_folder(src, dst)

    @classmethod
    def dump(cls, obj, dst=None) -> str:
        """Serialize `obj` with joblib to a temporary file and upload it
        to `dst`.
        """
        fd, name = tempfile.mkstemp()
        os.close(fd)
        joblib.dump(obj, name)
        return cls.upload(name, dst)

    @classmethod
    def load(cls, src: str):
        """Download `src` if necessary and deserialize it with joblib."""
        src = cls.download(src)
        obj = joblib.load(src)
        return obj

    @classmethod
    def is_remote(cls, src):
        if src.startswith((
                cls._GCS_PREFIX,
                cls._S3_PREFIX
        )):
            return True
        if re.search(cls._URI_RE, src):
            return True
        return False

    @classmethod
    def download(cls, src, dst=None, unzip=False) -> str:
        """Fetch `src` (gs://, s3://, http(s):// or a local path) into the
        local file `dst` and return the destination path.
        """
        if dst is None:
            fd, dst = tempfile.mkstemp()
            os.close(fd)
        cls.clean_folder([os.path.dirname(dst)], clean=False)
        if src.startswith(cls._GCS_PREFIX):
            cls.gcs_download(src, dst)
        elif src.startswith(cls._S3_PREFIX):
            cls.s3_download(src, dst)
        elif cls.is_local(src):
            cls.copy_file(src, dst)
        elif re.search(cls._URI_RE, src):
            cls.http_download(src, dst)
        if unzip is True and dst.endswith(".tar.gz"):
            cls._untar(dst)
        return dst

    @classmethod
    def upload(cls, src, dst, tar=False, clean=True) -> str:
        """Upload `src` to `dst` (gs://, s3:// or a local path), optionally
        packing it as a .tar.gz first.
        """
        if dst is None:
            fd, dst = tempfile.mkstemp()
            os.close(fd)
        if not cls.is_local(src):
            fd, name = tempfile.mkstemp()
            os.close(fd)
            cls.download(src, name)
            src = name
        if tar:
            cls._tar(src, f"{src}.tar.gz")
            src = f"{src}.tar.gz"

        if dst.startswith(cls._GCS_PREFIX):
            cls.gcs_upload(src, dst)
        elif dst.startswith(cls._S3_PREFIX):
            cls.s3_upload(src, dst)
        else:
            cls.copy_file(src, dst)
        if cls.is_local(src) and clean:
            if cls.is_local(dst) and os.path.samefile(src, dst):
                return dst
            cls.delete(src)
        return dst
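
    # Transfer sketch (hypothetical bucket and paths): download() resolves a
    # gs://, s3://, http(s) or local source into a local file, while
    # upload() pushes a local file to the destination, optionally packing it
    # as .tar.gz first:
    #   local = FileOps.download("s3://bucket/models/model.pb",
    #                            "/tmp/model.pb")
    #   FileOps.upload("/tmp/model.pb", "s3://bucket/backup/model.pb",
    #                  clean=False)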

    @classmethod
    def is_local(cls, src):
        return src.startswith(cls._LOCAL_PREFIX) or cls.exists(src)

    @classmethod
    def gcs_download(cls, src, dst):
        """TODO: not supported yet."""

    @classmethod
    def gcs_upload(cls, src, dst):
        """TODO: not supported yet."""

    @classmethod
    def _download_s3(cls, client, uri, out_dir):
        bucket_args = uri.replace(cls._S3_PREFIX, "", 1).split("/", 1)
        bucket_name = bucket_args[0]
        bucket_path = len(bucket_args) > 1 and bucket_args[1] or ""
        objects = list(client.list_objects(bucket_name,
                                           prefix=bucket_path,
                                           recursive=True,
                                           use_api_v1=True))
        count = 0
        num = len(objects)
        for obj in objects:
            # Replace any prefix from the object key with out_dir
            subdir_object_key = obj.object_name[len(bucket_path):].strip("/")
            # fget_object handles directory creation if it does not exist
            if not obj.is_dir:
                if num == 1 and not os.path.isdir(out_dir):
                    local_file = out_dir
                else:
                    local_file = os.path.join(
                        out_dir,
                        subdir_object_key or os.path.basename(obj.object_name)
                    )
                client.fget_object(bucket_name, obj.object_name, local_file)
                count += 1
        return count

    @classmethod
    def s3_download(cls, src, dst):
        """Download the objects under an s3:// URI into `dst` using the
        MinIO client.
        """
        s3 = _create_minio_client()
        count = cls._download_s3(s3, src, dst)
        if count == 0:
            raise RuntimeError("Failed to fetch files. "
                               "The path %s does not exist." % src)

    @classmethod
    def s3_upload(cls, src, dst):
        """Upload a local file or directory `src` to the s3:// URI `dst`
        using the MinIO client.
        """
        s3 = _create_minio_client()
        parsed = urlparse(dst, scheme='s3')
        bucket_name = parsed.netloc

        def _s3_upload(_file, fname=""):
            _file_handle = open(_file, 'rb')
            _file_handle.seek(0, os.SEEK_END)
            size = _file_handle.tell()
            _file_handle.seek(0)
            if not fname:
                fname = os.path.basename(_file)
            s3.put_object(bucket_name, fname, _file_handle, size)
            _file_handle.close()
            return size

        if os.path.isdir(src):
            for root, _, files in os.walk(src):
                for file in files:
                    filepath = os.path.join(root, file)
                    name = os.path.relpath(filepath, src)
                    _s3_upload(filepath, name)
        elif os.path.isfile(src):
            _s3_upload(src, parsed.path.lstrip("/"))

    @classmethod
    def http_download(cls, src, dst):
        """Download data from an http or https URL.

        :param src: the source URL.
        :type src: str
        :param dst: the local destination path.
        :type dst: str
        :raises URLError, IOError: if the download fails.
        """
        from six.moves import urllib
        try:
            urllib.request.urlretrieve(src, dst)
        except (urllib.error.URLError, IOError) as e:
            raise e

    @classmethod
    def _untar(cls, src, dst=None):
        import tarfile
        if dst is None:
            dst = os.path.dirname(src)
        with tarfile.open(src, 'r:gz') as tar:
            tar.extractall(path=dst)

    @classmethod
    def _tar(cls, src, dst):
        import tarfile
        with tarfile.open(dst, 'w:gz') as tar:
            if os.path.isdir(src):
                for root, _, files in os.walk(src):
                    for file in files:
                        filepath = os.path.join(root, file)
                        tar.add(filepath)
            elif os.path.isfile(src):
                tar.add(os.path.realpath(src))

    @classmethod
    def exists(cls, folder):
        """Check whether a folder or file exists.

        :param folder: path to check.
        :type folder: str
        :return: True if the path exists as a directory or file.
        :rtype: bool
        """
        return os.path.isdir(folder) or os.path.isfile(folder)

    @classmethod
    def obj_to_pickle_string(cls, x):
        """Pickle `x` and return it as a base64-encoded string."""
        return codecs.encode(pickle.dumps(x), "base64").decode()

    @classmethod
    def pickle_string_to_obj(cls, s):
        """Decode a base64-encoded pickle string `s` back into an object."""
        return pickle.loads(codecs.decode(s.encode(), "base64"))
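

# Minimal runnable sketch of the pickle-string helpers above; the payload
# dict is a hypothetical example value, not something Sedna itself produces.
if __name__ == "__main__":
    payload = {"epoch": 3, "accuracy": 0.91}
    encoded = FileOps.obj_to_pickle_string(payload)
    assert FileOps.pickle_string_to_obj(encoded) == payload
    print("pickle string round-trip ok, %d characters" % len(encoded))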