This is an automated email from the ASF dual-hosted git repository.
pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4dc2c40dfe Fix GCSToGoogleDriveOperator and gdrive system tests (#34545)
4dc2c40dfe is described below
commit 4dc2c40dfefb3495e435aabb99c07fa6e4f32c5d
Author: max <[email protected]>
AuthorDate: Thu Oct 12 18:53:50 2023 +0200
Fix GCSToGoogleDriveOperator and gdrive system tests (#34545)
---
.../google/suite/transfers/gcs_to_gdrive.py | 2 +-
.../google/suite/transfers/test_gcs_to_gdrive.py | 14 ++--
.../google/cloud/gcs/example_gcs_to_gdrive.py | 93 ++++++++++++++++++----
.../google/cloud/gcs/example_gdrive_to_gcs.py | 74 ++++++++++++++---
.../cloud/transfers/example_gdrive_to_local.py | 78 +++++++++++++++---
.../google/suite/example_local_to_drive.py | 68 +++++++++++++++-
6 files changed, 281 insertions(+), 48 deletions(-)
diff --git a/airflow/providers/google/suite/transfers/gcs_to_gdrive.py b/airflow/providers/google/suite/transfers/gcs_to_gdrive.py
index 0915906b1c..99f9ccafc5 100644
--- a/airflow/providers/google/suite/transfers/gcs_to_gdrive.py
+++ b/airflow/providers/google/suite/transfers/gcs_to_gdrive.py
@@ -95,7 +95,7 @@ class GCSToGoogleDriveOperator(BaseOperator):
source_bucket: str,
source_object: str,
destination_object: str | None = None,
- destination_folder_id: str | None = None,
+ destination_folder_id: str = "root",
move_object: bool = False,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
diff --git a/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py b/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py
index 47d1f6e038..8e97be5b0e 100644
--- a/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py
+++ b/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py
@@ -66,7 +66,7 @@ class TestGcsToGDriveOperator:
mock.call().upload_file(
local_location="TMP1",
remote_location="copied_sales/2017/january-backup.avro",
- folder_id=None,
+ folder_id="root",
),
]
)
@@ -156,13 +156,13 @@ class TestGcsToGDriveOperator:
impersonation_chain=IMPERSONATION_CHAIN,
),
mock.call().upload_file(
- local_location="TMP1", remote_location="sales/A.avro",
folder_id=None
+ local_location="TMP1", remote_location="sales/A.avro",
folder_id="root"
),
mock.call().upload_file(
- local_location="TMP2", remote_location="sales/B.avro",
folder_id=None
+ local_location="TMP2", remote_location="sales/B.avro",
folder_id="root"
),
mock.call().upload_file(
- local_location="TMP3", remote_location="sales/C.avro",
folder_id=None
+ local_location="TMP3", remote_location="sales/C.avro",
folder_id="root"
),
]
)
@@ -210,13 +210,13 @@ class TestGcsToGDriveOperator:
impersonation_chain=IMPERSONATION_CHAIN,
),
mock.call().upload_file(
- local_location="TMP1", remote_location="sales/A.avro",
folder_id=None
+ local_location="TMP1", remote_location="sales/A.avro",
folder_id="root"
),
mock.call().upload_file(
- local_location="TMP2", remote_location="sales/B.avro",
folder_id=None
+ local_location="TMP2", remote_location="sales/B.avro",
folder_id="root"
),
mock.call().upload_file(
- local_location="TMP3", remote_location="sales/C.avro",
folder_id=None
+ local_location="TMP3", remote_location="sales/C.avro",
folder_id="root"
),
]
)
diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py b/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
index e9cd16d8f9..541964e82d 100644
--- a/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
+++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
@@ -23,53 +23,86 @@ https://www.googleapis.com/auth/drive
"""
from __future__ import annotations
+import json
+import logging
import os
from datetime import datetime
from pathlib import Path
+from airflow.decorators import task
+from airflow.models import Connection
from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.gcs import
GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import
LocalFilesystemToGCSOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
from airflow.providers.google.suite.transfers.gcs_to_gdrive import
GCSToGoogleDriveOperator
+from airflow.settings import Session
from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-FOLDER_ID = os.environ.get("GCP_GDRIVE_FOLDER_ID", "abcd1234")
+FOLDER_ID = os.environ.get("GCP_GDRIVE_FOLDER_ID", None)
DAG_ID = "example_gcs_to_gdrive"
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
TMP_PATH = "tmp"
-
+WORK_DIR = f"folder_{DAG_ID}_{ENV_ID}".replace("-", "_")
CURRENT_FOLDER = Path(__file__).parent
LOCAL_PATH = str(Path(CURRENT_FOLDER) / "resources")
-
FILE_LOCAL_PATH = str(Path(LOCAL_PATH))
FILE_NAME = "example_upload.txt"
+log = logging.getLogger(__name__)
+
with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example", "gcs"],
+ tags=["example", "gcs", "gdrive"],
) as dag:
+
+ @task
+ def create_temp_gcp_connection():
+ conn = Connection(
+ conn_id=CONNECTION_ID,
+ conn_type="google_cloud_platform",
+ )
+ conn_extra_json = json.dumps(
+ {
+ "scope": "https://www.googleapis.com/auth/drive,"
+ "https://www.googleapis.com/auth/cloud-platform"
+ }
+ )
+ conn.set_extra(conn_extra_json)
+
+ session: Session = Session()
+ if session.query(Connection).filter(Connection.conn_id ==
CONNECTION_ID).first():
+ log.warning("Connection %s already exists", CONNECTION_ID)
+ return None
+ session.add(conn)
+ session.commit()
+
+ create_temp_gcp_connection_task = create_temp_gcp_connection()
+
create_bucket = GCSCreateBucketOperator(
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
)
- upload_file = LocalFilesystemToGCSOperator(
- task_id="upload_file",
+ upload_file_1 = LocalFilesystemToGCSOperator(
+ task_id="upload_file_1",
src=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
dst=f"{TMP_PATH}/{FILE_NAME}",
bucket=BUCKET_NAME,
)
upload_file_2 = LocalFilesystemToGCSOperator(
- task_id="upload_fil_2",
+ task_id="upload_file_2",
src=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
dst=f"{TMP_PATH}/2_{FILE_NAME}",
bucket=BUCKET_NAME,
@@ -77,18 +110,20 @@ with DAG(
# [START howto_operator_gcs_to_gdrive_copy_single_file]
copy_single_file = GCSToGoogleDriveOperator(
task_id="copy_single_file",
+ gcp_conn_id=CONNECTION_ID,
source_bucket=BUCKET_NAME,
source_object=f"{TMP_PATH}/{FILE_NAME}",
- destination_object=f"copied_tmp/copied_{FILE_NAME}",
+ destination_object=f"{WORK_DIR}/copied_{FILE_NAME}",
)
# [END howto_operator_gcs_to_gdrive_copy_single_file]
# [START howto_operator_gcs_to_gdrive_copy_single_file_into_folder]
copy_single_file_into_folder = GCSToGoogleDriveOperator(
task_id="copy_single_file_into_folder",
+ gcp_conn_id=CONNECTION_ID,
source_bucket=BUCKET_NAME,
source_object=f"{TMP_PATH}/{FILE_NAME}",
- destination_object=f"copied_tmp/copied_{FILE_NAME}",
+ destination_object=f"{WORK_DIR}/copied_{FILE_NAME}",
destination_folder_id=FOLDER_ID,
)
# [END howto_operator_gcs_to_gdrive_copy_single_file_into_folder]
@@ -96,36 +131,64 @@ with DAG(
# [START howto_operator_gcs_to_gdrive_copy_files]
copy_files = GCSToGoogleDriveOperator(
task_id="copy_files",
+ gcp_conn_id=CONNECTION_ID,
source_bucket=BUCKET_NAME,
source_object=f"{TMP_PATH}/*",
- destination_object="copied_tmp/",
+ destination_object=f"{WORK_DIR}/",
)
# [END howto_operator_gcs_to_gdrive_copy_files]
# [START howto_operator_gcs_to_gdrive_move_files]
move_files = GCSToGoogleDriveOperator(
task_id="move_files",
+ gcp_conn_id=CONNECTION_ID,
source_bucket=BUCKET_NAME,
source_object=f"{TMP_PATH}/*.txt",
+ destination_object=f"{WORK_DIR}/",
move_object=True,
)
# [END howto_operator_gcs_to_gdrive_move_files]
+ @task(trigger_rule=TriggerRule.ALL_DONE)
+ def remove_files_from_drive():
+ service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn()
+ root_path = (
+ service.files()
+ .list(q=f"name = '{WORK_DIR}' and mimeType =
'application/vnd.google-apps.folder'")
+ .execute()
+ )
+ if files := root_path["files"]:
+ batch = service.new_batch_http_request()
+ for file in files:
+ log.info("Preparing to remove file: {}", file)
+ batch.add(service.files().delete(fileId=file["id"]))
+ batch.execute()
+ log.info("Selected files removed.")
+
+ remove_files_from_drive_task = remove_files_from_drive()
+
delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME,
trigger_rule=TriggerRule.ALL_DONE
)
+ delete_temp_gcp_connection_task = BashOperator(
+ task_id="delete_temp_gcp_connection",
+ bash_command=f"airflow connections delete {CONNECTION_ID}",
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ # TEST SETUP
+ create_bucket >> [upload_file_1, upload_file_2]
(
- # TEST SETUP
- create_bucket
- >> upload_file
- >> upload_file_2
+ [upload_file_1, upload_file_2, create_temp_gcp_connection_task]
# TEST BODY
>> copy_single_file
+ >> copy_single_file_into_folder
>> copy_files
>> move_files
# TEST TEARDOWN
- >> delete_bucket
+ >> remove_files_from_drive_task
+ >> [delete_bucket, delete_temp_gcp_connection_task]
)
from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/gcs/example_gdrive_to_gcs.py b/tests/system/providers/google/cloud/gcs/example_gdrive_to_gcs.py
index b3debb12dd..883cb33054 100644
--- a/tests/system/providers/google/cloud/gcs/example_gdrive_to_gcs.py
+++ b/tests/system/providers/google/cloud/gcs/example_gdrive_to_gcs.py
@@ -17,16 +17,23 @@
# under the License.
from __future__ import annotations
+import json
+import logging
import os
from datetime import datetime
from pathlib import Path
+from airflow.decorators import task
+from airflow.models import Connection
from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.gcs import
GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.gdrive_to_gcs import
GoogleDriveToGCSOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import
LocalFilesystemToGCSOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
from airflow.providers.google.suite.sensors.drive import
GoogleDriveFileExistenceSensor
from airflow.providers.google.suite.transfers.gcs_to_gdrive import
GCSToGoogleDriveOperator
+from airflow.settings import Session
from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
@@ -35,20 +42,48 @@ PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
DAG_ID = "example_gdrive_to_gcs_with_gdrive_sensor"
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
OBJECT = "abc123xyz"
FOLDER_ID = ""
FILE_NAME = "example_upload.txt"
+DRIVE_FILE_NAME = f"example_upload_{DAG_ID}_{ENV_ID}.txt"
LOCAL_PATH = str(Path(__file__).parent / "resources" / FILE_NAME)
+log = logging.getLogger(__name__)
+
+
with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example"],
+ tags=["example", "gcs", "gdrive"],
) as dag:
+ @task
+ def create_temp_gcp_connection():
+ conn = Connection(
+ conn_id=CONNECTION_ID,
+ conn_type="google_cloud_platform",
+ )
+ conn_extra_json = json.dumps(
+ {
+ "scope": "https://www.googleapis.com/auth/drive,"
+ "https://www.googleapis.com/auth/cloud-platform"
+ }
+ )
+ conn.set_extra(conn_extra_json)
+
+ session: Session = Session()
+ if session.query(Connection).filter(Connection.conn_id ==
CONNECTION_ID).first():
+ log.warning("Connection %s already exists", CONNECTION_ID)
+ return None
+ session.add(conn)
+ session.commit()
+
+ create_temp_gcp_connection_task = create_temp_gcp_connection()
+
create_bucket = GCSCreateBucketOperator(
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
)
@@ -62,41 +97,62 @@ with DAG(
copy_single_file = GCSToGoogleDriveOperator(
task_id="copy_single_file",
+ gcp_conn_id=CONNECTION_ID,
source_bucket=BUCKET_NAME,
source_object=FILE_NAME,
- destination_object=FILE_NAME,
+ destination_object=DRIVE_FILE_NAME,
)
# [START detect_file]
detect_file = GoogleDriveFileExistenceSensor(
- task_id="detect_file", folder_id=FOLDER_ID, file_name=FILE_NAME
+ task_id="detect_file",
+ folder_id=FOLDER_ID,
+ file_name=DRIVE_FILE_NAME,
+ gcp_conn_id=CONNECTION_ID,
)
# [END detect_file]
# [START upload_gdrive_to_gcs]
upload_gdrive_to_gcs = GoogleDriveToGCSOperator(
task_id="upload_gdrive_object_to_gcs",
+ gcp_conn_id=CONNECTION_ID,
folder_id=FOLDER_ID,
- file_name=FILE_NAME,
+ file_name=DRIVE_FILE_NAME,
bucket_name=BUCKET_NAME,
object_name=OBJECT,
)
# [END upload_gdrive_to_gcs]
+ @task(trigger_rule=TriggerRule.ALL_DONE)
+ def remove_files_from_drive():
+ service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn()
+ response = service.files().list(q=f"name =
'{DRIVE_FILE_NAME}'").execute()
+ if files := response["files"]:
+ file = files[0]
+ log.info("Deleting file {}...", file)
+ service.files().delete(fileId=file["id"])
+ log.info("Done.")
+
+ remove_files_from_drive_task = remove_files_from_drive()
+
delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME,
trigger_rule=TriggerRule.ALL_DONE
)
+ delete_temp_gcp_connection_task = BashOperator(
+ task_id="delete_temp_gcp_connection",
+ bash_command=f"airflow connections delete {CONNECTION_ID}",
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
(
- # TEST SETUP
- create_bucket
- >> upload_file
- >> copy_single_file
+ [create_bucket >> upload_file >> copy_single_file,
create_temp_gcp_connection_task]
# TEST BODY
>> detect_file
>> upload_gdrive_to_gcs
# TEST TEARDOWN
- >> delete_bucket
+ >> remove_files_from_drive_task
+ >> [delete_bucket, delete_temp_gcp_connection_task]
)
from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/transfers/example_gdrive_to_local.py b/tests/system/providers/google/cloud/transfers/example_gdrive_to_local.py
index a7a0b6ac3d..a989af9cfd 100644
--- a/tests/system/providers/google/cloud/transfers/example_gdrive_to_local.py
+++ b/tests/system/providers/google/cloud/transfers/example_gdrive_to_local.py
@@ -23,38 +23,70 @@ https://www.googleapis.com/auth/drive
"""
from __future__ import annotations
+import logging
import os
from datetime import datetime
from pathlib import Path
-from airflow.models.baseoperator import chain
+from airflow.decorators import task
+from airflow.models import Connection
from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.gcs import
GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.gdrive_to_local import
GoogleDriveToLocalOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import
LocalFilesystemToGCSOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
from airflow.providers.google.suite.sensors.drive import
GoogleDriveFileExistenceSensor
from airflow.providers.google.suite.transfers.gcs_to_gdrive import
GCSToGoogleDriveOperator
+from airflow.settings import Session, json
from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "example_gdrive_to_local"
+CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
FILE_NAME = "empty.txt"
+DRIVE_FILE_NAME = f"empty_{DAG_ID}_{ENV_ID}.txt".replace("-", "_")
OUTPUT_FILE = "out_file.txt"
-DAG_ID = "example_gdrive_to_local"
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
LOCAL_PATH = str(Path(__file__).parent / "resources" / FILE_NAME)
+log = logging.getLogger(__name__)
+
with DAG(
DAG_ID,
start_date=datetime(2021, 1, 1),
schedule="@once",
catchup=False,
- tags=["example"],
+ tags=["example", "gdrive"],
) as dag:
+ @task
+ def create_temp_gcp_connection():
+ conn = Connection(
+ conn_id=CONNECTION_ID,
+ conn_type="google_cloud_platform",
+ )
+ conn_extra_json = json.dumps(
+ {
+ "scope": "https://www.googleapis.com/auth/drive,"
+ "https://www.googleapis.com/auth/cloud-platform"
+ }
+ )
+ conn.set_extra(conn_extra_json)
+
+ session: Session = Session()
+ if session.query(Connection).filter(Connection.conn_id ==
CONNECTION_ID).first():
+ log.warning("Connection %s already exists", CONNECTION_ID)
+ return None
+ session.add(conn)
+ session.commit()
+
+ create_temp_gcp_connection_task = create_temp_gcp_connection()
+
create_bucket = GCSCreateBucketOperator(
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
)
@@ -68,42 +100,62 @@ with DAG(
copy_single_file = GCSToGoogleDriveOperator(
task_id="copy_single_file",
+ gcp_conn_id=CONNECTION_ID,
source_bucket=BUCKET_NAME,
source_object=FILE_NAME,
- destination_object=FILE_NAME,
+ destination_object=DRIVE_FILE_NAME,
)
# [START detect_file]
detect_file = GoogleDriveFileExistenceSensor(
task_id="detect_file",
+ gcp_conn_id=CONNECTION_ID,
folder_id="",
- file_name=FILE_NAME,
+ file_name=DRIVE_FILE_NAME,
)
# [END detect_file]
# [START download_from_gdrive_to_local]
download_from_gdrive_to_local = GoogleDriveToLocalOperator(
task_id="download_from_gdrive_to_local",
+ gcp_conn_id=CONNECTION_ID,
folder_id="",
- file_name=FILE_NAME,
+ file_name=DRIVE_FILE_NAME,
output_file=OUTPUT_FILE,
)
# [END download_from_gdrive_to_local]
+ @task(trigger_rule=TriggerRule.ALL_DONE)
+ def remove_file_from_drive():
+ service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn()
+ response = service.files().list(q=f"name =
'{DRIVE_FILE_NAME}'").execute()
+ if files := response["files"]:
+ file = files[0]
+ log.info("Deleting file {}...", file)
+ service.files().delete(fileId=file["id"])
+ log.info("Done.")
+
+ remove_file_from_drive_task = remove_file_from_drive()
+
delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME,
trigger_rule=TriggerRule.ALL_DONE
)
- chain(
+ delete_temp_gcp_connection_task = BashOperator(
+ task_id="delete_temp_gcp_connection",
+ bash_command=f"airflow connections delete {CONNECTION_ID}",
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ (
# TEST SETUP
- create_bucket,
- upload_file,
- copy_single_file,
+ [create_bucket >> upload_file >> copy_single_file,
create_temp_gcp_connection_task]
# TEST BODY
- detect_file,
- download_from_gdrive_to_local,
+ >> detect_file
+ >> download_from_gdrive_to_local
# TEST TEARDOWN
- delete_bucket,
+ >> remove_file_from_drive_task
+ >> [delete_bucket, delete_temp_gcp_connection_task]
)
from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/suite/example_local_to_drive.py b/tests/system/providers/google/suite/example_local_to_drive.py
index 7e5130f68b..2738fd5607 100644
--- a/tests/system/providers/google/suite/example_local_to_drive.py
+++ b/tests/system/providers/google/suite/example_local_to_drive.py
@@ -23,23 +23,36 @@ https://www.googleapis.com/auth/drive
"""
from __future__ import annotations
+import json
+import logging
+import os
from datetime import datetime
from pathlib import Path
+from airflow.decorators import task
+from airflow.models import Connection
from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
from airflow.providers.google.suite.transfers.local_to_drive import
LocalFilesystemToGoogleDriveOperator
+from airflow.settings import Session
+from airflow.utils.trigger_rule import TriggerRule
DAG_ID = "example_local_to_drive"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
FILE_NAME_1 = "test1"
FILE_NAME_2 = "test2"
+CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
LOCAL_PATH = str(Path(__file__).parent / "resources")
SINGLE_FILE_LOCAL_PATHS = [str(Path(LOCAL_PATH) / FILE_NAME_1)]
MULTIPLE_FILES_LOCAL_PATHS = [str(Path(LOCAL_PATH) / FILE_NAME_1),
str(Path(LOCAL_PATH) / FILE_NAME_2)]
-DRIVE_FOLDER = "test-folder"
+DRIVE_FOLDER = f"test_folder_{DAG_ID}_{ENV_ID}"
+
+log = logging.getLogger(__name__)
with DAG(
@@ -47,11 +60,30 @@ with DAG(
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example"],
+ tags=["example", "gdrive"],
) as dag:
+ @task
+ def create_temp_gcp_connection():
+ conn = Connection(
+ conn_id=CONNECTION_ID,
+ conn_type="google_cloud_platform",
+ )
+ conn_extra_json = json.dumps({"scope":
"https://www.googleapis.com/auth/drive"})
+ conn.set_extra(conn_extra_json)
+
+ session: Session = Session()
+ if session.query(Connection).filter(Connection.conn_id ==
CONNECTION_ID).first():
+ log.warning("Connection %s already exists", CONNECTION_ID)
+ return None
+ session.add(conn)
+ session.commit()
+
+ create_temp_gcp_connection_task = create_temp_gcp_connection()
+
# [START howto_operator_local_to_drive_upload_single_file]
upload_single_file = LocalFilesystemToGoogleDriveOperator(
+ gcp_conn_id=CONNECTION_ID,
task_id="upload_single_file",
local_paths=SINGLE_FILE_LOCAL_PATHS,
drive_folder=DRIVE_FOLDER,
@@ -60,6 +92,7 @@ with DAG(
# [START howto_operator_local_to_drive_upload_multiple_files]
upload_multiple_files = LocalFilesystemToGoogleDriveOperator(
+ gcp_conn_id=CONNECTION_ID,
task_id="upload_multiple_files",
local_paths=MULTIPLE_FILES_LOCAL_PATHS,
drive_folder=DRIVE_FOLDER,
@@ -67,10 +100,39 @@ with DAG(
)
# [END howto_operator_local_to_drive_upload_multiple_files]
+ @task(trigger_rule=TriggerRule.ALL_DONE)
+ def remove_files_from_drive():
+ service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn()
+ root_path = (
+ service.files()
+ .list(q=f"name = '{DRIVE_FOLDER}' and mimeType =
'application/vnd.google-apps.folder'")
+ .execute()
+ )
+ if files := root_path["files"]:
+ batch = service.new_batch_http_request()
+ for file in files:
+ log.info("Preparing to remove file: {}", file)
+ batch.add(service.files().delete(fileId=file["id"]))
+ batch.execute()
+ log.info("Selected files removed.")
+
+ remove_files_from_drive_task = remove_files_from_drive()
+
+ delete_temp_gcp_connection_task = BashOperator(
+ task_id="delete_temp_gcp_connection",
+ bash_command=f"airflow connections delete {CONNECTION_ID}",
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
(
+ # TEST SETUP
+ create_temp_gcp_connection_task
# TEST BODY
- upload_single_file
+ >> upload_single_file
>> upload_multiple_files
+ # TEST TEARDOWN
+ >> remove_files_from_drive_task
+ >> delete_temp_gcp_connection_task
)
from tests.system.utils.watcher import watcher