This is an automated email from the ASF dual-hosted git repository.

pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4dc2c40dfe Fix GCSToGoogleDriveOperator and gdrive system tests (#34545)
4dc2c40dfe is described below

commit 4dc2c40dfefb3495e435aabb99c07fa6e4f32c5d
Author: max <42827971+moiseen...@users.noreply.github.com>
AuthorDate: Thu Oct 12 18:53:50 2023 +0200

    Fix GCSToGoogleDriveOperator and gdrive system tests (#34545)
---
 .../google/suite/transfers/gcs_to_gdrive.py        |  2 +-
 .../google/suite/transfers/test_gcs_to_gdrive.py   | 14 ++--
 .../google/cloud/gcs/example_gcs_to_gdrive.py      | 93 ++++++++++++++++++----
 .../google/cloud/gcs/example_gdrive_to_gcs.py      | 74 ++++++++++++++---
 .../cloud/transfers/example_gdrive_to_local.py     | 78 +++++++++++++++---
 .../google/suite/example_local_to_drive.py         | 68 +++++++++++++++-
 6 files changed, 281 insertions(+), 48 deletions(-)

diff --git a/airflow/providers/google/suite/transfers/gcs_to_gdrive.py b/airflow/providers/google/suite/transfers/gcs_to_gdrive.py
index 0915906b1c..99f9ccafc5 100644
--- a/airflow/providers/google/suite/transfers/gcs_to_gdrive.py
+++ b/airflow/providers/google/suite/transfers/gcs_to_gdrive.py
@@ -95,7 +95,7 @@ class GCSToGoogleDriveOperator(BaseOperator):
         source_bucket: str,
         source_object: str,
         destination_object: str | None = None,
-        destination_folder_id: str | None = None,
+        destination_folder_id: str = "root",
         move_object: bool = False,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
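
Note on the hunk above: the only production change in this commit is the default, so destination_folder_id now falls back to "root" (the Drive API alias for the authenticated account's My Drive) instead of None. A minimal usage sketch, assuming only the parameters shown in this hunk; the bucket and object names below are hypothetical:

    copy_single_file = GCSToGoogleDriveOperator(
        task_id="copy_single_file",
        source_bucket="my-example-bucket",                    # hypothetical bucket
        source_object="tmp/example_upload.txt",               # hypothetical object
        destination_object="copied_tmp/example_upload.txt",
        # destination_folder_id is omitted, so it defaults to "root" and the
        # copy lands in the Drive root; pass a folder ID to target a folder.
    )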
diff --git a/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py b/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py
index 47d1f6e038..8e97be5b0e 100644
--- a/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py
+++ b/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py
@@ -66,7 +66,7 @@ class TestGcsToGDriveOperator:
                 mock.call().upload_file(
                     local_location="TMP1",
                     remote_location="copied_sales/2017/january-backup.avro",
-                    folder_id=None,
+                    folder_id="root",
                 ),
             ]
         )
@@ -156,13 +156,13 @@ class TestGcsToGDriveOperator:
                     impersonation_chain=IMPERSONATION_CHAIN,
                 ),
                 mock.call().upload_file(
-                    local_location="TMP1", remote_location="sales/A.avro", folder_id=None
+                    local_location="TMP1", remote_location="sales/A.avro", folder_id="root"
                 ),
                 mock.call().upload_file(
-                    local_location="TMP2", remote_location="sales/B.avro", folder_id=None
+                    local_location="TMP2", remote_location="sales/B.avro", folder_id="root"
                 ),
                 mock.call().upload_file(
-                    local_location="TMP3", remote_location="sales/C.avro", folder_id=None
+                    local_location="TMP3", remote_location="sales/C.avro", folder_id="root"
                 ),
             ]
         )
@@ -210,13 +210,13 @@ class TestGcsToGDriveOperator:
                     impersonation_chain=IMPERSONATION_CHAIN,
                 ),
                 mock.call().upload_file(
-                    local_location="TMP1", remote_location="sales/A.avro", folder_id=None
+                    local_location="TMP1", remote_location="sales/A.avro", folder_id="root"
                 ),
                 mock.call().upload_file(
-                    local_location="TMP2", remote_location="sales/B.avro", folder_id=None
+                    local_location="TMP2", remote_location="sales/B.avro", folder_id="root"
                 ),
                 mock.call().upload_file(
-                    local_location="TMP3", remote_location="sales/C.avro", folder_id=None
+                    local_location="TMP3", remote_location="sales/C.avro", folder_id="root"
                 ),
             ]
         )
diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py b/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
index e9cd16d8f9..541964e82d 100644
--- a/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
+++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
@@ -23,53 +23,86 @@ https://www.googleapis.com/auth/drive
 """
 from __future__ import annotations
 
+import json
+import logging
 import os
 from datetime import datetime
 from pathlib import Path
 
+from airflow.decorators import task
+from airflow.models import Connection
 from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
 from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator
+from airflow.settings import Session
 from airflow.utils.trigger_rule import TriggerRule
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-FOLDER_ID = os.environ.get("GCP_GDRIVE_FOLDER_ID", "abcd1234")
+FOLDER_ID = os.environ.get("GCP_GDRIVE_FOLDER_ID", None)
 
 DAG_ID = "example_gcs_to_gdrive"
 
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
 
 TMP_PATH = "tmp"
-
+WORK_DIR = f"folder_{DAG_ID}_{ENV_ID}".replace("-", "_")
 CURRENT_FOLDER = Path(__file__).parent
 LOCAL_PATH = str(Path(CURRENT_FOLDER) / "resources")
-
 FILE_LOCAL_PATH = str(Path(LOCAL_PATH))
 FILE_NAME = "example_upload.txt"
 
+log = logging.getLogger(__name__)
+
 
 with DAG(
     DAG_ID,
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "gcs"],
+    tags=["example", "gcs", "gdrive"],
 ) as dag:
+
+    @task
+    def create_temp_gcp_connection():
+        conn = Connection(
+            conn_id=CONNECTION_ID,
+            conn_type="google_cloud_platform",
+        )
+        conn_extra_json = json.dumps(
+            {
+                "scope": "https://www.googleapis.com/auth/drive,"
+                "https://www.googleapis.com/auth/cloud-platform"
+            }
+        )
+        conn.set_extra(conn_extra_json)
+
+        session: Session = Session()
+        if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
+            log.warning("Connection %s already exists", CONNECTION_ID)
+            return None
+        session.add(conn)
+        session.commit()
+
+    create_temp_gcp_connection_task = create_temp_gcp_connection()
+
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
     )
 
-    upload_file = LocalFilesystemToGCSOperator(
-        task_id="upload_file",
+    upload_file_1 = LocalFilesystemToGCSOperator(
+        task_id="upload_file_1",
         src=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
         dst=f"{TMP_PATH}/{FILE_NAME}",
         bucket=BUCKET_NAME,
     )
 
     upload_file_2 = LocalFilesystemToGCSOperator(
-        task_id="upload_fil_2",
+        task_id="upload_file_2",
         src=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
         dst=f"{TMP_PATH}/2_{FILE_NAME}",
         bucket=BUCKET_NAME,
@@ -77,18 +110,20 @@ with DAG(
     # [START howto_operator_gcs_to_gdrive_copy_single_file]
     copy_single_file = GCSToGoogleDriveOperator(
         task_id="copy_single_file",
+        gcp_conn_id=CONNECTION_ID,
         source_bucket=BUCKET_NAME,
         source_object=f"{TMP_PATH}/{FILE_NAME}",
-        destination_object=f"copied_tmp/copied_{FILE_NAME}",
+        destination_object=f"{WORK_DIR}/copied_{FILE_NAME}",
     )
     # [END howto_operator_gcs_to_gdrive_copy_single_file]
 
     # [START howto_operator_gcs_to_gdrive_copy_single_file_into_folder]
     copy_single_file_into_folder = GCSToGoogleDriveOperator(
         task_id="copy_single_file_into_folder",
+        gcp_conn_id=CONNECTION_ID,
         source_bucket=BUCKET_NAME,
         source_object=f"{TMP_PATH}/{FILE_NAME}",
-        destination_object=f"copied_tmp/copied_{FILE_NAME}",
+        destination_object=f"{WORK_DIR}/copied_{FILE_NAME}",
         destination_folder_id=FOLDER_ID,
     )
     # [END howto_operator_gcs_to_gdrive_copy_single_file_into_folder]
@@ -96,36 +131,64 @@ with DAG(
     # [START howto_operator_gcs_to_gdrive_copy_files]
     copy_files = GCSToGoogleDriveOperator(
         task_id="copy_files",
+        gcp_conn_id=CONNECTION_ID,
         source_bucket=BUCKET_NAME,
         source_object=f"{TMP_PATH}/*",
-        destination_object="copied_tmp/",
+        destination_object=f"{WORK_DIR}/",
     )
     # [END howto_operator_gcs_to_gdrive_copy_files]
 
     # [START howto_operator_gcs_to_gdrive_move_files]
     move_files = GCSToGoogleDriveOperator(
         task_id="move_files",
+        gcp_conn_id=CONNECTION_ID,
         source_bucket=BUCKET_NAME,
         source_object=f"{TMP_PATH}/*.txt",
+        destination_object=f"{WORK_DIR}/",
         move_object=True,
     )
     # [END howto_operator_gcs_to_gdrive_move_files]
 
+    @task(trigger_rule=TriggerRule.ALL_DONE)
+    def remove_files_from_drive():
+        service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn()
+        root_path = (
+            service.files()
+            .list(q=f"name = '{WORK_DIR}' and mimeType = 'application/vnd.google-apps.folder'")
+            .execute()
+        )
+        if files := root_path["files"]:
+            batch = service.new_batch_http_request()
+            for file in files:
+                log.info("Preparing to remove file: %s", file)
+                batch.add(service.files().delete(fileId=file["id"]))
+            batch.execute()
+            log.info("Selected files removed.")
+
+    remove_files_from_drive_task = remove_files_from_drive()
+
     delete_bucket = GCSDeleteBucketOperator(
         task_id="delete_bucket", bucket_name=BUCKET_NAME, 
trigger_rule=TriggerRule.ALL_DONE
     )
 
+    delete_temp_gcp_connection_task = BashOperator(
+        task_id="delete_temp_gcp_connection",
+        bash_command=f"airflow connections delete {CONNECTION_ID}",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    # TEST SETUP
+    create_bucket >> [upload_file_1, upload_file_2]
     (
-        # TEST SETUP
-        create_bucket
-        >> upload_file
-        >> upload_file_2
+        [upload_file_1, upload_file_2, create_temp_gcp_connection_task]
         # TEST BODY
         >> copy_single_file
+        >> copy_single_file_into_folder
         >> copy_files
         >> move_files
         # TEST TEARDOWN
-        >> delete_bucket
+        >> remove_files_from_drive_task
+        >> [delete_bucket, delete_temp_gcp_connection_task]
     )
 
     from tests.system.utils.watcher import watcher
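
A note on the connection extra built by create_temp_gcp_connection above: the two adjacent string literals are joined by Python's implicit string concatenation, so the stored "scope" value is a single comma-separated string (which the Google provider's base hook is expected to split on commas). A minimal sketch of what that expression evaluates to:

    import json

    extra = json.dumps(
        {
            "scope": "https://www.googleapis.com/auth/drive,"
            "https://www.googleapis.com/auth/cloud-platform"
        }
    )
    # extra == '{"scope": "https://www.googleapis.com/auth/drive,https://www.googleapis.com/auth/cloud-platform"}'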
diff --git a/tests/system/providers/google/cloud/gcs/example_gdrive_to_gcs.py b/tests/system/providers/google/cloud/gcs/example_gdrive_to_gcs.py
index b3debb12dd..883cb33054 100644
--- a/tests/system/providers/google/cloud/gcs/example_gdrive_to_gcs.py
+++ b/tests/system/providers/google/cloud/gcs/example_gdrive_to_gcs.py
@@ -17,16 +17,23 @@
 # under the License.
 from __future__ import annotations
 
+import json
+import logging
 import os
 from datetime import datetime
 from pathlib import Path
 
+from airflow.decorators import task
+from airflow.models import Connection
 from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.transfers.gdrive_to_gcs import GoogleDriveToGCSOperator
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
 from airflow.providers.google.suite.sensors.drive import GoogleDriveFileExistenceSensor
 from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator
+from airflow.settings import Session
 from airflow.utils.trigger_rule import TriggerRule
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
@@ -35,20 +42,48 @@ PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 DAG_ID = "example_gdrive_to_gcs_with_gdrive_sensor"
 
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
 
 OBJECT = "abc123xyz"
 FOLDER_ID = ""
 FILE_NAME = "example_upload.txt"
+DRIVE_FILE_NAME = f"example_upload_{DAG_ID}_{ENV_ID}.txt"
 LOCAL_PATH = str(Path(__file__).parent / "resources" / FILE_NAME)
 
+log = logging.getLogger(__name__)
+
+
 with DAG(
     DAG_ID,
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example"],
+    tags=["example", "gcs", "gdrive"],
 ) as dag:
 
+    @task
+    def create_temp_gcp_connection():
+        conn = Connection(
+            conn_id=CONNECTION_ID,
+            conn_type="google_cloud_platform",
+        )
+        conn_extra_json = json.dumps(
+            {
+                "scope": "https://www.googleapis.com/auth/drive,"
+                "https://www.googleapis.com/auth/cloud-platform"
+            }
+        )
+        conn.set_extra(conn_extra_json)
+
+        session: Session = Session()
+        if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
+            log.warning("Connection %s already exists", CONNECTION_ID)
+            return None
+        session.add(conn)
+        session.commit()
+
+    create_temp_gcp_connection_task = create_temp_gcp_connection()
+
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
     )
@@ -62,41 +97,62 @@ with DAG(
 
     copy_single_file = GCSToGoogleDriveOperator(
         task_id="copy_single_file",
+        gcp_conn_id=CONNECTION_ID,
         source_bucket=BUCKET_NAME,
         source_object=FILE_NAME,
-        destination_object=FILE_NAME,
+        destination_object=DRIVE_FILE_NAME,
     )
 
     # [START detect_file]
     detect_file = GoogleDriveFileExistenceSensor(
-        task_id="detect_file", folder_id=FOLDER_ID, file_name=FILE_NAME
+        task_id="detect_file",
+        folder_id=FOLDER_ID,
+        file_name=DRIVE_FILE_NAME,
+        gcp_conn_id=CONNECTION_ID,
     )
     # [END detect_file]
 
     # [START upload_gdrive_to_gcs]
     upload_gdrive_to_gcs = GoogleDriveToGCSOperator(
         task_id="upload_gdrive_object_to_gcs",
+        gcp_conn_id=CONNECTION_ID,
         folder_id=FOLDER_ID,
-        file_name=FILE_NAME,
+        file_name=DRIVE_FILE_NAME,
         bucket_name=BUCKET_NAME,
         object_name=OBJECT,
     )
     # [END upload_gdrive_to_gcs]
 
+    @task(trigger_rule=TriggerRule.ALL_DONE)
+    def remove_files_from_drive():
+        service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn()
+        response = service.files().list(q=f"name = '{DRIVE_FILE_NAME}'").execute()
+        if files := response["files"]:
+            file = files[0]
+            log.info("Deleting file %s...", file)
+            service.files().delete(fileId=file["id"]).execute()
+            log.info("Done.")
+
+    remove_files_from_drive_task = remove_files_from_drive()
+
     delete_bucket = GCSDeleteBucketOperator(
         task_id="delete_bucket", bucket_name=BUCKET_NAME, 
trigger_rule=TriggerRule.ALL_DONE
     )
 
+    delete_temp_gcp_connection_task = BashOperator(
+        task_id="delete_temp_gcp_connection",
+        bash_command=f"airflow connections delete {CONNECTION_ID}",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
     (
-        # TEST SETUP
-        create_bucket
-        >> upload_file
-        >> copy_single_file
+        [create_bucket >> upload_file >> copy_single_file, create_temp_gcp_connection_task]
         # TEST BODY
         >> detect_file
         >> upload_gdrive_to_gcs
         # TEST TEARDOWN
-        >> delete_bucket
+        >> remove_files_from_drive_task
+        >> [delete_bucket, delete_temp_gcp_connection_task]
     )
 
     from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/transfers/example_gdrive_to_local.py b/tests/system/providers/google/cloud/transfers/example_gdrive_to_local.py
index a7a0b6ac3d..a989af9cfd 100644
--- a/tests/system/providers/google/cloud/transfers/example_gdrive_to_local.py
+++ b/tests/system/providers/google/cloud/transfers/example_gdrive_to_local.py
@@ -23,38 +23,70 @@ https://www.googleapis.com/auth/drive
 """
 from __future__ import annotations
 
+import logging
 import os
 from datetime import datetime
 from pathlib import Path
 
-from airflow.models.baseoperator import chain
+from airflow.decorators import task
+from airflow.models import Connection
 from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.transfers.gdrive_to_local import GoogleDriveToLocalOperator
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
 from airflow.providers.google.suite.sensors.drive import GoogleDriveFileExistenceSensor
 from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator
+from airflow.settings import Session, json
 from airflow.utils.trigger_rule import TriggerRule
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "example_gdrive_to_local"
 
+CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
 FILE_NAME = "empty.txt"
+DRIVE_FILE_NAME = f"empty_{DAG_ID}_{ENV_ID}.txt".replace("-", "_")
 OUTPUT_FILE = "out_file.txt"
-DAG_ID = "example_gdrive_to_local"
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
 
 LOCAL_PATH = str(Path(__file__).parent / "resources" / FILE_NAME)
 
+log = logging.getLogger(__name__)
+
 
 with DAG(
     DAG_ID,
     start_date=datetime(2021, 1, 1),
     schedule="@once",
     catchup=False,
-    tags=["example"],
+    tags=["example", "gdrive"],
 ) as dag:
 
+    @task
+    def create_temp_gcp_connection():
+        conn = Connection(
+            conn_id=CONNECTION_ID,
+            conn_type="google_cloud_platform",
+        )
+        conn_extra_json = json.dumps(
+            {
+                "scope": "https://www.googleapis.com/auth/drive,"
+                "https://www.googleapis.com/auth/cloud-platform"
+            }
+        )
+        conn.set_extra(conn_extra_json)
+
+        session: Session = Session()
+        if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
+            log.warning("Connection %s already exists", CONNECTION_ID)
+            return None
+        session.add(conn)
+        session.commit()
+
+    create_temp_gcp_connection_task = create_temp_gcp_connection()
+
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
     )
@@ -68,42 +100,62 @@ with DAG(
 
     copy_single_file = GCSToGoogleDriveOperator(
         task_id="copy_single_file",
+        gcp_conn_id=CONNECTION_ID,
         source_bucket=BUCKET_NAME,
         source_object=FILE_NAME,
-        destination_object=FILE_NAME,
+        destination_object=DRIVE_FILE_NAME,
     )
 
     # [START detect_file]
     detect_file = GoogleDriveFileExistenceSensor(
         task_id="detect_file",
+        gcp_conn_id=CONNECTION_ID,
         folder_id="",
-        file_name=FILE_NAME,
+        file_name=DRIVE_FILE_NAME,
     )
     # [END detect_file]
 
     # [START download_from_gdrive_to_local]
     download_from_gdrive_to_local = GoogleDriveToLocalOperator(
         task_id="download_from_gdrive_to_local",
+        gcp_conn_id=CONNECTION_ID,
         folder_id="",
-        file_name=FILE_NAME,
+        file_name=DRIVE_FILE_NAME,
         output_file=OUTPUT_FILE,
     )
     # [END download_from_gdrive_to_local]
 
+    @task(trigger_rule=TriggerRule.ALL_DONE)
+    def remove_file_from_drive():
+        service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn()
+        response = service.files().list(q=f"name = '{DRIVE_FILE_NAME}'").execute()
+        if files := response["files"]:
+            file = files[0]
+            log.info("Deleting file %s...", file)
+            service.files().delete(fileId=file["id"]).execute()
+            log.info("Done.")
+
+    remove_file_from_drive_task = remove_file_from_drive()
+
     delete_bucket = GCSDeleteBucketOperator(
         task_id="delete_bucket", bucket_name=BUCKET_NAME, 
trigger_rule=TriggerRule.ALL_DONE
     )
 
-    chain(
+    delete_temp_gcp_connection_task = BashOperator(
+        task_id="delete_temp_gcp_connection",
+        bash_command=f"airflow connections delete {CONNECTION_ID}",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
         # TEST SETUP
-        create_bucket,
-        upload_file,
-        copy_single_file,
+        [create_bucket >> upload_file >> copy_single_file, create_temp_gcp_connection_task]
         # TEST BODY
-        detect_file,
-        download_from_gdrive_to_local,
+        >> detect_file
+        >> download_from_gdrive_to_local
         # TEST TEARDOWN
-        delete_bucket,
+        >> remove_file_from_drive_task
+        >> [delete_bucket, delete_temp_gcp_connection_task]
     )
 
     from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/suite/example_local_to_drive.py b/tests/system/providers/google/suite/example_local_to_drive.py
index 7e5130f68b..2738fd5607 100644
--- a/tests/system/providers/google/suite/example_local_to_drive.py
+++ b/tests/system/providers/google/suite/example_local_to_drive.py
@@ -23,23 +23,36 @@ https://www.googleapis.com/auth/drive
 """
 from __future__ import annotations
 
+import json
+import logging
+import os
 from datetime import datetime
 from pathlib import Path
 
+from airflow.decorators import task
+from airflow.models import Connection
 from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
 from airflow.providers.google.suite.transfers.local_to_drive import LocalFilesystemToGoogleDriveOperator
+from airflow.settings import Session
+from airflow.utils.trigger_rule import TriggerRule
 
 DAG_ID = "example_local_to_drive"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 
 FILE_NAME_1 = "test1"
 FILE_NAME_2 = "test2"
+CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
 
 LOCAL_PATH = str(Path(__file__).parent / "resources")
 
 SINGLE_FILE_LOCAL_PATHS = [str(Path(LOCAL_PATH) / FILE_NAME_1)]
 MULTIPLE_FILES_LOCAL_PATHS = [str(Path(LOCAL_PATH) / FILE_NAME_1), str(Path(LOCAL_PATH) / FILE_NAME_2)]
 
-DRIVE_FOLDER = "test-folder"
+DRIVE_FOLDER = f"test_folder_{DAG_ID}_{ENV_ID}"
+
+log = logging.getLogger(__name__)
 
 
 with DAG(
@@ -47,11 +60,30 @@ with DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example"],
+    tags=["example", "gdrive"],
 ) as dag:
 
+    @task
+    def create_temp_gcp_connection():
+        conn = Connection(
+            conn_id=CONNECTION_ID,
+            conn_type="google_cloud_platform",
+        )
+        conn_extra_json = json.dumps({"scope": "https://www.googleapis.com/auth/drive"})
+        conn.set_extra(conn_extra_json)
+
+        session: Session = Session()
+        if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
+            log.warning("Connection %s already exists", CONNECTION_ID)
+            return None
+        session.add(conn)
+        session.commit()
+
+    create_temp_gcp_connection_task = create_temp_gcp_connection()
+
     # [START howto_operator_local_to_drive_upload_single_file]
     upload_single_file = LocalFilesystemToGoogleDriveOperator(
+        gcp_conn_id=CONNECTION_ID,
         task_id="upload_single_file",
         local_paths=SINGLE_FILE_LOCAL_PATHS,
         drive_folder=DRIVE_FOLDER,
@@ -60,6 +92,7 @@ with DAG(
 
     # [START howto_operator_local_to_drive_upload_multiple_files]
     upload_multiple_files = LocalFilesystemToGoogleDriveOperator(
+        gcp_conn_id=CONNECTION_ID,
         task_id="upload_multiple_files",
         local_paths=MULTIPLE_FILES_LOCAL_PATHS,
         drive_folder=DRIVE_FOLDER,
@@ -67,10 +100,39 @@ with DAG(
     )
     # [END howto_operator_local_to_drive_upload_multiple_files]
 
+    @task(trigger_rule=TriggerRule.ALL_DONE)
+    def remove_files_from_drive():
+        service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn()
+        root_path = (
+            service.files()
+            .list(q=f"name = '{DRIVE_FOLDER}' and mimeType = 'application/vnd.google-apps.folder'")
+            .execute()
+        )
+        if files := root_path["files"]:
+            batch = service.new_batch_http_request()
+            for file in files:
+                log.info("Preparing to remove file: %s", file)
+                batch.add(service.files().delete(fileId=file["id"]))
+            batch.execute()
+            log.info("Selected files removed.")
+
+    remove_files_from_drive_task = remove_files_from_drive()
+
+    delete_temp_gcp_connection_task = BashOperator(
+        task_id="delete_temp_gcp_connection",
+        bash_command=f"airflow connections delete {CONNECTION_ID}",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
     (
+        # TEST SETUP
+        create_temp_gcp_connection_task
         # TEST BODY
-        upload_single_file
+        >> upload_single_file
         >> upload_multiple_files
+        # TEST TEARDOWN
+        >> remove_files_from_drive_task
+        >> delete_temp_gcp_connection_task
     )
 
     from tests.system.utils.watcher import watcher
