This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3a8da4b55b Allow a destination folder to be provided (#31885)
3a8da4b55b is described below
commit 3a8da4b55b363b5b74d78b7d481dc7544afd36cb
Author: Pat Leamon <[email protected]>
AuthorDate: Tue Jul 11 19:00:03 2023 +1000
Allow a destination folder to be provided (#31885)
* Allow a destination folder to be provided
With the move from `delegate_to` to `impersonation_chain` I'm no longer
able to impersonate a user, only a service account. Google drive files created
by a service account aren't visible to anyone else by default so having them
placed under an already shared folder makes it easier for people to see them.
* delegate_to has been removed
* Add an example of how this works with other parameters
* Add example usage for supplying destination folder ID
---
.../google/suite/transfers/gcs_to_gdrive.py | 17 ++++--
.../operators/transfer/gcs_to_gdrive.rst | 11 ++++
.../google/suite/transfers/test_gcs_to_gdrive.py | 71 +++++++++++++++++++---
.../google/cloud/gcs/example_gcs_to_gdrive.py | 11 ++++
4 files changed, 99 insertions(+), 11 deletions(-)
diff --git a/airflow/providers/google/suite/transfers/gcs_to_gdrive.py
b/airflow/providers/google/suite/transfers/gcs_to_gdrive.py
index 9220b6b3b5..0915906b1c 100644
--- a/airflow/providers/google/suite/transfers/gcs_to_gdrive.py
+++ b/airflow/providers/google/suite/transfers/gcs_to_gdrive.py
@@ -62,12 +62,15 @@ class GCSToGoogleDriveOperator(BaseOperator):
For example, with prefix ``foo/*`` and destination_object ``blah/``,
the file ``foo/baz`` will be
copied to ``blah/baz``; to retain the prefix write the
destination_object as e.g. ``blah/foo``, in
which case the copied file will be named ``blah/foo/baz``.
+ :param destination_folder_id: The folder ID where the destination objects
will be placed. It is
+ an additive prefix for anything specified in destination_object.
+ For example if folder ID ``xXyYzZ`` is called ``foo`` and the
destination is ``bar/baz``, the file
+ will end up in `foo/bar/baz`.
+ This can be used to target an existing folder that is already visible
to other users. The credentials
+ provided must have access to this folder.
:param move_object: When move object is True, the object is moved instead
of copied to the new location.
This is the equivalent of a mv command as opposed to a cp command.
:param gcp_conn_id: (Optional) The connection ID used to connect to Google
Cloud.
- :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
- if any. For this to work, the service account making the request must
have
- domain-wide delegation enabled.
:param impersonation_chain: Optional service account to impersonate using
short-term
credentials, or chained list of accounts required to get the
access_token
of the last account in the list, which will be impersonated in the
request.
@@ -92,6 +95,7 @@ class GCSToGoogleDriveOperator(BaseOperator):
source_bucket: str,
source_object: str,
destination_object: str | None = None,
+ destination_folder_id: str | None = None,
move_object: bool = False,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
@@ -102,6 +106,7 @@ class GCSToGoogleDriveOperator(BaseOperator):
self.source_bucket = source_bucket
self.source_object = source_object
self.destination_object = destination_object
+ self.destination_folder_id = destination_folder_id
self.move_object = move_object
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
@@ -161,7 +166,11 @@ class GCSToGoogleDriveOperator(BaseOperator):
self.gcs_hook.download(
bucket_name=self.source_bucket, object_name=source_object,
filename=filename
)
- self.gdrive_hook.upload_file(local_location=filename,
remote_location=destination_object)
+ self.gdrive_hook.upload_file(
+ local_location=filename,
+ remote_location=destination_object,
+ folder_id=self.destination_folder_id,
+ )
if self.move_object:
self.gcs_hook.delete(self.source_bucket, source_object)
diff --git
a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gdrive.rst
b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gdrive.rst
index 9ed1cd58a8..0d59a22915 100644
--- a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gdrive.rst
+++ b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gdrive.rst
@@ -53,6 +53,17 @@ The following Operator would copy a single file.
:start-after: [START howto_operator_gcs_to_gdrive_copy_single_file]
:end-before: [END howto_operator_gcs_to_gdrive_copy_single_file]
+Copy into an existing folder
+----------------------------
+
+The following Operator would copy a single file into an existing folder with
the specified ID.
+
+.. exampleinclude::
/../../tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
+ :language: python
+ :dedent: 4
+ :start-after: [START
howto_operator_gcs_to_gdrive_copy_single_file_into_folder]
+ :end-before: [END
howto_operator_gcs_to_gdrive_copy_single_file_into_folder]
+
Copy multiple files
-------------------
diff --git a/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py
b/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py
index 525f20398a..47d1f6e038 100644
--- a/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py
+++ b/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py
@@ -64,7 +64,52 @@ class TestGcsToGDriveOperator:
impersonation_chain=None,
),
mock.call().upload_file(
- local_location="TMP1",
remote_location="copied_sales/2017/january-backup.avro"
+ local_location="TMP1",
+ remote_location="copied_sales/2017/january-backup.avro",
+ folder_id=None,
+ ),
+ ]
+ )
+
+ @mock.patch(MODULE + ".GCSHook")
+ @mock.patch(MODULE + ".GoogleDriveHook")
+ @mock.patch(MODULE + ".tempfile.NamedTemporaryFile")
+ def test_should_copy_single_file_with_folder(self,
mock_named_temporary_file, mock_gdrive, mock_gcs_hook):
+
type(mock_named_temporary_file.return_value.__enter__.return_value).name =
mock.PropertyMock(
+ side_effect=["TMP1"]
+ )
+ task = GCSToGoogleDriveOperator(
+ task_id="copy_single_file",
+ source_bucket="data",
+ source_object="sales/sales-2017/january.avro",
+ destination_object="copied_sales/2017/january-backup.avro",
+ destination_folder_id="aAopls6bE4tUllZVGJvRUU",
+ )
+
+ task.execute(mock.MagicMock())
+
+ mock_gcs_hook.assert_has_calls(
+ [
+ mock.call(
+ gcp_conn_id="google_cloud_default",
+ impersonation_chain=None,
+ ),
+ mock.call().download(
+ bucket_name="data", filename="TMP1",
object_name="sales/sales-2017/january.avro"
+ ),
+ ]
+ )
+
+ mock_gdrive.assert_has_calls(
+ [
+ mock.call(
+ gcp_conn_id="google_cloud_default",
+ impersonation_chain=None,
+ ),
+ mock.call().upload_file(
+ local_location="TMP1",
+ remote_location="copied_sales/2017/january-backup.avro",
+ folder_id="aAopls6bE4tUllZVGJvRUU",
),
]
)
@@ -110,9 +155,15 @@ class TestGcsToGDriveOperator:
gcp_conn_id="google_cloud_default",
impersonation_chain=IMPERSONATION_CHAIN,
),
- mock.call().upload_file(local_location="TMP1",
remote_location="sales/A.avro"),
- mock.call().upload_file(local_location="TMP2",
remote_location="sales/B.avro"),
- mock.call().upload_file(local_location="TMP3",
remote_location="sales/C.avro"),
+ mock.call().upload_file(
+ local_location="TMP1", remote_location="sales/A.avro",
folder_id=None
+ ),
+ mock.call().upload_file(
+ local_location="TMP2", remote_location="sales/B.avro",
folder_id=None
+ ),
+ mock.call().upload_file(
+ local_location="TMP3", remote_location="sales/C.avro",
folder_id=None
+ ),
]
)
@@ -158,9 +209,15 @@ class TestGcsToGDriveOperator:
gcp_conn_id="google_cloud_default",
impersonation_chain=IMPERSONATION_CHAIN,
),
- mock.call().upload_file(local_location="TMP1",
remote_location="sales/A.avro"),
- mock.call().upload_file(local_location="TMP2",
remote_location="sales/B.avro"),
- mock.call().upload_file(local_location="TMP3",
remote_location="sales/C.avro"),
+ mock.call().upload_file(
+ local_location="TMP1", remote_location="sales/A.avro",
folder_id=None
+ ),
+ mock.call().upload_file(
+ local_location="TMP2", remote_location="sales/B.avro",
folder_id=None
+ ),
+ mock.call().upload_file(
+ local_location="TMP3", remote_location="sales/C.avro",
folder_id=None
+ ),
]
)
diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
b/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
index e9b1c3a041..c3371a2c3c 100644
--- a/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
+++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
@@ -35,6 +35,7 @@ from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+FOLDER_ID = os.environ.get("GCP_GDRIVE_FOLDER_ID", "abcd1234")
DAG_ID = "example_gcs_to_gdrive"
@@ -82,6 +83,16 @@ with models.DAG(
)
# [END howto_operator_gcs_to_gdrive_copy_single_file]
+ # [START howto_operator_gcs_to_gdrive_copy_single_file_into_folder]
+ copy_single_file_into_folder = GCSToGoogleDriveOperator(
+ task_id="copy_single_file_into_folder",
+ source_bucket=BUCKET_NAME,
+ source_object=f"{TMP_PATH}/{FILE_NAME}",
+ destination_object=f"copied_tmp/copied_{FILE_NAME}",
+ destination_folder_id=FOLDER_ID,
+ )
+ # [END howto_operator_gcs_to_gdrive_copy_single_file_into_folder]
+
# [START howto_operator_gcs_to_gdrive_copy_files]
copy_files = GCSToGoogleDriveOperator(
task_id="copy_files",