This is an automated email from the ASF dual-hosted git repository. pankaj pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 4dc2c40dfe Fix GCSToGoogleDriveOperator and gdrive system tests (#34545) 4dc2c40dfe is described below commit 4dc2c40dfefb3495e435aabb99c07fa6e4f32c5d Author: max <42827971+moiseen...@users.noreply.github.com> AuthorDate: Thu Oct 12 18:53:50 2023 +0200 Fix GCSToGoogleDriveOperator and gdrive system tests (#34545) --- .../google/suite/transfers/gcs_to_gdrive.py | 2 +- .../google/suite/transfers/test_gcs_to_gdrive.py | 14 ++-- .../google/cloud/gcs/example_gcs_to_gdrive.py | 93 ++++++++++++++++++---- .../google/cloud/gcs/example_gdrive_to_gcs.py | 74 ++++++++++++++--- .../cloud/transfers/example_gdrive_to_local.py | 78 +++++++++++++++--- .../google/suite/example_local_to_drive.py | 68 +++++++++++++++- 6 files changed, 281 insertions(+), 48 deletions(-) diff --git a/airflow/providers/google/suite/transfers/gcs_to_gdrive.py b/airflow/providers/google/suite/transfers/gcs_to_gdrive.py index 0915906b1c..99f9ccafc5 100644 --- a/airflow/providers/google/suite/transfers/gcs_to_gdrive.py +++ b/airflow/providers/google/suite/transfers/gcs_to_gdrive.py @@ -95,7 +95,7 @@ class GCSToGoogleDriveOperator(BaseOperator): source_bucket: str, source_object: str, destination_object: str | None = None, - destination_folder_id: str | None = None, + destination_folder_id: str = "root", move_object: bool = False, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, diff --git a/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py b/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py index 47d1f6e038..8e97be5b0e 100644 --- a/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py +++ b/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py @@ -66,7 +66,7 @@ class TestGcsToGDriveOperator: mock.call().upload_file( local_location="TMP1", remote_location="copied_sales/2017/january-backup.avro", - folder_id=None, + folder_id="root", ), ] ) @@ -156,13 +156,13 @@ class TestGcsToGDriveOperator: impersonation_chain=IMPERSONATION_CHAIN, ), mock.call().upload_file( - local_location="TMP1", remote_location="sales/A.avro", folder_id=None + local_location="TMP1", remote_location="sales/A.avro", folder_id="root" ), mock.call().upload_file( - local_location="TMP2", remote_location="sales/B.avro", folder_id=None + local_location="TMP2", remote_location="sales/B.avro", folder_id="root" ), mock.call().upload_file( - local_location="TMP3", remote_location="sales/C.avro", folder_id=None + local_location="TMP3", remote_location="sales/C.avro", folder_id="root" ), ] ) @@ -210,13 +210,13 @@ class TestGcsToGDriveOperator: impersonation_chain=IMPERSONATION_CHAIN, ), mock.call().upload_file( - local_location="TMP1", remote_location="sales/A.avro", folder_id=None + local_location="TMP1", remote_location="sales/A.avro", folder_id="root" ), mock.call().upload_file( - local_location="TMP2", remote_location="sales/B.avro", folder_id=None + local_location="TMP2", remote_location="sales/B.avro", folder_id="root" ), mock.call().upload_file( - local_location="TMP3", remote_location="sales/C.avro", folder_id=None + local_location="TMP3", remote_location="sales/C.avro", folder_id="root" ), ] ) diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py b/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py index e9cd16d8f9..541964e82d 100644 --- a/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py +++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py @@ -23,53 +23,86 @@ https://www.googleapis.com/auth/drive """ from __future__ import annotations +import json +import logging import os from datetime import datetime from pathlib import Path +from airflow.decorators import task +from airflow.models import Connection from airflow.models.dag import DAG +from airflow.operators.bash import BashOperator from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator +from airflow.providers.google.suite.hooks.drive import GoogleDriveHook from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator +from airflow.settings import Session from airflow.utils.trigger_rule import TriggerRule ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") -FOLDER_ID = os.environ.get("GCP_GDRIVE_FOLDER_ID", "abcd1234") +FOLDER_ID = os.environ.get("GCP_GDRIVE_FOLDER_ID", None) DAG_ID = "example_gcs_to_gdrive" BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" +CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}" TMP_PATH = "tmp" - +WORK_DIR = f"folder_{DAG_ID}_{ENV_ID}".replace("-", "_") CURRENT_FOLDER = Path(__file__).parent LOCAL_PATH = str(Path(CURRENT_FOLDER) / "resources") - FILE_LOCAL_PATH = str(Path(LOCAL_PATH)) FILE_NAME = "example_upload.txt" +log = logging.getLogger(__name__) + with DAG( DAG_ID, schedule="@once", start_date=datetime(2021, 1, 1), catchup=False, - tags=["example", "gcs"], + tags=["example", "gcs", "gdrive"], ) as dag: + + @task + def create_temp_gcp_connection(): + conn = Connection( + conn_id=CONNECTION_ID, + conn_type="google_cloud_platform", + ) + conn_extra_json = json.dumps( + { + "scope": "https://www.googleapis.com/auth/drive," + "https://www.googleapis.com/auth/cloud-platform" + } + ) + conn.set_extra(conn_extra_json) + + session: Session = Session() + if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first(): + log.warning("Connection %s already exists", CONNECTION_ID) + return None + session.add(conn) + session.commit() + + create_temp_gcp_connection_task = create_temp_gcp_connection() + create_bucket = GCSCreateBucketOperator( task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID ) - upload_file = LocalFilesystemToGCSOperator( - task_id="upload_file", + upload_file_1 = LocalFilesystemToGCSOperator( + task_id="upload_file_1", src=f"{FILE_LOCAL_PATH}/{FILE_NAME}", dst=f"{TMP_PATH}/{FILE_NAME}", bucket=BUCKET_NAME, ) upload_file_2 = LocalFilesystemToGCSOperator( - task_id="upload_fil_2", + task_id="upload_file_2", src=f"{FILE_LOCAL_PATH}/{FILE_NAME}", dst=f"{TMP_PATH}/2_{FILE_NAME}", bucket=BUCKET_NAME, @@ -77,18 +110,20 @@ with DAG( # [START howto_operator_gcs_to_gdrive_copy_single_file] copy_single_file = GCSToGoogleDriveOperator( task_id="copy_single_file", + gcp_conn_id=CONNECTION_ID, source_bucket=BUCKET_NAME, source_object=f"{TMP_PATH}/{FILE_NAME}", - destination_object=f"copied_tmp/copied_{FILE_NAME}", + destination_object=f"{WORK_DIR}/copied_{FILE_NAME}", ) # [END howto_operator_gcs_to_gdrive_copy_single_file] # [START howto_operator_gcs_to_gdrive_copy_single_file_into_folder] copy_single_file_into_folder = GCSToGoogleDriveOperator( task_id="copy_single_file_into_folder", + gcp_conn_id=CONNECTION_ID, source_bucket=BUCKET_NAME, source_object=f"{TMP_PATH}/{FILE_NAME}", - destination_object=f"copied_tmp/copied_{FILE_NAME}", + destination_object=f"{WORK_DIR}/copied_{FILE_NAME}", destination_folder_id=FOLDER_ID, ) # [END howto_operator_gcs_to_gdrive_copy_single_file_into_folder] @@ -96,36 +131,64 @@ with DAG( # [START howto_operator_gcs_to_gdrive_copy_files] copy_files = GCSToGoogleDriveOperator( task_id="copy_files", + gcp_conn_id=CONNECTION_ID, source_bucket=BUCKET_NAME, source_object=f"{TMP_PATH}/*", - destination_object="copied_tmp/", + destination_object=f"{WORK_DIR}/", ) # [END howto_operator_gcs_to_gdrive_copy_files] # [START howto_operator_gcs_to_gdrive_move_files] move_files = GCSToGoogleDriveOperator( task_id="move_files", + gcp_conn_id=CONNECTION_ID, source_bucket=BUCKET_NAME, source_object=f"{TMP_PATH}/*.txt", + destination_object=f"{WORK_DIR}/", move_object=True, ) # [END howto_operator_gcs_to_gdrive_move_files] + @task(trigger_rule=TriggerRule.ALL_DONE) + def remove_files_from_drive(): + service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn() + root_path = ( + service.files() + .list(q=f"name = '{WORK_DIR}' and mimeType = 'application/vnd.google-apps.folder'") + .execute() + ) + if files := root_path["files"]: + batch = service.new_batch_http_request() + for file in files: + log.info("Preparing to remove file: {}", file) + batch.add(service.files().delete(fileId=file["id"])) + batch.execute() + log.info("Selected files removed.") + + remove_files_from_drive_task = remove_files_from_drive() + delete_bucket = GCSDeleteBucketOperator( task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE ) + delete_temp_gcp_connection_task = BashOperator( + task_id="delete_temp_gcp_connection", + bash_command=f"airflow connections delete {CONNECTION_ID}", + trigger_rule=TriggerRule.ALL_DONE, + ) + + # TEST SETUP + create_bucket >> [upload_file_1, upload_file_2] ( - # TEST SETUP - create_bucket - >> upload_file - >> upload_file_2 + [upload_file_1, upload_file_2, create_temp_gcp_connection_task] # TEST BODY >> copy_single_file + >> copy_single_file_into_folder >> copy_files >> move_files # TEST TEARDOWN - >> delete_bucket + >> remove_files_from_drive_task + >> [delete_bucket, delete_temp_gcp_connection_task] ) from tests.system.utils.watcher import watcher diff --git a/tests/system/providers/google/cloud/gcs/example_gdrive_to_gcs.py b/tests/system/providers/google/cloud/gcs/example_gdrive_to_gcs.py index b3debb12dd..883cb33054 100644 --- a/tests/system/providers/google/cloud/gcs/example_gdrive_to_gcs.py +++ b/tests/system/providers/google/cloud/gcs/example_gdrive_to_gcs.py @@ -17,16 +17,23 @@ # under the License. from __future__ import annotations +import json +import logging import os from datetime import datetime from pathlib import Path +from airflow.decorators import task +from airflow.models import Connection from airflow.models.dag import DAG +from airflow.operators.bash import BashOperator from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator from airflow.providers.google.cloud.transfers.gdrive_to_gcs import GoogleDriveToGCSOperator from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator +from airflow.providers.google.suite.hooks.drive import GoogleDriveHook from airflow.providers.google.suite.sensors.drive import GoogleDriveFileExistenceSensor from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator +from airflow.settings import Session from airflow.utils.trigger_rule import TriggerRule ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") @@ -35,20 +42,48 @@ PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") DAG_ID = "example_gdrive_to_gcs_with_gdrive_sensor" BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" +CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}" OBJECT = "abc123xyz" FOLDER_ID = "" FILE_NAME = "example_upload.txt" +DRIVE_FILE_NAME = f"example_upload_{DAG_ID}_{ENV_ID}.txt" LOCAL_PATH = str(Path(__file__).parent / "resources" / FILE_NAME) +log = logging.getLogger(__name__) + + with DAG( DAG_ID, schedule="@once", start_date=datetime(2021, 1, 1), catchup=False, - tags=["example"], + tags=["example", "gcs", "gdrive"], ) as dag: + @task + def create_temp_gcp_connection(): + conn = Connection( + conn_id=CONNECTION_ID, + conn_type="google_cloud_platform", + ) + conn_extra_json = json.dumps( + { + "scope": "https://www.googleapis.com/auth/drive," + "https://www.googleapis.com/auth/cloud-platform" + } + ) + conn.set_extra(conn_extra_json) + + session: Session = Session() + if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first(): + log.warning("Connection %s already exists", CONNECTION_ID) + return None + session.add(conn) + session.commit() + + create_temp_gcp_connection_task = create_temp_gcp_connection() + create_bucket = GCSCreateBucketOperator( task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID ) @@ -62,41 +97,62 @@ with DAG( copy_single_file = GCSToGoogleDriveOperator( task_id="copy_single_file", + gcp_conn_id=CONNECTION_ID, source_bucket=BUCKET_NAME, source_object=FILE_NAME, - destination_object=FILE_NAME, + destination_object=DRIVE_FILE_NAME, ) # [START detect_file] detect_file = GoogleDriveFileExistenceSensor( - task_id="detect_file", folder_id=FOLDER_ID, file_name=FILE_NAME + task_id="detect_file", + folder_id=FOLDER_ID, + file_name=DRIVE_FILE_NAME, + gcp_conn_id=CONNECTION_ID, ) # [END detect_file] # [START upload_gdrive_to_gcs] upload_gdrive_to_gcs = GoogleDriveToGCSOperator( task_id="upload_gdrive_object_to_gcs", + gcp_conn_id=CONNECTION_ID, folder_id=FOLDER_ID, - file_name=FILE_NAME, + file_name=DRIVE_FILE_NAME, bucket_name=BUCKET_NAME, object_name=OBJECT, ) # [END upload_gdrive_to_gcs] + @task(trigger_rule=TriggerRule.ALL_DONE) + def remove_files_from_drive(): + service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn() + response = service.files().list(q=f"name = '{DRIVE_FILE_NAME}'").execute() + if files := response["files"]: + file = files[0] + log.info("Deleting file {}...", file) + service.files().delete(fileId=file["id"]) + log.info("Done.") + + remove_files_from_drive_task = remove_files_from_drive() + delete_bucket = GCSDeleteBucketOperator( task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE ) + delete_temp_gcp_connection_task = BashOperator( + task_id="delete_temp_gcp_connection", + bash_command=f"airflow connections delete {CONNECTION_ID}", + trigger_rule=TriggerRule.ALL_DONE, + ) + ( - # TEST SETUP - create_bucket - >> upload_file - >> copy_single_file + [create_bucket >> upload_file >> copy_single_file, create_temp_gcp_connection_task] # TEST BODY >> detect_file >> upload_gdrive_to_gcs # TEST TEARDOWN - >> delete_bucket + >> remove_files_from_drive_task + >> [delete_bucket, delete_temp_gcp_connection_task] ) from tests.system.utils.watcher import watcher diff --git a/tests/system/providers/google/cloud/transfers/example_gdrive_to_local.py b/tests/system/providers/google/cloud/transfers/example_gdrive_to_local.py index a7a0b6ac3d..a989af9cfd 100644 --- a/tests/system/providers/google/cloud/transfers/example_gdrive_to_local.py +++ b/tests/system/providers/google/cloud/transfers/example_gdrive_to_local.py @@ -23,38 +23,70 @@ https://www.googleapis.com/auth/drive """ from __future__ import annotations +import logging import os from datetime import datetime from pathlib import Path -from airflow.models.baseoperator import chain +from airflow.decorators import task +from airflow.models import Connection from airflow.models.dag import DAG +from airflow.operators.bash import BashOperator from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator from airflow.providers.google.cloud.transfers.gdrive_to_local import GoogleDriveToLocalOperator from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator +from airflow.providers.google.suite.hooks.drive import GoogleDriveHook from airflow.providers.google.suite.sensors.drive import GoogleDriveFileExistenceSensor from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator +from airflow.settings import Session, json from airflow.utils.trigger_rule import TriggerRule ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") +DAG_ID = "example_gdrive_to_local" +CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}" FILE_NAME = "empty.txt" +DRIVE_FILE_NAME = f"empty_{DAG_ID}_{ENV_ID}.txt".replace("-", "_") OUTPUT_FILE = "out_file.txt" -DAG_ID = "example_gdrive_to_local" BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" LOCAL_PATH = str(Path(__file__).parent / "resources" / FILE_NAME) +log = logging.getLogger(__name__) + with DAG( DAG_ID, start_date=datetime(2021, 1, 1), schedule="@once", catchup=False, - tags=["example"], + tags=["example", "gdrive"], ) as dag: + @task + def create_temp_gcp_connection(): + conn = Connection( + conn_id=CONNECTION_ID, + conn_type="google_cloud_platform", + ) + conn_extra_json = json.dumps( + { + "scope": "https://www.googleapis.com/auth/drive," + "https://www.googleapis.com/auth/cloud-platform" + } + ) + conn.set_extra(conn_extra_json) + + session: Session = Session() + if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first(): + log.warning("Connection %s already exists", CONNECTION_ID) + return None + session.add(conn) + session.commit() + + create_temp_gcp_connection_task = create_temp_gcp_connection() + create_bucket = GCSCreateBucketOperator( task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID ) @@ -68,42 +100,62 @@ with DAG( copy_single_file = GCSToGoogleDriveOperator( task_id="copy_single_file", + gcp_conn_id=CONNECTION_ID, source_bucket=BUCKET_NAME, source_object=FILE_NAME, - destination_object=FILE_NAME, + destination_object=DRIVE_FILE_NAME, ) # [START detect_file] detect_file = GoogleDriveFileExistenceSensor( task_id="detect_file", + gcp_conn_id=CONNECTION_ID, folder_id="", - file_name=FILE_NAME, + file_name=DRIVE_FILE_NAME, ) # [END detect_file] # [START download_from_gdrive_to_local] download_from_gdrive_to_local = GoogleDriveToLocalOperator( task_id="download_from_gdrive_to_local", + gcp_conn_id=CONNECTION_ID, folder_id="", - file_name=FILE_NAME, + file_name=DRIVE_FILE_NAME, output_file=OUTPUT_FILE, ) # [END download_from_gdrive_to_local] + @task(trigger_rule=TriggerRule.ALL_DONE) + def remove_file_from_drive(): + service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn() + response = service.files().list(q=f"name = '{DRIVE_FILE_NAME}'").execute() + if files := response["files"]: + file = files[0] + log.info("Deleting file {}...", file) + service.files().delete(fileId=file["id"]) + log.info("Done.") + + remove_file_from_drive_task = remove_file_from_drive() + delete_bucket = GCSDeleteBucketOperator( task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE ) - chain( + delete_temp_gcp_connection_task = BashOperator( + task_id="delete_temp_gcp_connection", + bash_command=f"airflow connections delete {CONNECTION_ID}", + trigger_rule=TriggerRule.ALL_DONE, + ) + + ( # TEST SETUP - create_bucket, - upload_file, - copy_single_file, + [create_bucket >> upload_file >> copy_single_file, create_temp_gcp_connection_task] # TEST BODY - detect_file, - download_from_gdrive_to_local, + >> detect_file + >> download_from_gdrive_to_local # TEST TEARDOWN - delete_bucket, + >> remove_file_from_drive_task + >> [delete_bucket, delete_temp_gcp_connection_task] ) from tests.system.utils.watcher import watcher diff --git a/tests/system/providers/google/suite/example_local_to_drive.py b/tests/system/providers/google/suite/example_local_to_drive.py index 7e5130f68b..2738fd5607 100644 --- a/tests/system/providers/google/suite/example_local_to_drive.py +++ b/tests/system/providers/google/suite/example_local_to_drive.py @@ -23,23 +23,36 @@ https://www.googleapis.com/auth/drive """ from __future__ import annotations +import json +import logging +import os from datetime import datetime from pathlib import Path +from airflow.decorators import task +from airflow.models import Connection from airflow.models.dag import DAG +from airflow.operators.bash import BashOperator +from airflow.providers.google.suite.hooks.drive import GoogleDriveHook from airflow.providers.google.suite.transfers.local_to_drive import LocalFilesystemToGoogleDriveOperator +from airflow.settings import Session +from airflow.utils.trigger_rule import TriggerRule DAG_ID = "example_local_to_drive" +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") FILE_NAME_1 = "test1" FILE_NAME_2 = "test2" +CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}" LOCAL_PATH = str(Path(__file__).parent / "resources") SINGLE_FILE_LOCAL_PATHS = [str(Path(LOCAL_PATH) / FILE_NAME_1)] MULTIPLE_FILES_LOCAL_PATHS = [str(Path(LOCAL_PATH) / FILE_NAME_1), str(Path(LOCAL_PATH) / FILE_NAME_2)] -DRIVE_FOLDER = "test-folder" +DRIVE_FOLDER = f"test_folder_{DAG_ID}_{ENV_ID}" + +log = logging.getLogger(__name__) with DAG( @@ -47,11 +60,30 @@ with DAG( schedule="@once", start_date=datetime(2021, 1, 1), catchup=False, - tags=["example"], + tags=["example", "gdrive"], ) as dag: + @task + def create_temp_gcp_connection(): + conn = Connection( + conn_id=CONNECTION_ID, + conn_type="google_cloud_platform", + ) + conn_extra_json = json.dumps({"scope": "https://www.googleapis.com/auth/drive"}) + conn.set_extra(conn_extra_json) + + session: Session = Session() + if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first(): + log.warning("Connection %s already exists", CONNECTION_ID) + return None + session.add(conn) + session.commit() + + create_temp_gcp_connection_task = create_temp_gcp_connection() + # [START howto_operator_local_to_drive_upload_single_file] upload_single_file = LocalFilesystemToGoogleDriveOperator( + gcp_conn_id=CONNECTION_ID, task_id="upload_single_file", local_paths=SINGLE_FILE_LOCAL_PATHS, drive_folder=DRIVE_FOLDER, @@ -60,6 +92,7 @@ with DAG( # [START howto_operator_local_to_drive_upload_multiple_files] upload_multiple_files = LocalFilesystemToGoogleDriveOperator( + gcp_conn_id=CONNECTION_ID, task_id="upload_multiple_files", local_paths=MULTIPLE_FILES_LOCAL_PATHS, drive_folder=DRIVE_FOLDER, @@ -67,10 +100,39 @@ with DAG( ) # [END howto_operator_local_to_drive_upload_multiple_files] + @task(trigger_rule=TriggerRule.ALL_DONE) + def remove_files_from_drive(): + service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn() + root_path = ( + service.files() + .list(q=f"name = '{DRIVE_FOLDER}' and mimeType = 'application/vnd.google-apps.folder'") + .execute() + ) + if files := root_path["files"]: + batch = service.new_batch_http_request() + for file in files: + log.info("Preparing to remove file: {}", file) + batch.add(service.files().delete(fileId=file["id"])) + batch.execute() + log.info("Selected files removed.") + + remove_files_from_drive_task = remove_files_from_drive() + + delete_temp_gcp_connection_task = BashOperator( + task_id="delete_temp_gcp_connection", + bash_command=f"airflow connections delete {CONNECTION_ID}", + trigger_rule=TriggerRule.ALL_DONE, + ) + ( + # TEST SETUP + create_temp_gcp_connection_task # TEST BODY - upload_single_file + >> upload_single_file >> upload_multiple_files + # TEST TEARDOWN + >> remove_files_from_drive_task + >> delete_temp_gcp_connection_task ) from tests.system.utils.watcher import watcher