XD-DENG commented on a change in pull request #3823: [AIRFLOW-2985] Operators for S3 object copying/deleting URL: https://github.com/apache/incubator-airflow/pull/3823#discussion_r215821211
########## File path: airflow/hooks/S3_hook.py ########## @@ -384,3 +384,89 @@ def load_bytes(self, client = self.get_conn() client.upload_fileobj(filelike_buffer, bucket_name, key, ExtraArgs=extra_args) + + def copy_object(self, + source_bucket_key, + dest_bucket_key, + source_bucket_name=None, + dest_bucket_name=None, + source_version_id=None): + """ + Creates a copy of an object that is already stored in S3. + + Note: the S3 connection used here needs to have access to both + source and destination bucket/key. + + :param source_bucket_key: The key of the source object. + + It can be either full s3:// style url or relative path from root level. + + When it's specified as a full s3:// url, please omit source_bucket_name. + :type source_bucket_key: str + :param dest_bucket_key: The key of the object to copy to. + + The convention to specify `dest_bucket_key` is the same + as `source_bucket_key`. + :type dest_bucket_key: str + :param source_bucket_name: Name of the S3 bucket where the source object is in. + + It should be omitted when `source_bucket_key` is provided as a full s3:// url. + :type source_bucket_name: str + :param dest_bucket_name: Name of the S3 bucket to where the object is copied. + + It should be omitted when `dest_bucket_key` is provided as a full s3:// url. + :type dest_bucket_name: str + :param source_version_id: Version ID of the source object (OPTIONAL) + :type source_version_id: str + """ + + if dest_bucket_name is None: + dest_bucket_name, dest_bucket_key = self.parse_s3_url(dest_bucket_key) + else: + parsed_url = urlparse(dest_bucket_key) + if parsed_url.scheme != '' or parsed_url.netloc != '': + raise AirflowException('If dest_bucket_name is provided, ' + + 'dest_bucket_key should be relative path ' + + 'from root level, rather than a full s3:// url') + + if source_bucket_name is None: + source_bucket_name, source_bucket_key = self.parse_s3_url(source_bucket_key) + else: + parsed_url = urlparse(source_bucket_key) + if parsed_url.scheme != '' or parsed_url.netloc != '': + raise AirflowException('If source_bucket_name is provided, ' + + 'source_bucket_key should be relative path ' + + 'from root level, rather than a full s3:// url') + + CopySource = {'Bucket': source_bucket_name, + 'Key': source_bucket_key, + 'VersionId': source_version_id} + response = self.get_conn().copy_object(Bucket=dest_bucket_name, + Key=dest_bucket_key, + CopySource=CopySource) + return response Review comment: For both `copy_object()` and `delete_objects()`, I'm returning the response from boto3/S3 API directly. This can give users the flexibility if they want to dive deeper into the details in the response. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services