Repository: incubator-airflow Updated Branches: refs/heads/master 27835f4b8 -> 803767959
[AIRFLOW-2184][AIRFLOW-2138] Google Cloud Storage allow wildcards - closes #2184 - Add support for wildcards to be provided in source object argument. This allows the user of the Operator to provide a wildcard in the format accepted by the documentation. This message is echoed in the docstring for ease of use, and also because it is only three sentences and adding a link is not required. - Add an argument move_object (bool) to the operator that, when true, runs a mv command as opposed to a cp command. That is to say, it moves an object instead of copying the object. This is especially useful when this operator is used to move objects in the same bucket, perhaps from folder to folder. - Add dotmodus and dannylee12 to companies using airflow We use airflow in almost all of our projects. - Unit tests written for the 3 use cases of the added operator. Remove newline Split too long line over 2 lines. Closes #3067 from DannyLee12/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/80376795 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/80376795 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/80376795 Branch: refs/heads/master Commit: 803767959e27ea2f8424fd5afa40f1bab9f15648 Parents: 27835f4 Author: Daniel Lee <dan...@dotmodus.com> Authored: Fri Mar 9 22:21:17 2018 +0100 Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com> Committed: Fri Mar 9 22:22:03 2018 +0100 ---------------------------------------------------------------------- README.md | 1 + airflow/contrib/operators/gcs_to_gcs.py | 70 ++++++++++++++--- .../operators/test_gcs_to_gcs_operator.py | 80 ++++++++++++++++++++ 3 files changed, 140 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/80376795/README.md 
---------------------------------------------------------------------- diff --git a/README.md b/README.md index bc52bcc..dd0acd5 100644 --- a/README.md +++ b/README.md @@ -227,6 +227,7 @@ Currently **officially** using Airflow: 1. [Zenly](https://zen.ly) [[@cerisier](https://github.com/cerisier) & [@jbdalido](https://github.com/jbdalido)] 1. [Zymergen](https://www.zymergen.com/) 1. [99](https://99taxis.com) [[@fbenevides](https://github.com/fbenevides), [@gustavoamigo](https://github.com/gustavoamigo) & [@mmmaia](https://github.com/mmmaia)] +1. [dotmodus](http://dotmodus.com) [@dannylee12](https://github.com/dannylee12) ## Links http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/80376795/airflow/contrib/operators/gcs_to_gcs.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_to_gcs.py b/airflow/contrib/operators/gcs_to_gcs.py index 1b340fe..8db3067 100644 --- a/airflow/contrib/operators/gcs_to_gcs.py +++ b/airflow/contrib/operators/gcs_to_gcs.py @@ -25,20 +25,37 @@ class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator): :type source_bucket: string :param source_object: The source name of the object to copy in the Google cloud storage bucket. + If wildcards are used in this argument: + You can use only one wildcard for objects (filenames) within your + bucket. The wildcard can appear inside the object name or at the + end of the object name. Appending a wildcard to the bucket name is + unsupported. :type source_object: string - :param destination_bucket: The destination Google cloud storage bucket where the object should be. + :param destination_bucket: The destination Google cloud storage bucket + where the object should be. :type destination_bucket: string - :param destination_object: The destination name of the object in the destination Google cloud + :param destination_object: The destination name of the object in the + destination Google cloud storage bucket. 
+ If a wildcard is supplied in the source_object argument, this is the + folder that the files will be + copied to in the destination bucket. :type destination_object: string + :param move_object: When move object is True, the object is moved instead + of copied to the new location. + This is the equivalent of a mv command as opposed to a + cp command. + :type move_object: bool :param google_cloud_storage_conn_id: The connection ID to use when connecting to Google cloud storage. :type google_cloud_storage_conn_id: string :param delegate_to: The account to impersonate, if any. - For this to work, the service account making the request must have domain-wide delegation enabled. + For this to work, the service account making the request must have + domain-wide delegation enabled. :type delegate_to: string """ - template_fields = ('source_bucket', 'source_object', 'destination_bucket', 'destination_object',) + template_fields = ('source_bucket', 'source_object', 'destination_bucket', + 'destination_object',) ui_color = '#f0eee4' @apply_defaults @@ -47,22 +64,53 @@ class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator): source_object, destination_bucket=None, destination_object=None, + move_object=False, google_cloud_storage_conn_id='google_cloud_storage_default', delegate_to=None, *args, **kwargs): - super(GoogleCloudStorageToGoogleCloudStorageOperator, self).__init__(*args, **kwargs) + super(GoogleCloudStorageToGoogleCloudStorageOperator, self).__init__( + *args, **kwargs) self.source_bucket = source_bucket self.source_object = source_object self.destination_bucket = destination_bucket self.destination_object = destination_object + self.move_object = move_object self.google_cloud_storage_conn_id = google_cloud_storage_conn_id self.delegate_to = delegate_to def execute(self, context): - self.log.info('Executing copy: %s, %s, %s, %s', self.source_bucket, self.source_object, - self.destination_bucket or self.source_bucket, - self.destination_object or 
self.source_object) - hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id, - delegate_to=self.delegate_to) - hook.copy(self.source_bucket, self.source_object, self.destination_bucket, self.destination_object) + + hook = GoogleCloudStorageHook( + google_cloud_storage_conn_id=self.google_cloud_storage_conn_id, + delegate_to=self.delegate_to + ) + + if '*' in self.source_object: + wildcard_position = self.source_object.index('*') + objects = hook.list(self.source_bucket, + prefix=self.source_object[:wildcard_position], + delimiter=self.source_object[wildcard_position + 1:]) + for source_object in objects: + self.log.info('Executing copy of gs://{0}/{1} to ' + 'gs://{2}/{3}/{1}'.format(self.source_bucket, + source_object, + self.destination_bucket, + self.destination_object, + source_object)) + hook.copy(self.source_bucket, source_object, + self.destination_bucket, "{}/{}".format(self.destination_object, + source_object)) + if self.move_object: + hook.delete(self.source_bucket, source_object) + + else: + self.log.info('Executing copy: %s, %s, %s, %s', self.source_bucket, + self.source_object, + self.destination_bucket or self.source_bucket, + self.destination_object or self.source_object) + hook.copy(self.source_bucket, self.source_object, + self.destination_bucket, self.destination_object) + + if self.move_object: + hook.delete(self.source_bucket, self.source_object) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/80376795/tests/contrib/operators/test_gcs_to_gcs_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_gcs_to_gcs_operator.py b/tests/contrib/operators/test_gcs_to_gcs_operator.py new file mode 100644 index 0000000..0415305 --- /dev/null +++ b/tests/contrib/operators/test_gcs_to_gcs_operator.py @@ -0,0 +1,80 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file 
except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from airflow.contrib.operators.gcs_to_gcs import \ + GoogleCloudStorageToGoogleCloudStorageOperator + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + +TASK_ID = 'test-gcs-to-gcs-operator' +TEST_BUCKET = 'test-bucket' +DELIMITER = '.csv' +PREFIX = 'TEST' +SOURCE_OBJECT_1 = '*test_object' +SOURCE_OBJECT_2 = 'test_object*' +SOURCE_OBJECT_3 = 'test*object' +DESTINATION_BUCKET = 'archive' + + +class GoogleCloudStorageToCloudStorageOperatorTest(unittest.TestCase): + """ + Tests the three use-cases for the wildcard operator. 
These are + no_prefix: *test_object + no_suffix: test_object* + prefix_and_suffix: test*object + """ + + @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook') + def test_execute_no_prefix(self, mock_hook): + operator = GoogleCloudStorageToGoogleCloudStorageOperator( + task_id=TASK_ID, source_bucket=TEST_BUCKET, + source_object=SOURCE_OBJECT_1, + destination_bucket=DESTINATION_BUCKET) + + operator.execute(None) + mock_hook.return_value.list.assert_called_once_with( + TEST_BUCKET, prefix="", delimiter="test_object" + ) + + @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook') + def test_execute_no_suffix(self, mock_hook): + operator = GoogleCloudStorageToGoogleCloudStorageOperator( + task_id=TASK_ID, source_bucket=TEST_BUCKET, + source_object=SOURCE_OBJECT_2, + destination_bucket=DESTINATION_BUCKET) + + operator.execute(None) + mock_hook.return_value.list.assert_called_once_with( + TEST_BUCKET, prefix="test_object", delimiter="" + ) + + @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook') + def test_execute_prefix_and_suffix(self, mock_hook): + operator = GoogleCloudStorageToGoogleCloudStorageOperator( + task_id=TASK_ID, source_bucket=TEST_BUCKET, + source_object=SOURCE_OBJECT_3, + destination_bucket=DESTINATION_BUCKET) + + operator.execute(None) + mock_hook.return_value.list.assert_called_once_with( + TEST_BUCKET, prefix="test", delimiter="object" + )