This is an automated email from the ASF dual-hosted git repository. machristie pushed a commit to branch mft-integration in repository https://gitbox.apache.org/repos/asf/airavata-django-portal-sdk.git
commit 58ff63c4acff0034a70558005601d83888a52714 Author: Marcus Christie <[email protected]> AuthorDate: Wed Apr 7 18:17:16 2021 -0400 AIRAVATA-3420 Implements DjangoFileSystemProvider, refactored user_storage module to use UserStorageProvider --- airavata_django_portal_sdk/user_storage.py | 1196 -------------------- .../user_storage/__init__.py | 51 + airavata_django_portal_sdk/user_storage/api.py | 758 +++++++++++++ .../user_storage/backends/__init__.py | 3 + .../user_storage/backends/base.py | 64 ++ .../backends/django_filesystem_provider.py | 306 +++++ .../user_storage/backends/mft_provider.py | 152 +++ .../user_storage/backends/remote_api_provider.py | 0 .../user_storage_provider.py | 3 +- 9 files changed, 1336 insertions(+), 1197 deletions(-) diff --git a/airavata_django_portal_sdk/user_storage.py b/airavata_django_portal_sdk/user_storage.py deleted file mode 100644 index 5fb07c5..0000000 --- a/airavata_django_portal_sdk/user_storage.py +++ /dev/null @@ -1,1196 +0,0 @@ -import cgi -import copy -import io -import logging -import mimetypes -import os -import shutil -import warnings -from datetime import datetime -from http import HTTPStatus -from urllib.parse import quote, unquote, urlparse - -import grpc -import requests -from airavata.model.data.replica.ttypes import ( - DataProductModel, - DataProductType, - DataReplicaLocationModel, - ReplicaLocationCategory, - ReplicaPersistentType -) -from django.conf import settings -from django.core.exceptions import ObjectDoesNotExist, SuspiciousFileOperation -from django.core.files import File -from django.core.files.move import file_move_safe -from django.core.files.storage import FileSystemStorage - -from . import MFTApi_pb2, MFTApi_pb2_grpc -from .util import convert_iso8601_to_datetime - -logger = logging.getLogger(__name__) - -TMP_INPUT_FILE_UPLOAD_DIR = "tmp" - - -def save(request, path, file, name=None, content_type=None): - "Save file in path in the user's storage and return DataProduct." 
- if _is_remote_api(): - if name is None and hasattr(file, 'name'): - name = os.path.basename(file.name) - files = {'file': (name, file, content_type) - if content_type is not None else file, } - resp = _call_remote_api(request, - "/user-storage/~/{path}", - path_params={"path": path}, - method="post", - files=files) - data = resp.json() - product_uri = data['uploaded']['productUri'] - data_product = request.airavata_client.getDataProduct( - request.authz_token, product_uri) - return data_product - else: - username = request.user.username - full_path = _Datastore().save(username, path, file, name=name) - data_product = _save_data_product( - request, full_path, name=name, content_type=content_type - ) - return data_product - - -def move_from_filepath( - request, - source_path, - target_path, - name=None, - content_type=None): - "Move a file from filesystem into user's storage." - # TODO: deprecate this method - username = request.user.username - file_name = name if name is not None else os.path.basename(source_path) - full_path = _Datastore().move_external( - source_path, username, target_path, file_name) - data_product = _save_data_product( - request, full_path, name=file_name, content_type=content_type - ) - return data_product - - -def save_input_file(request, file, name=None, content_type=None): - """Save input file in staging area for input files.""" - if _is_remote_api(): - if name is None and hasattr(file, 'name'): - name = os.path.basename(file.name) - files = {'file': (name, file, content_type) - if content_type is not None else file, } - resp = _call_remote_api(request, - "/upload", - method="post", - files=files) - data = resp.json() - product_uri = data['data-product']['productUri'] - data_product = request.airavata_client.getDataProduct( - request.authz_token, product_uri) - return data_product - else: - username = request.user.username - file_name = name if name is not None else os.path.basename(file.name) - full_path = _Datastore().save(username, 
TMP_INPUT_FILE_UPLOAD_DIR, file) - data_product = _save_data_product( - request, full_path, name=file_name, content_type=content_type - ) - return data_product - - -def copy_input_file(request, data_product=None, data_product_uri=None): - # TODO: we could probably deprecate this as well, since we do an open/save - # to copy instead. Or at least, we don't need it in UserStorageProvider. - if data_product is None: - data_product = _get_data_product(request, data_product_uri) - path = _get_replica_filepath(data_product) - name = data_product.productName - full_path = _Datastore().copy( - data_product.ownerName, - path, - request.user.username, - TMP_INPUT_FILE_UPLOAD_DIR, - name=name, - ) - return _save_copy_of_data_product(request, full_path, data_product) - - -def is_input_file(request, data_product=None, data_product_uri=None): - # TODO: don't need this in UserStorageProvider - if data_product is None: - data_product = _get_data_product(request, data_product_uri) - if _is_remote_api(): - resp = _call_remote_api( - request, - "/data-products/", - params={'product-uri': data_product.productUri}) - data = resp.json() - return data['isInputFileUpload'] - # Check if file is one of user's files and in TMP_INPUT_FILE_UPLOAD_DIR - path = _get_replica_filepath(data_product) - if _Datastore().exists(request.user.username, path): - rel_path = _Datastore().rel_path(request.user.username, path) - return os.path.dirname(rel_path) == TMP_INPUT_FILE_UPLOAD_DIR - else: - return False - - -def move_input_file(request, data_product=None, path=None, data_product_uri=None): - # TODO: don't need this in UserStorageProvider - if data_product is None: - data_product = _get_data_product(request, data_product_uri) - source_path = _get_replica_filepath(data_product) - file_name = data_product.productName - full_path = _Datastore().move( - data_product.ownerName, - source_path, - request.user.username, - path, - file_name) - _delete_data_product(data_product.ownerName, source_path) - 
data_product = _save_copy_of_data_product(request, full_path, data_product) - return data_product - - -def move_input_file_from_filepath( - request, source_path, name=None, content_type=None -): - # TODO: don't need this in UserStorageProvider - "Move a file from filesystem into user's input file staging area." - username = request.user.username - file_name = name if name is not None else os.path.basename(source_path) - full_path = _Datastore().move_external( - source_path, username, TMP_INPUT_FILE_UPLOAD_DIR, file_name - ) - data_product = _save_data_product( - request, full_path, name=file_name, content_type=content_type - ) - return data_product - - -def open_file(request, data_product=None, data_product_uri=None): - """ - Return file object for replica if it exists in user storage. One of - `data_product` or `data_product_uri` is required. - """ - if data_product is None: - data_product = _get_data_product(request, data_product_uri) - if _is_remote_api(): - resp = _call_remote_api( - request, - "/download", - params={'data-product-uri': data_product.productUri}) - file = io.BytesIO(resp.content) - disposition = resp.headers['Content-Disposition'] - disp_value, disp_params = cgi.parse_header(disposition) - # Give the file object a name just like a real opened file object - file.name = disp_params['filename'] - return file - else: - path = _get_replica_filepath(data_product) - return _Datastore().open(data_product.ownerName, path) - - -def exists(request, data_product=None, data_product_uri=None): - """ - Return True if replica for data_product exists in user storage. One of - `data_product` or `data_product_uri` is required. 
- """ - if data_product is None: - data_product = _get_data_product(request, data_product_uri) - if _is_remote_api(): - resp = _call_remote_api( - request, - "/data-products/", - params={'product-uri': data_product.productUri}) - data = resp.json() - return data['downloadURL'] is not None - else: - path = _get_replica_filepath(data_product) - return _Datastore().exists(data_product.ownerName, path) - - -def dir_exists(request, path): - "Return True if path exists in user's data store." - if _is_remote_api(): - resp = _call_remote_api(request, - "/user-storage/~/{path}", - path_params={"path": path}, - raise_for_status=False) - if resp.status_code == HTTPStatus.NOT_FOUND: - return False - resp.raise_for_status() - return resp.json()['isDir'] - else: - user_storage_provider = MFTApiUserStorageProvider() - return user_storage_provider.dir_exists(request, path) - - -def user_file_exists(request, path): - """If file exists, return data product URI, else None.""" - if _is_remote_api(): - resp = _call_remote_api(request, - "/user-storage/~/{path}", - path_params={"path": path}, - raise_for_status=False) - if resp.status_code == HTTPStatus.NOT_FOUND or resp.json()['isDir']: - return None - resp.raise_for_status() - return resp.json()['files'][0]['dataProductURI'] - elif _Datastore().exists(request.user.username, path): - full_path = _Datastore().path(request.user.username, path) - data_product_uri = _get_data_product_uri(request, full_path) - return data_product_uri - else: - return None - - -def delete_dir(request, path): - """Delete path in user's data store, if it exists.""" - if _is_remote_api(): - resp = _call_remote_api(request, - "/user-storage/~/{path}", - path_params={"path": path}, - method="delete", - raise_for_status=False) - _raise_404(resp, f"File path does not exist {path}") - resp.raise_for_status() - return - _Datastore().delete_dir(request.user.username, path) - - -def delete_user_file(request, path): - """Delete file in user's data store, if it 
exists.""" - if _is_remote_api(): - resp = _call_remote_api(request, - "/user-storage/~/{path}", - path_params={"path": path}, - method="delete", - raise_for_status=False) - _raise_404(resp, f"File path does not exist {path}") - resp.raise_for_status() - return - return _Datastore().delete(request.user.username, path) - - -def update_file_content(request, path, fileContentText): - if _is_remote_api(): - _call_remote_api(request, - "/user-storage/~/{path}", - path_params={"path": path}, - method="put", - data={"fileContentText": fileContentText} - ) - return - else: - full_path = _Datastore().path(request.user.username, path) - with open(full_path, 'w') as f: - myfile = File(f) - myfile.write(fileContentText) - - -def update_data_product_content(request, data_product=None, fileContentText="", data_product_uri=None): - if data_product is None: - data_product = _get_data_product(request, data_product_uri) - # TODO: implement remote api support (DataProductView.put()) - path = _get_replica_filepath(data_product) - full_path = _Datastore().path(request.user.username, path) - with open(full_path, 'w') as f: - myfile = File(f) - myfile.write(fileContentText) - - -def get_file(request, path): - if _is_remote_api(): - resp = _call_remote_api(request, - "/user-storage/~/{path}", - path_params={"path": path}, - raise_for_status=False - ) - _raise_404(resp, "User storage file path does not exist") - data = resp.json() - if data["isDir"]: - raise Exception("User storage path is a directory, not a file") - file = data['files'][0] - file['created_time'] = convert_iso8601_to_datetime(file['createdTime']) - file['mime_type'] = file['mimeType'] - file['data-product-uri'] = file['dataProductURI'] - return file - - user_storage_provider = MFTApiUserStorageProvider() - return user_storage_provider.get_file(request, path) - - -def delete(request, data_product=None, data_product_uri=None): - """ - Delete replica for data product in this data store. 
One of `data_product` - or `data_product_uri` is required. - """ - if data_product is None: - data_product = _get_data_product(request, data_product_uri) - if _is_remote_api(): - _call_remote_api( - request, - "/delete-file", - params={'data-product-uri': data_product.productUri}, - method="delete") - return - else: - path = _get_replica_filepath(data_product) - try: - _Datastore().delete(data_product.ownerName, path) - _delete_data_product(data_product.ownerName, path) - except Exception: - logger.exception( - "Unable to delete file {} for data product uri {}".format( - path, data_product.productUri - ) - ) - raise - - -def listdir(request, path): - """Return a tuple of two lists, one for directories, the second for files.""" - - if _is_remote_api(): - resp = _call_remote_api(request, - "/user-storage/~/{path}", - path_params={"path": path}, - ) - data = resp.json() - for directory in data['directories']: - # Convert JSON ISO8601 timestamp to datetime instance - directory['created_time'] = convert_iso8601_to_datetime( - directory['createdTime']) - for file in data['files']: - # Convert JSON ISO8601 timestamp to datetime instance - file['created_time'] = convert_iso8601_to_datetime( - file['createdTime']) - file['mime_type'] = file['mimeType'] - file['data-product-uri'] = file['dataProductURI'] - return data['directories'], data['files'] - - user_storage_provider = MFTApiUserStorageProvider() - return user_storage_provider.listdir(request, path) - - -def list_experiment_dir(request, experiment_id, path=""): - """ - List files, directories in experiment data directory. Returns a tuple, - see `listdir`. 
- """ - if _is_remote_api(): - resp = _call_remote_api(request, - "/experiment-storage/{experiment_id}/{path}", - path_params={"path": path, - "experiment_id": experiment_id}, - ) - data = resp.json() - for directory in data['directories']: - # Convert JSON ISO8601 timestamp to datetime instance - directory['created_time'] = convert_iso8601_to_datetime( - directory['createdTime']) - for file in data['files']: - # Convert JSON ISO8601 timestamp to datetime instance - file['created_time'] = convert_iso8601_to_datetime( - file['createdTime']) - file['mime_type'] = file['mimeType'] - file['data-product-uri'] = file['dataProductURI'] - return data['directories'], data['files'] - - experiment = request.airavata_client.getExperiment( - request.authz_token, experiment_id) - datastore = _Datastore() - exp_data_path = experiment.userConfigurationData.experimentDataDir - exp_data_path = os.path.join(exp_data_path, path) - exp_owner = experiment.userName - if datastore.dir_exists(exp_owner, exp_data_path): - directories, files = datastore.list_user_dir( - exp_owner, exp_data_path) - directories_data = [] - for d in directories: - dpath = os.path.join(exp_data_path, d) - rel_path = os.path.join(path, d) - created_time = datastore.get_created_time( - exp_owner, dpath) - size = datastore.size(exp_owner, dpath) - directories_data.append( - { - "name": d, - "path": rel_path, - "created_time": created_time, - "size": size, - } - ) - files_data = [] - for f in files: - user_rel_path = os.path.join(exp_data_path, f) - if not datastore.exists(exp_owner, user_rel_path): - logger.warning( - f"list_experiment_dir skipping {exp_owner}:{user_rel_path}, " - "does not exist (broken symlink?)") - continue - created_time = datastore.get_created_time( - exp_owner, user_rel_path - ) - size = datastore.size(exp_owner, user_rel_path) - full_path = datastore.path(exp_owner, user_rel_path) - data_product_uri = _get_data_product_uri(request, full_path, owner=exp_owner) - - data_product = 
request.airavata_client.getDataProduct( - request.authz_token, data_product_uri) - mime_type = None - if 'mime-type' in data_product.productMetadata: - mime_type = data_product.productMetadata['mime-type'] - files_data.append( - { - "name": f, - "path": user_rel_path, - "data-product-uri": data_product_uri, - "created_time": created_time, - "mime_type": mime_type, - "size": size, - "hidden": False, - } - ) - return directories_data, files_data - else: - raise ObjectDoesNotExist("Experiment data directory does not exist") - - -def experiment_dir_exists(request, experiment_id, path=""): - - if _is_remote_api(): - resp = _call_remote_api(request, - "/experiment-storage/{experiment_id}/{path}", - path_params={"path": path, - "experiment_id": experiment_id}, - raise_for_status=False) - if resp.status_code == HTTPStatus.NOT_FOUND: - return False - resp.raise_for_status() - return resp.json()['isDir'] - - experiment = request.airavata_client.getExperiment( - request.authz_token, experiment_id) - datastore = _Datastore() - exp_data_path = experiment.userConfigurationData.experimentDataDir - if exp_data_path is None: - return False - exp_data_path = os.path.join(exp_data_path, path) - exp_owner = experiment.userName - return datastore.dir_exists(exp_owner, exp_data_path) - - -def get_experiment_dir( - request, - project_name=None, - experiment_name=None, - path=None): - return _Datastore().get_experiment_dir( - request.user.username, project_name, experiment_name, path - ) - - -def create_user_dir(request, path): - if _is_remote_api(): - logger.debug(f"path={path}") - _call_remote_api(request, - "/user-storage/~/{path}", - path_params={"path": path}, - method="post") - return - _Datastore().create_user_dir(request.user.username, path) - - -def get_rel_path(request, path): - return _Datastore().rel_path(request.user.username, path) - - -def get_rel_experiment_dir(request, experiment_id): - """Return experiment data dir path relative to user's directory.""" - 
warnings.warn("Use list_experiment_dir instead.", DeprecationWarning) - if _is_remote_api(): - resp = _call_remote_api(request, - "/experiments/{experimentId}/", - path_params={"experimentId": experiment_id}) - resp.raise_for_status() - return resp.json()['relativeExperimentDataDir'] - - experiment = request.airavata_client.getExperiment( - request.authz_token, experiment_id) - if (experiment.userConfigurationData and - experiment.userConfigurationData.experimentDataDir): - datastore = _Datastore() - data_dir = experiment.userConfigurationData.experimentDataDir - if datastore.dir_exists(request.user.username, data_dir): - return datastore.rel_path(request.user.username, data_dir) - else: - return None - else: - return None - - -def _get_data_product_uri(request, full_path, owner=None): - - from airavata_django_portal_sdk import models - if owner is None: - owner = request.user.username - user_file = models.UserFiles.objects.filter( - username=owner, file_path=full_path) - if user_file.exists(): - product_uri = user_file[0].file_dpu - else: - data_product = _save_data_product(request, full_path, owner=owner) - product_uri = data_product.productUri - return product_uri - - -def _get_data_product(request, data_product_uri): - return request.airavata_client.getDataProduct( - request.authz_token, data_product_uri) - - -def _save_data_product(request, full_path, name=None, content_type=None, owner=None): - "Create, register and record in DB a data product for full_path." 
- if owner is None: - owner = request.user.username - data_product = _create_data_product( - owner, full_path, name=name, content_type=content_type - ) - product_uri = _register_data_product(request, full_path, data_product, owner=owner) - data_product.productUri = product_uri - return data_product - - -def _register_data_product(request, full_path, data_product, owner=None): - if owner is None: - owner = request.user.username - product_uri = request.airavata_client.registerDataProduct( - request.authz_token, data_product - ) - from airavata_django_portal_sdk import models - user_file_instance = models.UserFiles( - username=owner, - file_path=full_path, - file_dpu=product_uri) - user_file_instance.save() - return product_uri - - -def _save_copy_of_data_product(request, full_path, data_product): - """Save copy of a data product with a different path.""" - data_product_copy = _copy_data_product(request, data_product, full_path) - product_uri = _register_data_product(request, full_path, data_product_copy) - data_product_copy.productUri = product_uri - return data_product_copy - - -def _copy_data_product(request, data_product, full_path): - """Create an unsaved copy of a data product with different path.""" - data_product_copy = copy.copy(data_product) - data_product_copy.productUri = None - data_product_copy.ownerName = request.user.username - data_replica_location = _create_replica_location( - full_path, data_product_copy.productName - ) - data_product_copy.replicaLocations = [data_replica_location] - return data_product_copy - - -def _delete_data_product(username, full_path): - # TODO: call API to delete data product from replica catalog when it is - # available (not currently implemented) - from airavata_django_portal_sdk import models - user_file = models.UserFiles.objects.filter( - username=username, file_path=full_path) - if user_file.exists(): - user_file.delete() - - -def _create_data_product(username, full_path, name=None, content_type=None): - data_product = 
DataProductModel() - data_product.gatewayId = settings.GATEWAY_ID - data_product.ownerName = username - if name is not None: - file_name = name - else: - file_name = os.path.basename(full_path) - data_product.productName = file_name - data_product.dataProductType = DataProductType.FILE - final_content_type = _determine_content_type(full_path, content_type) - if final_content_type is not None: - data_product.productMetadata = {"mime-type": final_content_type} - data_replica_location = _create_replica_location(full_path, file_name) - data_product.replicaLocations = [data_replica_location] - return data_product - - -def _determine_content_type(full_path, content_type=None): - result = content_type - if result is None: - # Try to guess the content-type from file extension - guessed_type, encoding = mimetypes.guess_type(full_path) - result = guessed_type - if result is None or result == "application/octet-stream": - # Check if file is Unicode text by trying to read some of it - try: - open(full_path, "r").read(1024) - result = "text/plain" - except UnicodeDecodeError: - logger.debug(f"Failed to read as Unicode text: {full_path}") - return result - - -def _create_replica_location(full_path, file_name): - data_replica_location = DataReplicaLocationModel() - data_replica_location.storageResourceId = settings.GATEWAY_DATA_STORE_RESOURCE_ID - data_replica_location.replicaName = "{} gateway data store copy".format( - file_name) - data_replica_location.replicaLocationCategory = ( - ReplicaLocationCategory.GATEWAY_DATA_STORE - ) - data_replica_location.replicaPersistentType = ReplicaPersistentType.TRANSIENT - data_replica_location.filePath = "file://{}:{}".format( - settings.GATEWAY_DATA_STORE_HOSTNAME, quote(full_path) - ) - return data_replica_location - - -def _get_replica_filepath(data_product): - replica_filepaths = [ - rep.filePath - for rep in data_product.replicaLocations - if rep.replicaLocationCategory == ReplicaLocationCategory.GATEWAY_DATA_STORE - ] - 
replica_filepath = replica_filepaths[0] if len( - replica_filepaths) > 0 else None - if replica_filepath: - return unquote(urlparse(replica_filepath).path) - return None - - -def _is_remote_api(): - return getattr(settings, 'GATEWAY_DATA_STORE_REMOTE_API', None) is not None - - -def _call_remote_api( - request, - path, - path_params=None, - method="get", - raise_for_status=True, - **kwargs): - - headers = { - 'Authorization': f'Bearer {request.authz_token.accessToken}'} - encoded_path_params = {} - if path_params is not None: - for pk, pv in path_params.items(): - encoded_path_params[pk] = quote(pv) - encoded_path = path.format(**encoded_path_params) - logger.debug(f"encoded_path={encoded_path}") - r = requests.request( - method, - f'{settings.GATEWAY_DATA_STORE_REMOTE_API}{encoded_path}', - headers=headers, - **kwargs, - ) - if raise_for_status: - r.raise_for_status() - return r - - -def _raise_404(response, msg, exception_class=ObjectDoesNotExist): - if response.status_code == 404: - raise exception_class(msg) - - -class _Datastore: - """Internal datastore abstraction.""" - - def __init__(self, directory=None): - if getattr( - settings, - 'GATEWAY_DATA_STORE_REMOTE_API', - None) is not None: - raise Exception( - f"This Django portal instance is configured to connect to a " - f"remote data store via API (settings.GATEWAY_DATA_STORE_REMOTE_API=" - f"{settings.GATEWAY_DATA_STORE_REMOTE_API}). 
This local " - f"Datastore instance is not available in remote data store mode.") - if directory: - self.directory = directory - else: - self.directory = settings.GATEWAY_DATA_STORE_DIR - - def exists(self, username, path): - """Check if file path exists in this data store.""" - try: - return self._user_data_storage(username).exists( - path) and os.path.isfile(self.path(username, path)) - except SuspiciousFileOperation as e: - logger.warning( - "Invalid path for user {}: {}".format( - username, str(e))) - return False - - def dir_exists(self, username, path): - """Check if directory path exists in this data store.""" - try: - return self._user_data_storage(username).exists( - path) and os.path.isdir(self.path(username, path)) - except SuspiciousFileOperation as e: - logger.warning( - "Invalid path for user {}: {}".format( - username, str(e))) - return False - - def open(self, username, path): - """Open path for user if it exists in this data store.""" - if self.exists(username, path): - return self._user_data_storage(username).open(path) - else: - raise ObjectDoesNotExist( - "File path does not exist: {}".format(path)) - - def save(self, username, path, file, name=None): - """Save file to username/path in data store.""" - # file.name may be full path, so get just the name of the file - file_name = name if name is not None else os.path.basename(file.name) - user_data_storage = self._user_data_storage(username) - file_path = os.path.join( - path, user_data_storage.get_valid_name(file_name)) - input_file_name = user_data_storage.save(file_path, file) - input_file_fullpath = user_data_storage.path(input_file_name) - return input_file_fullpath - - def move( - self, - source_username, - source_path, - target_username, - target_dir, - file_name): - source_full_path = self.path(source_username, source_path) - user_data_storage = self._user_data_storage(target_username) - # Make file_name a valid filename - target_path = os.path.join( - target_dir, 
user_data_storage.get_valid_name(file_name) - ) - # Get available file path: if there is an existing file at target_path - # create a uniquely named path - target_path = user_data_storage.get_available_name(target_path) - target_full_path = self.path(target_username, target_path) - file_move_safe(source_full_path, target_full_path) - return target_full_path - - def move_external( - self, - external_path, - target_username, - target_dir, - file_name): - user_data_storage = self._user_data_storage(target_username) - # Make file_name a valid filename - target_path = os.path.join( - target_dir, user_data_storage.get_valid_name(file_name) - ) - # Get available file path: if there is an existing file at target_path - # create a uniquely named path - target_path = user_data_storage.get_available_name(target_path) - if not self.dir_exists(target_username, target_dir): - self.create_user_dir(target_username, target_dir) - target_full_path = self.path(target_username, target_path) - file_move_safe(external_path, target_full_path) - return target_full_path - - def create_user_dir(self, username, path): - user_data_storage = self._user_data_storage(username) - if not user_data_storage.exists(path): - self._makedirs(username, path) - else: - raise Exception("Directory {} already exists".format(path)) - - def copy( - self, - source_username, - source_path, - target_username, - target_path, - name=None): - """Copy a user file into target_path dir.""" - f = self.open(source_username, source_path) - return self.save(target_username, target_path, f, name=name) - - def delete(self, username, path): - """Delete file in this data store.""" - if self.exists(username, path): - user_data_storage = self._user_data_storage(username) - user_data_storage.delete(path) - else: - raise ObjectDoesNotExist( - "File path does not exist: {}".format(path)) - - def delete_dir(self, username, path): - """Delete entire directory in this data store.""" - if self.dir_exists(username, path): - user_path = 
self.path(username, path) - shutil.rmtree(user_path) - else: - raise ObjectDoesNotExist( - "File path does not exist: {}".format(path)) - - def get_experiment_dir( - self, username, project_name=None, experiment_name=None, path=None - ): - """Return an experiment directory (full path) for the given experiment.""" - user_experiment_data_storage = self._user_data_storage(username) - if path is None: - proj_dir_name = user_experiment_data_storage.get_valid_name( - project_name) - # AIRAVATA-3245 Make project directory with correct permissions - if not user_experiment_data_storage.exists(proj_dir_name): - self._makedirs(username, proj_dir_name) - experiment_dir_name = os.path.join( - proj_dir_name, - user_experiment_data_storage.get_valid_name(experiment_name), - ) - # Since there may already be another experiment with the same name in - # this project, we need to check for available name - experiment_dir_name = user_experiment_data_storage.get_available_name( - experiment_dir_name) - experiment_dir = user_experiment_data_storage.path( - experiment_dir_name) - else: - # path can be relative to the user's storage space or absolute (as long - # as it is still inside the user's storage space) - # if path is passed in, assumption is that it has already been - # created - user_experiment_data_storage = self._user_data_storage(username) - experiment_dir = user_experiment_data_storage.path(path) - if not user_experiment_data_storage.exists(experiment_dir): - self._makedirs(username, experiment_dir) - return experiment_dir - - def _makedirs(self, username, dir_path): - user_experiment_data_storage = self._user_data_storage(username) - full_path = user_experiment_data_storage.path(dir_path) - os.makedirs( - full_path, - mode=user_experiment_data_storage.directory_permissions_mode) - # os.makedirs mode isn't always respected so need to chmod to be sure - os.chmod( - full_path, - mode=user_experiment_data_storage.directory_permissions_mode) - - def list_user_dir(self, username, 
file_path): - logger.debug("file_path={}".format(file_path)) - user_data_storage = self._user_data_storage(username) - return user_data_storage.listdir(file_path) - - def get_created_time(self, username, file_path): - user_data_storage = self._user_data_storage(username) - return user_data_storage.get_created_time(file_path) - - def size(self, username, file_path): - user_data_storage = self._user_data_storage(username) - full_path = self.path(username, file_path) - if os.path.isdir(full_path): - return self._get_dir_size(full_path) - else: - return user_data_storage.size(file_path) - - def path(self, username, file_path): - user_data_storage = self._user_data_storage(username) - return user_data_storage.path(file_path) - - def rel_path(self, username, file_path): - full_path = self.path(username, file_path) - return os.path.relpath(full_path, self.path(username, "")) - - def _user_data_storage(self, username): - return FileSystemStorage( - location=os.path.join( - self.directory, username)) - - # from https://stackoverflow.com/a/1392549 - def _get_dir_size(self, start_path="."): - total_size = 0 - for dirpath, dirnames, filenames in os.walk(start_path): - for f in filenames: - fp = os.path.join(dirpath, f) - # Check for broken symlinks (.exists return False for broken - # symlinks) - if os.path.exists(fp): - total_size += os.path.getsize(fp) - return total_size - - -class UserStorageProvider: - def dir_exists(self, request, path): - raise NotImplementedError() - - def listdir(self, request, path): - raise NotImplementedError() - - def get_file(self, request, path): - raise NotImplementedError() - - -class FileSystemUserStorageProvider(UserStorageProvider): - def dir_exists(self, request, path): - return _Datastore().dir_exists(request.user.username, path) - - def listdir(self, request, path): - datastore = _Datastore() - if datastore.dir_exists(request.user.username, path): - directories, files = datastore.list_user_dir( - request.user.username, path) - 
directories_data = [] - for d in directories: - dpath = os.path.join(path, d) - created_time = datastore.get_created_time( - request.user.username, dpath) - size = datastore.size(request.user.username, dpath) - directories_data.append( - { - "name": d, - "path": dpath, - "created_time": created_time, - "size": size, - "hidden": dpath == TMP_INPUT_FILE_UPLOAD_DIR, - } - ) - files_data = [] - for f in files: - user_rel_path = os.path.join(path, f) - if not datastore.exists(request.user.username, user_rel_path): - logger.warning(f"listdir skipping {request.user.username}:{user_rel_path}, " - "does not exist (broken symlink?)") - continue - created_time = datastore.get_created_time( - request.user.username, user_rel_path - ) - size = datastore.size(request.user.username, user_rel_path) - full_path = datastore.path(request.user.username, user_rel_path) - data_product_uri = _get_data_product_uri(request, full_path) - - data_product = request.airavata_client.getDataProduct( - request.authz_token, data_product_uri) - mime_type = None - if 'mime-type' in data_product.productMetadata: - mime_type = data_product.productMetadata['mime-type'] - files_data.append( - { - "name": f, - "path": user_rel_path, - "data-product-uri": data_product_uri, - "created_time": created_time, - "mime_type": mime_type, - "size": size, - "hidden": False, - } - ) - return directories_data, files_data - else: - raise ObjectDoesNotExist("User storage path does not exist") - - def get_file(self, request, path): - - if _is_remote_api(): - resp = _call_remote_api(request, - "/user-storage/~/{path}", - path_params={"path": path}, - raise_for_status=False - ) - _raise_404(resp, "User storage file path does not exist") - data = resp.json() - if data["isDir"]: - raise Exception("User storage path is a directory, not a file") - file = data['files'][0] - file['created_time'] = convert_iso8601_to_datetime(file['createdTime']) - file['mime_type'] = file['mimeType'] - file['data-product-uri'] = 
file['dataProductURI'] - return file - datastore = _Datastore() - if datastore.exists(request.user.username, path): - created_time = datastore.get_created_time( - request.user.username, path) - size = datastore.size(request.user.username, path) - full_path = datastore.path(request.user.username, path) - data_product_uri = _get_data_product_uri(request, full_path) - dir_path, file_name = os.path.split(path) - - data_product = request.airavata_client.getDataProduct( - request.authz_token, data_product_uri) - mime_type = None - if 'mime-type' in data_product.productMetadata: - mime_type = data_product.productMetadata['mime-type'] - - return { - 'name': full_path, - 'path': dir_path, - 'data-product-uri': data_product_uri, - 'created_time': created_time, - 'mime_type': mime_type, - 'size': size, - 'hidden': False - } - else: - raise ObjectDoesNotExist("User storage file path does not exist") - - -class MFTApiUserStorageProvider(UserStorageProvider): - def __init__(self) -> None: - super().__init__() - - def dir_exists(self, request, path): - with grpc.insecure_channel('localhost:7004') as channel: - # remove trailing slash and figure out parent path - # FIXME remove the hard coded /tmp path - parent_path, child_path = os.path.split(f"/tmp/{path}".rstrip("/")) - logger.debug(f"parent_path={parent_path}, child_path={child_path}") - stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel) - # Get metadata for parent directory and see if child_path exists - request = MFTApi_pb2.FetchResourceMetadataRequest( - resourceId="remote-ssh-dir-resource", - resourceType="SCP", - resourceToken="local-ssh-cred", - resourceBackend="FILE", - resourceCredentialBackend="FILE", - targetAgentId="agent0", - childPath=parent_path, - mftAuthorizationToken="user token") - response = stub.getDirectoryResourceMetadata(request) - # if not child_path, then return True since the response was - # successful and we just need to confirm the existence of the root dir - if child_path == '': - return True - 
return child_path in map(lambda f: f.friendlyName, response.directories) - - def listdir(self, request, path): - # TODO setup resourceId, etc from __init__ arguments - channel = grpc.insecure_channel('localhost:7004') - stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel) - request = MFTApi_pb2.FetchResourceMetadataRequest( - resourceId="remote-ssh-dir-resource", - resourceType="SCP", - resourceToken="local-ssh-cred", - resourceBackend="FILE", - resourceCredentialBackend="FILE", - targetAgentId="agent0", - childPath=f"/tmp/{path}", - mftAuthorizationToken="user token") - response = stub.getDirectoryResourceMetadata(request) - directories_data = [] - for d in response.directories: - - dpath = os.path.join(path, d.friendlyName) - created_time = datetime.fromtimestamp(d.createdTime) - # TODO MFT API doesn't report size - size = 0 - directories_data.append( - { - "name": d.friendlyName, - "path": dpath, - "created_time": created_time, - "size": size, - # TODO how to handle hidden directories or directories for - # staging input file uploads - "hidden": False - } - ) - files_data = [] - for f in response.files: - user_rel_path = os.path.join(path, f.friendlyName) - # TODO do we need to check for broken symlinks? - created_time = datetime.fromtimestamp(f.createdTime) - # TODO get the size as well - size = 0 - # full_path = datastore.path(request.user.username, user_rel_path) - # TODO how do we register these as data products, do we need to? 
- # data_product_uri = _get_data_product_uri(request, full_path) - - # data_product = request.airavata_client.getDataProduct( - # request.authz_token, data_product_uri) - # mime_type = None - # if 'mime-type' in data_product.productMetadata: - # mime_type = data_product.productMetadata['mime-type'] - files_data.append( - { - "name": f.friendlyName, - "path": user_rel_path, - "data-product-uri": None, - "created_time": created_time, - "mime_type": None, - "size": size, - "hidden": False, - } - ) - return directories_data, files_data - - def get_file(self, request, path): - # FIXME remove hard coded /tmp path - path = f"/tmp/{path}".rstrip("/") - file_metadata = self._get_file(path) - if file_metadata is not None: - user_rel_path = os.path.join(path, file_metadata.friendlyName) - created_time = datetime.fromtimestamp(file_metadata.createdTime) - # TODO get the size as well - size = 0 - - return { - "name": file_metadata.friendlyName, - "path": user_rel_path, - "data-product-uri": None, - "created_time": created_time, - "mime_type": None, - "size": size, - "hidden": False, - } - else: - raise ObjectDoesNotExist("User storage file path does not exist") - - def _get_file(self, path): - with grpc.insecure_channel('localhost:7004') as channel: - stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel) - # Get metadata for parent directory and see if child_path exists - request = MFTApi_pb2.FetchResourceMetadataRequest( - resourceId="remote-ssh-dir-resource", - resourceType="SCP", - resourceToken="local-ssh-cred", - resourceBackend="FILE", - resourceCredentialBackend="FILE", - targetAgentId="agent0", - childPath=path, - mftAuthorizationToken="user token") - try: - # TODO is there a better way to check if file exists than catching exception? 
- return stub.getFileResourceMetadata(request) - except Exception: - logger.exception(f"_get_file({path})") - return None - - def _get_download_url(self, path): - - with grpc.insecure_channel('localhost:7004') as channel: - stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel) - download_request = MFTApi_pb2.HttpDownloadApiRequest(sourceStoreId="remote-ssh-storage", - sourcePath="/tmp/a.txt", - sourceToken="local-ssh-cred", - sourceType="SCP", - targetAgent="agent0", - mftAuthorizationToken="") - try: - # TODO is there a better way to check if file exists than catching exception? - # response stub.submitHttpDownload(request) - pass - except Exception: - logger.exception(f"_get_file({path})") - return None diff --git a/airavata_django_portal_sdk/user_storage/__init__.py b/airavata_django_portal_sdk/user_storage/__init__.py new file mode 100644 index 0000000..21a22b4 --- /dev/null +++ b/airavata_django_portal_sdk/user_storage/__init__.py @@ -0,0 +1,51 @@ +from .api import ( + copy_input_file, + create_user_dir, + delete, + delete_dir, + delete_user_file, + dir_exists, + exists, + experiment_dir_exists, + get_experiment_dir, + get_file, + get_file_metadata, + get_rel_experiment_dir, + is_input_file, + list_experiment_dir, + listdir, + move, + move_input_file, + open_file, + save, + save_input_file, + update_data_product_content, + update_file_content, + user_file_exists +) + +__all__ = [ + 'copy_input_file', + 'create_user_dir', + 'delete', + 'delete_dir', + 'delete_user_file', + 'dir_exists', + 'exists', + 'experiment_dir_exists', + 'get_experiment_dir', + 'get_file', + 'get_file_metadata', + 'get_rel_experiment_dir', + 'is_input_file', + 'list_experiment_dir', + 'listdir', + 'move', + 'move_input_file', + 'open_file', + 'save', + 'save_input_file', + 'update_data_product_content', + 'update_file_content', + 'user_file_exists' +] diff --git a/airavata_django_portal_sdk/user_storage/api.py b/airavata_django_portal_sdk/user_storage/api.py new file mode 100644 index 
0000000..4a21498 --- /dev/null +++ b/airavata_django_portal_sdk/user_storage/api.py @@ -0,0 +1,758 @@ +import cgi +import copy +import importlib +import io +import logging +import mimetypes +import os +import warnings +from http import HTTPStatus +from urllib.parse import quote, unquote, urlparse + +import requests +from airavata.model.data.replica.ttypes import ( + DataProductModel, + DataProductType, + DataReplicaLocationModel, + ReplicaLocationCategory, + ReplicaPersistentType +) +from django.conf import settings +from django.core.exceptions import ObjectDoesNotExist + +from ..util import convert_iso8601_to_datetime + +logger = logging.getLogger(__name__) + +TMP_INPUT_FILE_UPLOAD_DIR = "tmp" + + +def get_user_storage_provider(request, owner_username=None, storage_resource_id=None): + # TODO: default the module_class_name to MFT provider + module_class_name = None + options = {} + if storage_resource_id is None: + if not hasattr(settings, 'USER_STORAGES'): + # make this backward compatible with the older settings + module_class_name = 'airavata_django_portal_sdk.user_storage.backends.DjangoFileSystemProvider' + storage_resource_id = settings.GATEWAY_DATA_STORE_RESOURCE_ID + options = dict(directory=settings.GATEWAY_DATA_STORE_DIR) + logger.warning("Please add the USER_STORAGES setting. Using legacy GATEWAY_DATA_STORE_RESOURCE_ID and GATEWAY_DATA_STORE_DIR settings.") + else: + conf = settings.USER_STORAGES["default"] + module_class_name = conf['BACKEND'] + storage_resource_id = conf['STORAGE_RESOURCE_ID'] + options = conf.get('OPTIONS', {}) + else: + if not hasattr(settings, 'USER_STORAGES'): + # make this backward compatible with the older settings + module_class_name = 'airavata_django_portal_sdk.user_storage.backends.DjangoFileSystemProvider' + storage_resource_id = settings.GATEWAY_DATA_STORE_RESOURCE_ID + options = dict(directory=settings.GATEWAY_DATA_STORE_DIR) + logger.warning("Please add the USER_STORAGES setting. 
Using legacy GATEWAY_DATA_STORE_RESOURCE_ID and GATEWAY_DATA_STORE_DIR settings.") + else: + for conf in settings.USER_STORAGES: + if conf['STORAGE_RESOURCE_ID'] == storage_resource_id: + module_class_name = conf['BACKEND'] + options = conf.get('OPTIONS', {}) + break + module_name, class_name = module_class_name.rsplit(".", 1) + module = importlib.import_module(module_name) + BackendClass = getattr(module, class_name) + authz_token = request.authz_token + context = { + 'request': request, + 'owner_username': owner_username, + } + instance = BackendClass(authz_token, storage_resource_id, context=context, **options) + return instance + + +def save(request, path, file, name=None, content_type=None, storage_resource_id=None): + "Save file in path in the user's storage and return DataProduct." + if _is_remote_api(): + if name is None and hasattr(file, 'name'): + name = os.path.basename(file.name) + files = {'file': (name, file, content_type) + if content_type is not None else file, } + resp = _call_remote_api(request, + "/user-storage/~/{path}", + path_params={"path": path}, + method="post", + files=files) + data = resp.json() + product_uri = data['uploaded']['productUri'] + data_product = request.airavata_client.getDataProduct( + request.authz_token, product_uri) + return data_product + backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id) + storage_resource_id, resource_path = backend.save(path, file, name=name, content_type=content_type) + data_product = _save_data_product( + request, resource_path, storage_resource_id, name=name, content_type=content_type + ) + return data_product + + +def save_input_file(request, file, name=None, content_type=None, storage_resource_id=None): + """Save input file in staging area for input files.""" + if _is_remote_api(): + if name is None and hasattr(file, 'name'): + name = os.path.basename(file.name) + files = {'file': (name, file, content_type) + if content_type is not None else file, } + resp = 
_call_remote_api(request, + "/upload", + method="post", + files=files) + data = resp.json() + product_uri = data['data-product']['productUri'] + data_product = request.airavata_client.getDataProduct( + request.authz_token, product_uri) + return data_product + else: + backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id) + file_name = name if name is not None else os.path.basename(file.name) + storage_resource_id, resource_path = backend.save( + TMP_INPUT_FILE_UPLOAD_DIR, file, name=file_name) + data_product = _save_data_product( + request, resource_path, storage_resource_id, name=name, content_type=content_type + ) + return data_product + + +def copy_input_file(request, data_product=None, data_product_uri=None, storage_resource_id=None): + if data_product is None: + data_product = _get_data_product(request, data_product_uri) + source_storage_resource_id, source_resource_path = _get_replica_resource_id_and_filepath(data_product) + source_backend = get_user_storage_provider(request, + owner_username=data_product.ownerName, + storage_resource_id=source_storage_resource_id) + file = source_backend.open(source_resource_path) + name = data_product.productName + target_backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id) + storage_resource_id, full_path = target_backend.save(TMP_INPUT_FILE_UPLOAD_DIR, file, name=name) + return _save_copy_of_data_product(request, full_path, data_product, storage_resource_id) + + +def is_input_file(request, data_product=None, data_product_uri=None): + if data_product is None: + data_product = _get_data_product(request, data_product_uri) + if _is_remote_api(): + resp = _call_remote_api( + request, + "/data-products/", + params={'product-uri': data_product.productUri}) + data = resp.json() + return data['isInputFileUpload'] + # Check if file is one of user's files and in TMP_INPUT_FILE_UPLOAD_DIR + storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product) + 
backend = get_user_storage_provider(request, + owner_username=data_product.ownerName, + storage_resource_id=storage_resource_id) + if backend.exists(path): + directories, files = backend.get_metadata(path) + rel_path = files[0]['path'] + return os.path.dirname(rel_path) == TMP_INPUT_FILE_UPLOAD_DIR + else: + return False + + +def move(request, data_product=None, path=None, data_product_uri=None, storage_resource_id=None): + if data_product is None: + data_product = _get_data_product(request, data_product_uri) + source_storage_resource_id, source_path = _get_replica_resource_id_and_filepath(data_product) + source_backend = get_user_storage_provider(request, + owner_username=data_product.ownerName, + storage_resource_id=source_storage_resource_id) + file = source_backend.open(source_path) + file_name = data_product.productName + target_backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id) + storage_resource_id, full_path = target_backend.save(path, file, name=file_name) + data_product_copy = _save_copy_of_data_product(request, full_path, data_product, storage_resource_id) + # Remove the source file and data product metadata + source_backend.delete(source_path) + _delete_data_product(data_product.ownerName, source_path) + return data_product_copy + + +def move_input_file(request, data_product=None, path=None, data_product_uri=None, storage_resource_id=None): + warnings.warn("Use 'move' instead.", DeprecationWarning) + return move(request, data_product=data_product, path=path, data_product_uri=data_product_uri, storage_resource_id=storage_resource_id) + + +def open_file(request, data_product=None, data_product_uri=None): + """ + Return file object for replica if it exists in user storage. One of + `data_product` or `data_product_uri` is required. 
+ """ + if data_product is None: + data_product = _get_data_product(request, data_product_uri) + if _is_remote_api(): + resp = _call_remote_api( + request, + "/download", + params={'data-product-uri': data_product.productUri}) + file = io.BytesIO(resp.content) + disposition = resp.headers['Content-Disposition'] + disp_value, disp_params = cgi.parse_header(disposition) + # Give the file object a name just like a real opened file object + file.name = disp_params['filename'] + return file + else: + storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product) + backend = get_user_storage_provider(request, + owner_username=data_product.ownerName, + storage_resource_id=storage_resource_id) + return backend.open(path) + + +def exists(request, data_product=None, data_product_uri=None): + """ + Return True if replica for data_product exists in user storage. One of + `data_product` or `data_product_uri` is required. + """ + if data_product is None: + data_product = _get_data_product(request, data_product_uri) + if _is_remote_api(): + resp = _call_remote_api( + request, + "/data-products/", + params={'product-uri': data_product.productUri}) + data = resp.json() + return data['downloadURL'] is not None + else: + storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product) + backend = get_user_storage_provider(request, + owner_username=data_product.ownerName, + storage_resource_id=storage_resource_id) + return backend.exists(path) + + +def dir_exists(request, path, storage_resource_id=None): + "Return True if path exists in user's data store." 
+ if _is_remote_api(): + resp = _call_remote_api(request, + "/user-storage/~/{path}", + path_params={"path": path}, + raise_for_status=False) + if resp.status_code == HTTPStatus.NOT_FOUND: + return False + resp.raise_for_status() + return resp.json()['isDir'] + else: + backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id) + return backend.exists(path) + + +def user_file_exists(request, path, storage_resource_id=None): + """If file exists, return data product URI, else None.""" + if _is_remote_api(): + resp = _call_remote_api(request, + "/user-storage/~/{path}", + path_params={"path": path}, + raise_for_status=False) + if resp.status_code == HTTPStatus.NOT_FOUND or resp.json()['isDir']: + return None + resp.raise_for_status() + return resp.json()['files'][0]['dataProductURI'] + backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id) + if backend.exists(path) and backend.is_file(path): + _, files = backend.get_metadata(path) + full_path = files[0]['resource_path'] + data_product_uri = _get_data_product_uri(request, full_path, backend.resource_id) + return data_product_uri + else: + return None + + +def delete_dir(request, path, storage_resource_id=None): + """Delete path in user's data store, if it exists.""" + if _is_remote_api(): + resp = _call_remote_api(request, + "/user-storage/~/{path}", + path_params={"path": path}, + method="delete", + raise_for_status=False) + _raise_404(resp, f"File path does not exist {path}") + resp.raise_for_status() + return + backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id) + backend.delete(path) + + +def delete_user_file(request, path, storage_resource_id=None): + """Delete file in user's data store, if it exists.""" + if _is_remote_api(): + resp = _call_remote_api(request, + "/user-storage/~/{path}", + path_params={"path": path}, + method="delete", + raise_for_status=False) + _raise_404(resp, f"File path does not exist {path}") + 
resp.raise_for_status() + return + backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id) + backend.delete(path) + + +def update_file_content(request, path, fileContentText, storage_resource_id=None): + if _is_remote_api(): + _call_remote_api(request, + "/user-storage/~/{path}", + path_params={"path": path}, + method="put", + data={"fileContentText": fileContentText} + ) + return + else: + backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id) + file = io.StringIO(fileContentText) + backend.update(path, file) + + +def update_data_product_content(request, data_product=None, fileContentText="", data_product_uri=None): + if data_product is None: + data_product = _get_data_product(request, data_product_uri) + if _is_remote_api(): + _call_remote_api(request, + "/data-products/", + params={'product-uri': data_product.productUri}, + method="put", + data={"fileContentText": fileContentText}, + ) + return + storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product) + update_file_content(request, path, fileContentText, storage_resource_id=storage_resource_id) + + +def get_file_metadata(request, path, storage_resource_id=None): + if _is_remote_api(): + resp = _call_remote_api(request, + "/user-storage/~/{path}", + path_params={"path": path}, + raise_for_status=False + ) + _raise_404(resp, "User storage file path does not exist") + data = resp.json() + if data["isDir"]: + raise Exception("User storage path is a directory, not a file") + file = data['files'][0] + file['created_time'] = convert_iso8601_to_datetime(file['createdTime']) + file['mime_type'] = file['mimeType'] + file['data-product-uri'] = file['dataProductURI'] + return file + + backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id) + if backend.exists(path) and backend.is_file(path): + _, files = backend.get_metadata(path) + file = files[0] + data_product_uri = _get_data_product_uri(request, 
file['resource_path'], + storage_resource_id=backend.resource_id) + + data_product = request.airavata_client.getDataProduct( + request.authz_token, data_product_uri) + mime_type = None + if 'mime-type' in data_product.productMetadata: + mime_type = data_product.productMetadata['mime-type'] + file['data-product-uri'] = data_product_uri + file['mime_type'] = mime_type + return file + else: + raise ObjectDoesNotExist("File does not exist at that path.") + + +def get_file(request, path, storage_resource_id=None): + warnings.warn("Use 'get_file_metadata' instead.", DeprecationWarning) + return get_file_metadata(request, path, storage_resource_id) + + +def delete(request, data_product=None, data_product_uri=None): + """ + Delete replica for data product in this data store. One of `data_product` + or `data_product_uri` is required. + """ + if data_product is None: + data_product = _get_data_product(request, data_product_uri) + if _is_remote_api(): + _call_remote_api( + request, + "/delete-file", + params={'data-product-uri': data_product.productUri}, + method="delete") + return + else: + storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product) + backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id) + try: + backend.delete(path) + _delete_data_product(data_product.ownerName, path) + except Exception: + logger.exception( + "Unable to delete file {} for data product uri {}".format( + path, data_product.productUri + ) + ) + raise + + +def listdir(request, path, storage_resource_id=None): + """Return a tuple of two lists, one for directories, the second for files.""" + + if _is_remote_api(): + resp = _call_remote_api(request, + "/user-storage/~/{path}", + path_params={"path": path}, + ) + data = resp.json() + for directory in data['directories']: + # Convert JSON ISO8601 timestamp to datetime instance + directory['created_time'] = convert_iso8601_to_datetime( + directory['createdTime']) + for file in data['files']: + # 
Convert JSON ISO8601 timestamp to datetime instance + file['created_time'] = convert_iso8601_to_datetime( + file['createdTime']) + file['mime_type'] = file['mimeType'] + file['data-product-uri'] = file['dataProductURI'] + return data['directories'], data['files'] + + backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id) + directories, files = backend.get_metadata(path) + # for each file, lookup or register a data product and enrich the file + # metadata with data-product-uri and mime-type + for file in files: + data_product_uri = _get_data_product_uri(request, file['resource_path'], + storage_resource_id=backend.resource_id) + + data_product = request.airavata_client.getDataProduct( + request.authz_token, data_product_uri) + mime_type = None + if 'mime-type' in data_product.productMetadata: + mime_type = data_product.productMetadata['mime-type'] + file['data-product-uri'] = data_product_uri + file['mime_type'] = mime_type + return directories, files + + +def list_experiment_dir(request, experiment_id, path="", storage_resource_id=None): + """ + List files, directories in experiment data directory. Returns a tuple, + see `listdir`. 
+ """ + if _is_remote_api(): + resp = _call_remote_api(request, + "/experiment-storage/{experiment_id}/{path}", + path_params={"path": path, + "experiment_id": experiment_id}, + ) + data = resp.json() + for directory in data['directories']: + # Convert JSON ISO8601 timestamp to datetime instance + directory['created_time'] = convert_iso8601_to_datetime( + directory['createdTime']) + for file in data['files']: + # Convert JSON ISO8601 timestamp to datetime instance + file['created_time'] = convert_iso8601_to_datetime( + file['createdTime']) + file['mime_type'] = file['mimeType'] + file['data-product-uri'] = file['dataProductURI'] + return data['directories'], data['files'] + + experiment = request.airavata_client.getExperiment( + request.authz_token, experiment_id) + backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id) + exp_data_path = experiment.userConfigurationData.experimentDataDir + exp_data_path = os.path.join(exp_data_path, path) + # Implement username override with exp_owner + # exp_owner = experiment.userName + if backend.exists(exp_data_path): + directories, files = backend.get_metadata(exp_data_path) + # for each file, lookup or register a data product and enrich the file + # metadata with data-product-uri and mime-type + for file in files: + data_product_uri = _get_data_product_uri(request, file['resource_path'], + storage_resource_id=backend.resource_id) + + data_product = request.airavata_client.getDataProduct( + request.authz_token, data_product_uri) + mime_type = None + if 'mime-type' in data_product.productMetadata: + mime_type = data_product.productMetadata['mime-type'] + file['data-product-uri'] = data_product_uri + file['mime_type'] = mime_type + return directories, files + else: + raise ObjectDoesNotExist("Experiment data directory does not exist") + + +def experiment_dir_exists(request, experiment_id, path="", storage_resource_id=None): + + if _is_remote_api(): + resp = _call_remote_api(request, + 
"/experiment-storage/{experiment_id}/{path}", + path_params={"path": path, + "experiment_id": experiment_id}, + raise_for_status=False) + if resp.status_code == HTTPStatus.NOT_FOUND: + return False + resp.raise_for_status() + return resp.json()['isDir'] + + experiment = request.airavata_client.getExperiment( + request.authz_token, experiment_id) + exp_data_path = experiment.userConfigurationData.experimentDataDir + if exp_data_path is None: + return False + # Implement username overide with exp_owner + # exp_owner = experiment.userName + backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id) + return backend.exists(exp_data_path) + + +def get_experiment_dir(request, project_name=None, experiment_name=None, path=None, storage_resource_id=None): + warnings.warn("Use 'create_user_dir' instead.", DeprecationWarning) + storage_resource_id, resource_path = create_user_dir(request, + dir_names=[project_name, experiment_name], + create_unique=True, + storage_resource_id=storage_resource_id) + return resource_path + + +def create_user_dir(request, path="", dir_names=(), create_unique=False, storage_resource_id=None): + if _is_remote_api(): + logger.debug(f"path={path}") + _call_remote_api(request, + "/user-storage/~/{path}", + path_params={"path": path}, + method="post") + return + backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id) + # For backwards compatibility, manufacture the dir_names array as needed + if len(dir_names) == 0: + dir_names = [] + while not backend.exists(path): + path, dir_name = os.path.split(path) + if dir_name == '': + raise Exception("Could not find a base directory in which to create directories.") + dir_names.insert(0, dir_name) + storage_resource_id, resource_path = backend.create_dirs(path, dir_names=dir_names, create_unique=create_unique) + return storage_resource_id, resource_path + + +def get_rel_experiment_dir(request, experiment_id, storage_resource_id=None): + """Return 
def _get_data_product_uri(request, full_path, storage_resource_id, owner=None):
    """Return the data product URI for full_path, registering one if necessary.

    Looks up a UserFiles record for (owner, full_path); if none exists, a new
    data product is created, registered with the API server and recorded in
    the local database.
    """
    from airavata_django_portal_sdk import models
    if owner is None:
        owner = request.user.username
    user_file = models.UserFiles.objects.filter(
        username=owner, file_path=full_path)
    if user_file.exists():
        product_uri = user_file[0].file_dpu
    else:
        data_product = _save_data_product(
            request, full_path, storage_resource_id, owner=owner)
        product_uri = data_product.productUri
    return product_uri


def _get_data_product(request, data_product_uri):
    """Fetch the DataProductModel for the given URI from the API server."""
    return request.airavata_client.getDataProduct(
        request.authz_token, data_product_uri)


def _save_data_product(request, full_path, storage_resource_id, name=None,
                       content_type=None, owner=None):
    "Create, register and record in DB a data product for full_path."
    if owner is None:
        owner = request.user.username
    data_product = _create_data_product(
        owner, full_path, storage_resource_id, name=name,
        content_type=content_type
    )
    product_uri = _register_data_product(
        request, full_path, data_product, owner=owner)
    data_product.productUri = product_uri
    return data_product


def _register_data_product(request, full_path, data_product, owner=None):
    """Register data_product with the API server and record it in the DB."""
    if owner is None:
        owner = request.user.username
    product_uri = request.airavata_client.registerDataProduct(
        request.authz_token, data_product
    )
    from airavata_django_portal_sdk import models
    user_file_instance = models.UserFiles(
        username=owner,
        file_path=full_path,
        file_dpu=product_uri)
    user_file_instance.save()
    return product_uri


def _save_copy_of_data_product(request, full_path, data_product, storage_resource_id):
    """Save copy of a data product with a different path."""
    data_product_copy = _copy_data_product(
        request, data_product, full_path, storage_resource_id)
    product_uri = _register_data_product(request, full_path, data_product_copy)
    data_product_copy.productUri = product_uri
    return data_product_copy


def _copy_data_product(request, data_product, full_path, storage_resource_id):
    """Create an unsaved copy of a data product with different path.

    The copy is owned by the requesting user and carries a single replica
    location pointing at full_path on storage_resource_id.
    """
    data_product_copy = copy.copy(data_product)
    data_product_copy.productUri = None
    data_product_copy.ownerName = request.user.username
    data_replica_location = _create_replica_location(
        full_path, data_product_copy.productName, storage_resource_id
    )
    data_product_copy.replicaLocations = [data_replica_location]
    return data_product_copy


def _delete_data_product(username, full_path):
    """Delete the local DB record for the (username, full_path) data product."""
    # TODO: call API to delete data product from replica catalog when it is
    # available (not currently implemented)
    from airavata_django_portal_sdk import models
    user_file = models.UserFiles.objects.filter(
        username=username, file_path=full_path)
    if user_file.exists():
        user_file.delete()


def _create_data_product(username, full_path, storage_resource_id, name=None,
                         content_type=None):
    """Build (but do not register) a DataProductModel for full_path."""
    data_product = DataProductModel()
    data_product.gatewayId = settings.GATEWAY_ID
    data_product.ownerName = username
    if name is not None:
        file_name = name
    else:
        file_name = os.path.basename(full_path)
    data_product.productName = file_name
    data_product.dataProductType = DataProductType.FILE
    final_content_type = _determine_content_type(full_path, content_type)
    if final_content_type is not None:
        data_product.productMetadata = {"mime-type": final_content_type}
    data_replica_location = _create_replica_location(
        full_path, file_name, storage_resource_id)
    data_product.replicaLocations = [data_replica_location]
    return data_product


def _determine_content_type(full_path, content_type=None):
    """Return content_type, guessing from the file when not provided.

    Falls back to "text/plain" when no specific type could be guessed but the
    file's first 1KB decodes as Unicode text.
    """
    result = content_type
    if result is None:
        # Try to guess the content-type from file extension
        guessed_type, encoding = mimetypes.guess_type(full_path)
        result = guessed_type
    if result is None or result == "application/octet-stream":
        # Check if file is Unicode text by trying to read some of it
        try:
            # BUG FIX: use a context manager so the file handle is closed
            # (the original leaked the handle from open().read())
            with open(full_path, "r") as f:
                f.read(1024)
            result = "text/plain"
        except UnicodeDecodeError:
            logger.debug(f"Failed to read as Unicode text: {full_path}")
    return result


def _create_replica_location(full_path, file_name, storage_resource_id):
    """Build a gateway-data-store replica location for full_path."""
    data_replica_location = DataReplicaLocationModel()
    data_replica_location.storageResourceId = storage_resource_id
    data_replica_location.replicaName = "{} gateway data store copy".format(
        file_name)
    data_replica_location.replicaLocationCategory = (
        ReplicaLocationCategory.GATEWAY_DATA_STORE
    )
    data_replica_location.replicaPersistentType = ReplicaPersistentType.TRANSIENT
    # The path is URL-quoted since it is stored as a URI
    data_replica_location.filePath = quote(full_path)
    return data_replica_location


def _get_replica_filepath(data_product):
    """Return the unquoted file path of the gateway-data-store replica, or None."""
    replica_filepaths = [
        rep.filePath
        for rep in data_product.replicaLocations
        if rep.replicaLocationCategory == ReplicaLocationCategory.GATEWAY_DATA_STORE
    ]
    replica_filepath = replica_filepaths[0] if len(
        replica_filepaths) > 0 else None
    if replica_filepath:
        return unquote(urlparse(replica_filepath).path)
    return None


def _get_replica_location(data_product,
                          category=ReplicaLocationCategory.GATEWAY_DATA_STORE):
    """Return the first replica location of the given category, or None.

    BUG FIX: the category parameter was previously accepted but ignored (the
    filter was hard-coded to GATEWAY_DATA_STORE); the default value preserves
    the old behavior for existing callers.
    """
    replica_locations = [
        rep
        for rep in data_product.replicaLocations
        if rep.replicaLocationCategory == category
    ]
    return replica_locations[0] if len(replica_locations) > 0 else None


def _get_replica_resource_id_and_filepath(data_product):
    """Return (storageResourceId, unquoted file path) of the replica, or None."""
    replica_location = _get_replica_location(data_product)
    if replica_location is not None:
        return (replica_location.storageResourceId,
                unquote(urlparse(replica_location.filePath).path))
    else:
        return None


def _is_remote_api():
    """True when this portal proxies storage calls to a remote portal API."""
    return getattr(settings, 'GATEWAY_DATA_STORE_REMOTE_API', None) is not None


def _call_remote_api(
        request,
        path,
        path_params=None,
        method="get",
        raise_for_status=True,
        **kwargs):
    """Call the remote Django portal REST API with the user's access token.

    path may contain {placeholders} which are filled (URL-quoted) from
    path_params. Extra kwargs are passed through to requests.request.
    """
    headers = {
        'Authorization': f'Bearer {request.authz_token.accessToken}'}
    encoded_path_params = {}
    if path_params is not None:
        for pk, pv in path_params.items():
            encoded_path_params[pk] = quote(pv)
    encoded_path = path.format(**encoded_path_params)
    logger.debug(f"encoded_path={encoded_path}")
    r = requests.request(
        method,
        f'{settings.GATEWAY_DATA_STORE_REMOTE_API}{encoded_path}',
        headers=headers,
        **kwargs,
    )
    if raise_for_status:
        r.raise_for_status()
    return r


def _raise_404(response, msg, exception_class=ObjectDoesNotExist):
    """Raise exception_class(msg) when response is an HTTP 404."""
    if response.status_code == 404:
        raise exception_class(msg)
class UserStorageProvider:
    """Abstract base class for user storage backends.

    Subclasses implement storage operations against a particular storage
    resource; resource_path arguments are relative to the user's storage
    root.
    """

    def __init__(self, authz_token, resource_id, context=None, **kwargs):
        self.authz_token = authz_token
        self.resource_id = resource_id
        # TODO probably don't need context for passing 'request'
        self.context = context

    # TODO remove content_type
    def save(self, resource_path, file, name=None, content_type=None):
        """
        Return a tuple of storage resource id and path, if any, to file.
        """
        raise NotImplementedError()

    def get_upload_url(self, resource_path):
        raise NotImplementedError()

    def open(self, resource_path):
        raise NotImplementedError()

    def get_download_url(self, resource_path):
        raise NotImplementedError()

    def exists(self, resource_path):
        raise NotImplementedError()

    def is_file(self, resource_path):
        # TODO: is this needed if we have get_metadata?
        raise NotImplementedError()

    def is_dir(self, resource_path):
        # TODO: is this needed if we have get_metadata?
        raise NotImplementedError()

    def get_metadata(self, resource_path):
        """
        Return a tuple of two sequences: directories and files for the given
        resource_path. If the resource_path represents a file, then the
        directories sequence should be empty and the files sequence will only
        have the one file.
        """
        raise NotImplementedError()

    def delete(self, resource_path):
        raise NotImplementedError()

    def update(self, resource_path, file):
        raise NotImplementedError()

    def create_dirs(self, resource_path, dir_names=[], create_unique=False):
        """
        Create one or more named subdirectories inside the resource_path.
        resource_path must exist. dir_names will potentially be normalized as
        needed. The intermediate directories may already exist, but if the
        final directory already exists, this method will raise an Exception,
        unless create_unique is True, in which case the name will be modified
        until a unique directory name is found.
        """
        # NOTE: dir_names is never mutated, so the mutable default is safe
        raise NotImplementedError()

    @property
    def username(self):
        """Username extracted from the authorization token's claims."""
        return self.authz_token.claimsMap['userName']


logger = logging.getLogger(__name__)

# Subdirectory used for staging input file uploads; hidden from listings.
TMP_INPUT_FILE_UPLOAD_DIR = "tmp"


class DjangoFileSystemProvider(UserStorageProvider):
    """Stores user files on the local filesystem via Django's storage API.

    Each user gets a subdirectory (named after their username) under the
    configured base directory.
    """

    def __init__(self, authz_token, resource_id, context=None, directory=None,
                 storage_resource_id=None, **kwargs):
        super().__init__(authz_token, resource_id, context=context, **kwargs)
        self.directory = directory
        # BUG FIX: the storage_resource_id keyword argument was previously
        # accepted but ignored (resource_id was always assigned); honor it,
        # falling back to resource_id for backward compatibility.
        self.storage_resource_id = (
            storage_resource_id if storage_resource_id is not None
            else resource_id)

    def save(self, path, file, name=None, content_type=None):
        """Save file under path; return (storage resource id, full path)."""
        full_path = self.datastore.save(path, file, name=name)
        return self.storage_resource_id, full_path

    def get_upload_url(self, resource_path):
        # TODO: implement
        return super().get_upload_url(resource_path)

    def open(self, resource_path):
        """Open resource_path and return a file object."""
        return self.datastore.open(resource_path)

    def get_download_url(self, resource_path):
        # TODO: implement
        return super().get_download_url(resource_path)

    def exists(self, resource_path):
        return self.datastore.exists(resource_path)

    def is_file(self, resource_path):
        return self.datastore.file_exists(resource_path)

    def is_dir(self, resource_path):
        return self.datastore.dir_exists(resource_path)

    def get_metadata(self, resource_path):
        """Return (directories, files) metadata dicts for resource_path.

        Raises ObjectDoesNotExist when resource_path does not exist.
        """
        # TODO: also return an isDir boolean flag?
        datastore = self.datastore
        if datastore.dir_exists(resource_path):
            directories, files = datastore.list_user_dir(
                resource_path)
            directories_data = []
            for d in directories:
                dpath = os.path.join(resource_path, d)
                created_time = datastore.get_created_time(dpath)
                size = datastore.size(dpath)
                directories_data.append(
                    {
                        "name": d,
                        "path": datastore.rel_path(dpath),
                        "created_time": created_time,
                        "size": size,
                        # Hide the staging dir for input file uploads
                        "hidden": dpath == TMP_INPUT_FILE_UPLOAD_DIR,
                    }
                )
            files_data = []
            for f in files:
                user_rel_path = os.path.join(resource_path, f)
                if not datastore.exists(user_rel_path):
                    logger.warning(f"listdir skipping {user_rel_path}, "
                                   "does not exist (broken symlink?)")
                    continue
                created_time = datastore.get_created_time(user_rel_path)
                size = datastore.size(user_rel_path)
                full_path = datastore.path(user_rel_path)
                files_data.append(
                    {
                        "name": f,
                        "path": datastore.rel_path(full_path),
                        "resource_path": full_path,
                        "created_time": created_time,
                        "size": size,
                        "hidden": False,
                    }
                )
            return directories_data, files_data
        elif datastore.exists(resource_path):
            # A single file: empty directories sequence, one file entry
            created_time = datastore.get_created_time(resource_path)
            size = datastore.size(resource_path)
            full_path = datastore.path(resource_path)
            return [], [
                {
                    "name": os.path.basename(resource_path),
                    "path": datastore.rel_path(full_path),
                    "resource_path": full_path,
                    "created_time": created_time,
                    "size": size,
                    "hidden": False,
                }
            ]
        else:
            raise ObjectDoesNotExist(
                f"User storage path does not exist {resource_path}")

    def delete(self, resource_path):
        """Delete the file or directory at resource_path."""
        if self.datastore.file_exists(resource_path):
            self.datastore.delete(resource_path)
        elif self.datastore.dir_exists(resource_path):
            self.datastore.delete_dir(resource_path)
        else:
            raise ObjectDoesNotExist(
                f"User resource_path does not exist {resource_path}")

    def update(self, resource_path, file):
        """Overwrite resource_path with the contents of file.

        NOTE(review): writes in text mode, so this assumes file.read()
        yields str — confirm callers never pass binary uploads.
        """
        full_path = self.datastore.path(resource_path)
        with open(full_path, 'w') as f:
            f.write(file.read())

    def create_dirs(self, resource_path, dir_names=[], create_unique=False):
        """Create named subdirectories under resource_path (see base class)."""
        datastore = self.datastore
        if not datastore.exists(resource_path):
            raise ObjectDoesNotExist(
                f"User resource_path does not exist {resource_path}")
        valid_dir_names = [datastore.get_valid_name(n) for n in dir_names]
        final_path = os.path.join(resource_path, *valid_dir_names)
        if datastore.exists(final_path) and not create_unique:
            raise Exception(f"Directory {final_path} already exists")
        # Make sure path is unique if it already exists
        final_path = datastore.get_available_name(final_path)
        datastore.create_user_dir(final_path)
        return self.storage_resource_id, final_path

    @property
    def datastore(self):
        """Datastore rooted at the owner's (or current user's) directory."""
        directory = os.path.join(self.directory, self.username)
        # BUG FIX: context defaults to None in the base class; guard the
        # lookup so the property doesn't raise AttributeError.
        owner_username = (self.context or {}).get('owner_username')
        # When the current user isn't the owner, use the owner's directory
        if owner_username:
            directory = os.path.join(self.directory, owner_username)
        return _Datastore(directory=directory)


class _Datastore:
    """Internal wrapper around a FileSystemStorage rooted at one user's dir."""

    def __init__(self, directory=None):
        self.directory = directory
        self.storage = self._user_data_storage(self.directory)

    def exists(self, path):
        """Check if path exists in this data store."""
        try:
            return self.storage.exists(path)
        except SuspiciousFileOperation as e:
            # Path escapes the storage root; treat as non-existent
            logger.warning(f"Invalid path: {e}")
            return False

    def file_exists(self, path):
        """Check if file path exists in this data store."""
        try:
            return self.storage.exists(path) and os.path.isfile(self.path(path))
        except SuspiciousFileOperation as e:
            logger.warning(f"Invalid path: {e}")
            return False

    def dir_exists(self, path):
        """Check if directory path exists in this data store."""
        logger.debug(f"dir_exists: {path}, {self.path(path)}")
        try:
            return self.storage.exists(path) and os.path.isdir(self.path(path))
        except SuspiciousFileOperation as e:
            logger.warning(f"Invalid path: {e}")
            return False

    def open(self, path):
        """Open path for user if it exists in this data store."""
        if self.exists(path):
            return self.storage.open(path)
        else:
            raise ObjectDoesNotExist(
                "File path does not exist: {}".format(path))

    def save(self, path, file, name=None):
        """Save file under path; return the saved file's absolute path."""
        # file.name may be full path, so get just the name of the file
        file_name = name if name is not None else os.path.basename(file.name)
        user_data_storage = self.storage
        file_path = os.path.join(
            path, user_data_storage.get_valid_name(file_name))
        input_file_name = user_data_storage.save(file_path, file)
        input_file_fullpath = user_data_storage.path(input_file_name)
        return input_file_fullpath

    def create_user_dir(self, path):
        """Create directory at path; raise if it already exists."""
        user_data_storage = self.storage
        if not user_data_storage.exists(path):
            self._makedirs(path)
        else:
            raise Exception("Directory {} already exists".format(path))

    def delete(self, path):
        """Delete file in this data store."""
        if self.file_exists(path):
            user_data_storage = self.storage
            user_data_storage.delete(path)
        else:
            raise ObjectDoesNotExist(
                "File path does not exist: {}".format(path))

    def delete_dir(self, path):
        """Delete entire directory in this data store."""
        if self.dir_exists(path):
            user_path = self.path(path)
            shutil.rmtree(user_path)
        else:
            raise ObjectDoesNotExist(
                "File path does not exist: {}".format(path))

    def get_experiment_dir(
        self, project_name=None, experiment_name=None, path=None
    ):
        """Return an experiment directory (full path) for the given experiment."""
        user_experiment_data_storage = self.storage
        if path is None:
            proj_dir_name = user_experiment_data_storage.get_valid_name(
                project_name)
            # AIRAVATA-3245 Make project directory with correct permissions
            if not user_experiment_data_storage.exists(proj_dir_name):
                self._makedirs(proj_dir_name)
            experiment_dir_name = os.path.join(
                proj_dir_name,
                user_experiment_data_storage.get_valid_name(experiment_name),
            )
            # Since there may already be another experiment with the same name
            # in this project, we need to check for available name
            experiment_dir_name = user_experiment_data_storage.get_available_name(
                experiment_dir_name)
            experiment_dir = user_experiment_data_storage.path(
                experiment_dir_name)
        else:
            # path can be relative to the user's storage space or absolute (as
            # long as it is still inside the user's storage space)
            # if path is passed in, assumption is that it has already been
            # created
            experiment_dir = user_experiment_data_storage.path(path)
            if not user_experiment_data_storage.exists(experiment_dir):
                self._makedirs(experiment_dir)
        return experiment_dir

    def get_valid_name(self, name):
        return self.storage.get_valid_name(name)

    def get_available_name(self, name):
        return self.storage.get_available_name(name)

    def _makedirs(self, dir_path):
        user_experiment_data_storage = self.storage
        full_path = user_experiment_data_storage.path(dir_path)
        # NOTE(review): directory_permissions_mode may be None when
        # FILE_UPLOAD_DIRECTORY_PERMISSIONS is not configured, which would
        # make these calls fail — confirm deployment settings.
        os.makedirs(
            full_path,
            mode=user_experiment_data_storage.directory_permissions_mode)
        # os.makedirs mode isn't always respected so need to chmod to be sure
        os.chmod(
            full_path,
            mode=user_experiment_data_storage.directory_permissions_mode)

    def list_user_dir(self, file_path):
        logger.debug("file_path={}".format(file_path))
        user_data_storage = self.storage
        return user_data_storage.listdir(file_path)

    def get_created_time(self, file_path):
        user_data_storage = self.storage
        return user_data_storage.get_created_time(file_path)

    def size(self, file_path):
        """Return size in bytes of a file, or total size of a directory tree."""
        user_data_storage = self.storage
        full_path = self.path(file_path)
        if os.path.isdir(full_path):
            return self._get_dir_size(full_path)
        else:
            return user_data_storage.size(file_path)

    def path(self, file_path):
        user_data_storage = self.storage
        return user_data_storage.path(file_path)

    def rel_path(self, file_path):
        """Return file_path relative to this datastore's root."""
        full_path = self.path(file_path)
        return os.path.relpath(full_path, self.path(""))

    def _user_data_storage(self, directory):
        return FileSystemStorage(location=directory)

    # from https://stackoverflow.com/a/1392549
    def _get_dir_size(self, start_path="."):
        total_size = 0
        for dirpath, dirnames, filenames in os.walk(start_path):
            for f in filenames:
                fp = os.path.join(dirpath, f)
                # Check for broken symlinks (.exists return False for broken
                # symlinks)
                if os.path.exists(fp):
                    total_size += os.path.getsize(fp)
        return total_size


class MFTUserStorageProvider(UserStorageProvider):
    """Placeholder for an Airavata MFT (gRPC) based storage provider.

    The MFT implementation is still work in progress; both methods below
    currently delegate to the base class, which raises NotImplementedError.
    The original file carried a large block of commented-out prototype code
    (MFT metadata/listing/HTTP-download gRPC calls); it has been removed here
    per commented-out-code hygiene — see version control history for the
    prototype when implementing these methods.
    """

    def exists(self, resource_path):
        # Not implemented yet: base class raises NotImplementedError.
        return super().exists(resource_path)

    def get_metadata(self, resource_path):
        # Not implemented yet: base class raises NotImplementedError.
        return super().get_metadata(resource_path)
