Repository: incubator-airflow
Updated Branches:
  refs/heads/master 27835f4b8 -> 803767959


[AIRFLOW-2184][AIRFLOW-2138] Google Cloud Storage allow wildcards

- closes #2184

- Add support for wildcards to be provided in
source object argument
This allows the user of the Operator to provide a
wildcard in the format accepted by the
documentation. This message is echoed in the
docstring for ease of use, and also because it is
only three sentences and adding a link is not
required.
- Add an argument move_object (bool) to the
operator that, when true, runs a mv command as
opposed to a cp command; that is, it moves an
object instead of copying it. This is
especially useful when this operator is used to
move objects within the same bucket, perhaps from
folder to folder.
- Add dotmodus and dannylee12 to companies using
airflow
We use airflow in almost all of our projects.

- Unit tests written for the 3 use cases of the
added operator.

Remove newline

Split too long line over 2 lines.

Closes #3067 from DannyLee12/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/80376795
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/80376795
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/80376795

Branch: refs/heads/master
Commit: 803767959e27ea2f8424fd5afa40f1bab9f15648
Parents: 27835f4
Author: Daniel Lee <dan...@dotmodus.com>
Authored: Fri Mar 9 22:21:17 2018 +0100
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Fri Mar 9 22:22:03 2018 +0100

----------------------------------------------------------------------
 README.md                                       |  1 +
 airflow/contrib/operators/gcs_to_gcs.py         | 70 ++++++++++++++---
 .../operators/test_gcs_to_gcs_operator.py       | 80 ++++++++++++++++++++
 3 files changed, 140 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/80376795/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index bc52bcc..dd0acd5 100644
--- a/README.md
+++ b/README.md
@@ -227,6 +227,7 @@ Currently **officially** using Airflow:
 1. [Zenly](https://zen.ly) [[@cerisier](https://github.com/cerisier) & 
[@jbdalido](https://github.com/jbdalido)]
 1. [Zymergen](https://www.zymergen.com/)
 1. [99](https://99taxis.com) [[@fbenevides](https://github.com/fbenevides), 
[@gustavoamigo](https://github.com/gustavoamigo) & 
[@mmmaia](https://github.com/mmmaia)]
+1. [dotmodus](http://dotmodus.com) [@dannylee12](https://github.com/dannylee12)
 
 ## Links
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/80376795/airflow/contrib/operators/gcs_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_gcs.py 
b/airflow/contrib/operators/gcs_to_gcs.py
index 1b340fe..8db3067 100644
--- a/airflow/contrib/operators/gcs_to_gcs.py
+++ b/airflow/contrib/operators/gcs_to_gcs.py
@@ -25,20 +25,37 @@ class 
GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator):
     :type source_bucket: string
     :param source_object: The source name of the object to copy in the Google 
cloud
         storage bucket.
+        If wildcards are used in this argument:
+            You can use only one wildcard for objects (filenames) within your
+            bucket. The wildcard can appear inside the object name or at the
+            end of the object name. Appending a wildcard to the bucket name is
+            unsupported.
     :type source_object: string
-    :param destination_bucket: The destination Google cloud storage bucket 
where the object should be.
+    :param destination_bucket: The destination Google cloud storage bucket
+    where the object should be.
     :type destination_bucket: string
-    :param destination_object: The destination name of the object in the 
destination Google cloud
+    :param destination_object: The destination name of the object in the
+    destination Google cloud
         storage bucket.
+        If a wildcard is supplied in the source_object argument, this is the
+        folder that the files will be
+        copied to in the destination bucket.
     :type destination_object: string
+    :param move_object: When move object is True, the object is moved instead
+    of copied to the new location.
+                        This is the equivalent of a mv command as opposed to a
+                        cp command.
+    :type move_object: bool
     :param google_cloud_storage_conn_id: The connection ID to use when
         connecting to Google cloud storage.
     :type google_cloud_storage_conn_id: string
     :param delegate_to: The account to impersonate, if any.
-        For this to work, the service account making the request must have 
domain-wide delegation enabled.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
     :type delegate_to: string
     """
-    template_fields = ('source_bucket', 'source_object', 'destination_bucket', 
'destination_object',)
+    template_fields = ('source_bucket', 'source_object', 'destination_bucket',
+                       'destination_object',)
     ui_color = '#f0eee4'
 
     @apply_defaults
@@ -47,22 +64,53 @@ class 
GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator):
                  source_object,
                  destination_bucket=None,
                  destination_object=None,
+                 move_object=False,
                  google_cloud_storage_conn_id='google_cloud_storage_default',
                  delegate_to=None,
                  *args,
                  **kwargs):
-        super(GoogleCloudStorageToGoogleCloudStorageOperator, 
self).__init__(*args, **kwargs)
+        super(GoogleCloudStorageToGoogleCloudStorageOperator, self).__init__(
+            *args, **kwargs)
         self.source_bucket = source_bucket
         self.source_object = source_object
         self.destination_bucket = destination_bucket
         self.destination_object = destination_object
+        self.move_object = move_object
         self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
         self.delegate_to = delegate_to
 
     def execute(self, context):
-        self.log.info('Executing copy: %s, %s, %s, %s', self.source_bucket, 
self.source_object,
-                      self.destination_bucket or self.source_bucket,
-                      self.destination_object or self.source_object)
-        hook = 
GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
-                                      delegate_to=self.delegate_to)
-        hook.copy(self.source_bucket, self.source_object, 
self.destination_bucket, self.destination_object)
+
+        hook = GoogleCloudStorageHook(
+            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+            delegate_to=self.delegate_to
+        )
+
+        if '*' in self.source_object:
+            wildcard_position = self.source_object.index('*')
+            objects = hook.list(self.source_bucket,
+                                prefix=self.source_object[:wildcard_position],
+                                delimiter=self.source_object[wildcard_position 
+ 1:])
+            for source_object in objects:
+                self.log.info('Executing copy of gs://{0}/{1} to '
+                              'gs://{2}/{3}/{1}'.format(self.source_bucket,
+                                                        source_object,
+                                                        
self.destination_bucket,
+                                                        
self.destination_object,
+                                                        source_object))
+                hook.copy(self.source_bucket, source_object,
+                          self.destination_bucket, 
"{}/{}".format(self.destination_object,
+                                                                  
source_object))
+                if self.move_object:
+                    hook.delete(self.source_bucket, source_object)
+
+        else:
+            self.log.info('Executing copy: %s, %s, %s, %s', self.source_bucket,
+                          self.source_object,
+                          self.destination_bucket or self.source_bucket,
+                          self.destination_object or self.source_object)
+            hook.copy(self.source_bucket, self.source_object,
+                      self.destination_bucket, self.destination_object)
+
+            if self.move_object:
+                hook.delete(self.source_bucket, self.source_object)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/80376795/tests/contrib/operators/test_gcs_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_gcs_to_gcs_operator.py 
b/tests/contrib/operators/test_gcs_to_gcs_operator.py
new file mode 100644
index 0000000..0415305
--- /dev/null
+++ b/tests/contrib/operators/test_gcs_to_gcs_operator.py
@@ -0,0 +1,80 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.contrib.operators.gcs_to_gcs import \
+    GoogleCloudStorageToGoogleCloudStorageOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+TASK_ID = 'test-gcs-to-gcs-operator'
+TEST_BUCKET = 'test-bucket'
+DELIMITER = '.csv'
+PREFIX = 'TEST'
+SOURCE_OBJECT_1 = '*test_object'
+SOURCE_OBJECT_2 = 'test_object*'
+SOURCE_OBJECT_3 = 'test*object'
+DESTINATION_BUCKET = 'archive'
+
+
+class GoogleCloudStorageToCloudStorageOperatorTest(unittest.TestCase):
+    """
+    Tests the three use-cases for the wildcard operator. These are
+    no_prefix: *test_object
+    no_suffix: test_object*
+    prefix_and_suffix: test*object
+    """
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
+    def test_execute_no_prefix(self, mock_hook):
+        operator = GoogleCloudStorageToGoogleCloudStorageOperator(
+            task_id=TASK_ID, source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_1,
+            destination_bucket=DESTINATION_BUCKET)
+
+        operator.execute(None)
+        mock_hook.return_value.list.assert_called_once_with(
+            TEST_BUCKET, prefix="", delimiter="test_object"
+        )
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
+    def test_execute_no_suffix(self, mock_hook):
+        operator = GoogleCloudStorageToGoogleCloudStorageOperator(
+            task_id=TASK_ID, source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_2,
+            destination_bucket=DESTINATION_BUCKET)
+
+        operator.execute(None)
+        mock_hook.return_value.list.assert_called_once_with(
+            TEST_BUCKET, prefix="test_object", delimiter=""
+        )
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
+    def test_execute_prefix_and_suffix(self, mock_hook):
+        operator = GoogleCloudStorageToGoogleCloudStorageOperator(
+            task_id=TASK_ID, source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_3,
+            destination_bucket=DESTINATION_BUCKET)
+
+        operator.execute(None)
+        mock_hook.return_value.list.assert_called_once_with(
+            TEST_BUCKET, prefix="test", delimiter="object"
+        )

Reply via email to