This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b067051d3b Update system tests for Google Sheets operators (#34911)
b067051d3b is described below

commit b067051d3bcec36187c159073ecebc0fc048c99b
Author: Maksim <maks...@google.com>
AuthorDate: Sat Oct 14 10:02:22 2023 +0200

    Update system tests for Google Sheets operators (#34911)
    
    * Update system tests for Google Sheets operators
    
    * Update DAG for system tests
---
 .../google/cloud/gcs/example_gcs_to_sheets.py      |  61 ++++-
 .../providers/google/cloud/gcs/example_sheets.py   |  53 +++-
 .../google/cloud/gcs/example_sheets_to_gcs.py      |  56 ++++-
 .../cloud/sql_to_sheets/example_sql_to_sheets.py   | 276 +++++++++++++++++++--
 4 files changed, 406 insertions(+), 40 deletions(-)

diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_sheets.py b/tests/system/providers/google/cloud/gcs/example_gcs_to_sheets.py
index e2166fd788..116b3792b6 100644
--- a/tests/system/providers/google/cloud/gcs/example_gcs_to_sheets.py
+++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_sheets.py
@@ -17,22 +17,32 @@
 # under the License.
 from __future__ import annotations
 
+import json
 import os
 from datetime import datetime
 
+from airflow.decorators import task
+from airflow.models import Connection
 from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.transfers.sheets_to_gcs import GoogleSheetsToGCSOperator
+from airflow.providers.google.suite.operators.sheets import GoogleSheetsCreateSpreadsheetOperator
 from airflow.providers.google.suite.transfers.gcs_to_sheets import GCSToGoogleSheetsOperator
+from airflow.settings import Session
 from airflow.utils.trigger_rule import TriggerRule
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
 DAG_ID = "example_gcs_to_sheets"
 
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-SPREADSHEET_ID = os.environ.get("SPREADSHEET_ID", "example-spreadsheetID")
-NEW_SPREADSHEET_ID = os.environ.get("NEW_SPREADSHEET_ID", "1234567890qwerty")
+SPREADSHEET = {
+    "properties": {"title": "Test1"},
+    "sheets": [{"properties": {"title": "Sheet1"}}],
+}
+CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
+
 
 with DAG(
     DAG_ID,
@@ -45,10 +55,36 @@ with DAG(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
     )
 
+    @task
+    def create_temp_sheets_connection():
+        conn = Connection(
+            conn_id=CONNECTION_ID,
+            conn_type="google_cloud_platform",
+        )
+        conn_extra = {
+            "scope": 
"https://www.googleapis.com/auth/spreadsheets,https://www.googleapis.com/auth/cloud-platform";,
+            "project": PROJECT_ID,
+            "keyfile_dict": "",  # Override to match your needs
+        }
+        conn_extra_json = json.dumps(conn_extra)
+        conn.set_extra(conn_extra_json)
+
+        session: Session = Session()
+        session.add(conn)
+        session.commit()
+
+    create_temp_sheets_connection_task = create_temp_sheets_connection()
+
+    create_spreadsheet = GoogleSheetsCreateSpreadsheetOperator(
+        task_id="create_spreadsheet", spreadsheet=SPREADSHEET, 
gcp_conn_id=CONNECTION_ID
+    )
+
     upload_sheet_to_gcs = GoogleSheetsToGCSOperator(
         task_id="upload_sheet_to_gcs",
         destination_bucket=BUCKET_NAME,
-        spreadsheet_id=SPREADSHEET_ID,
+        spreadsheet_id="{{ 
task_instance.xcom_pull(task_ids='create_spreadsheet', "
+        "key='spreadsheet_id') }}",
+        gcp_conn_id=CONNECTION_ID,
     )
 
     # [START upload_gcs_to_sheets]
@@ -56,22 +92,31 @@ with DAG(
         task_id="upload_gcs_to_sheet",
         bucket_name=BUCKET_NAME,
         object_name="{{ task_instance.xcom_pull('upload_sheet_to_gcs')[0] }}",
-        spreadsheet_id=NEW_SPREADSHEET_ID,
+        spreadsheet_id="{{ 
task_instance.xcom_pull(task_ids='create_spreadsheet', "
+        "key='spreadsheet_id') }}",
+        gcp_conn_id=CONNECTION_ID,
     )
     # [END upload_gcs_to_sheets]
 
+    delete_temp_sheets_connection_task = BashOperator(
+        task_id="delete_temp_sheets_connection",
+        bash_command=f"airflow connections delete {CONNECTION_ID}",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
     delete_bucket = GCSDeleteBucketOperator(
         task_id="delete_bucket", bucket_name=BUCKET_NAME, 
trigger_rule=TriggerRule.ALL_DONE
     )
 
     (
         # TEST SETUP
-        create_bucket
+        [create_bucket, create_temp_sheets_connection_task]
+        >> create_spreadsheet
         >> upload_sheet_to_gcs
         # TEST BODY
         >> upload_gcs_to_sheet
         # TEST TEARDOWN
-        >> delete_bucket
+        >> [delete_bucket, delete_temp_sheets_connection_task]
     )
 
     from tests.system.utils.watcher import watcher
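
Aside, not part of the patch: the templated spreadsheet_id fields above are two implicitly concatenated Python string literals that render to a single Jinja expression, pulling the id that the create_spreadsheet task pushes to XCom under the key "spreadsheet_id". A minimal sketch of that equivalence, using nothing beyond plain Python:

template = (
    "{{ task_instance.xcom_pull(task_ids='create_spreadsheet', "
    "key='spreadsheet_id') }}"
)
# Adjacent literals concatenate, so the rendered field is one Jinja expression.
assert template == (
    "{{ task_instance.xcom_pull(task_ids='create_spreadsheet', key='spreadsheet_id') }}"
)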
diff --git a/tests/system/providers/google/cloud/gcs/example_sheets.py b/tests/system/providers/google/cloud/gcs/example_sheets.py
index ef43fd03fc..7079c47680 100644
--- a/tests/system/providers/google/cloud/gcs/example_sheets.py
+++ b/tests/system/providers/google/cloud/gcs/example_sheets.py
@@ -17,9 +17,12 @@
 # under the License.
 from __future__ import annotations
 
+import json
 import os
 from datetime import datetime
 
+from airflow.decorators import task
+from airflow.models import Connection
 from airflow.models.dag import DAG
 from airflow.models.xcom_arg import XComArg
 from airflow.operators.bash import BashOperator
@@ -27,20 +30,20 @@ from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator
 from airflow.providers.google.cloud.transfers.sheets_to_gcs import GoogleSheetsToGCSOperator
 from airflow.providers.google.suite.operators.sheets import GoogleSheetsCreateSpreadsheetOperator
 from airflow.providers.google.suite.transfers.gcs_to_sheets import GCSToGoogleSheetsOperator
+from airflow.settings import Session
 from airflow.utils.trigger_rule import TriggerRule
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
 DAG_ID = "example_sheets_gcs"
 
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-SPREADSHEET_ID = os.environ.get("SPREADSHEET_ID", "1234567890qwerty")
-NEW_SPREADSHEET_ID = os.environ.get("NEW_SPREADSHEET_ID", "1234567890qwerty")
-
 SPREADSHEET = {
     "properties": {"title": "Test1"},
     "sheets": [{"properties": {"title": "Sheet1"}}],
 }
+CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
+
 
 with DAG(
     DAG_ID,
@@ -53,17 +56,39 @@ with DAG(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
     )
 
+    @task
+    def create_temp_sheets_connection():
+        conn = Connection(
+            conn_id=CONNECTION_ID,
+            conn_type="google_cloud_platform",
+        )
+        conn_extra = {
+            "scope": 
"https://www.googleapis.com/auth/spreadsheets,https://www.googleapis.com/auth/cloud-platform";,
+            "project": PROJECT_ID,
+            "keyfile_dict": "",  # Override to match your needs
+        }
+        conn_extra_json = json.dumps(conn_extra)
+        conn.set_extra(conn_extra_json)
+
+        session: Session = Session()
+        session.add(conn)
+        session.commit()
+
+    create_temp_sheets_connection_task = create_temp_sheets_connection()
+
     # [START upload_sheet_to_gcs]
     upload_sheet_to_gcs = GoogleSheetsToGCSOperator(
         task_id="upload_sheet_to_gcs",
         destination_bucket=BUCKET_NAME,
-        spreadsheet_id=SPREADSHEET_ID,
+        spreadsheet_id="{{ 
task_instance.xcom_pull(task_ids='create_spreadsheet', "
+        "key='spreadsheet_id') }}",
+        gcp_conn_id=CONNECTION_ID,
     )
     # [END upload_sheet_to_gcs]
 
     # [START create_spreadsheet]
     create_spreadsheet = GoogleSheetsCreateSpreadsheetOperator(
-        task_id="create_spreadsheet", spreadsheet=SPREADSHEET
+        task_id="create_spreadsheet", spreadsheet=SPREADSHEET, 
gcp_conn_id=CONNECTION_ID
     )
     # [END create_spreadsheet]
 
@@ -79,24 +104,32 @@ with DAG(
         task_id="upload_gcs_to_sheet",
         bucket_name=BUCKET_NAME,
         object_name="{{ task_instance.xcom_pull('upload_sheet_to_gcs')[0] }}",
-        spreadsheet_id=NEW_SPREADSHEET_ID,
+        spreadsheet_id="{{ 
task_instance.xcom_pull(task_ids='create_spreadsheet', "
+        "key='spreadsheet_id') }}",
+        gcp_conn_id=CONNECTION_ID,
     )
     # [END upload_gcs_to_sheet]
 
+    delete_temp_sheets_connection_task = BashOperator(
+        task_id="delete_temp_sheets_connection",
+        bash_command=f"airflow connections delete {CONNECTION_ID}",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
     delete_bucket = GCSDeleteBucketOperator(
         task_id="delete_bucket", bucket_name=BUCKET_NAME, 
trigger_rule=TriggerRule.ALL_DONE
     )
 
     (
         # TEST SETUP
-        create_bucket
+        [create_bucket, create_temp_sheets_connection_task]
         # TEST BODY
         >> create_spreadsheet
         >> print_spreadsheet_url
         >> upload_sheet_to_gcs
         >> upload_gcs_to_sheet
         # TEST TEARDOWN
-        >> delete_bucket
+        >> [delete_bucket, delete_temp_sheets_connection_task]
     )
 
     from tests.system.utils.watcher import watcher
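
Another aside, not part of the patch: create_temp_sheets_connection adds the connection row unconditionally, so a re-run against the same metadata database can fail on a duplicate conn_id. example_sql_to_sheets.py below guards its Postgres connection with a session.query check; a sketch of the same guard applied to the Sheets connection (the two constants are placeholders mirroring the DAG-level values):

import json
import logging

from airflow.decorators import task
from airflow.models import Connection
from airflow.settings import Session

log = logging.getLogger(__name__)

# Placeholders standing in for the module-level constants of the example DAGs.
CONNECTION_ID = "connection_example_sheets_gcs_default"
PROJECT_ID = "default"


@task
def create_temp_sheets_connection():
    # Skip creation if the row is already present (guard borrowed from
    # example_sql_to_sheets.py); otherwise persist the temporary connection.
    session = Session()
    if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
        log.warning("Connection %s already exists", CONNECTION_ID)
        return
    conn = Connection(conn_id=CONNECTION_ID, conn_type="google_cloud_platform")
    conn.set_extra(
        json.dumps(
            {
                "scope": "https://www.googleapis.com/auth/spreadsheets,"
                "https://www.googleapis.com/auth/cloud-platform",
                "project": PROJECT_ID,
                "keyfile_dict": "",  # Override to match your needs
            }
        )
    )
    session.add(conn)
    session.commit()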
diff --git a/tests/system/providers/google/cloud/gcs/example_sheets_to_gcs.py b/tests/system/providers/google/cloud/gcs/example_sheets_to_gcs.py
index ad06b8e6f3..3564d2c1e0 100644
--- a/tests/system/providers/google/cloud/gcs/example_sheets_to_gcs.py
+++ b/tests/system/providers/google/cloud/gcs/example_sheets_to_gcs.py
@@ -17,20 +17,31 @@
 # under the License.
 from __future__ import annotations
 
+import json
 import os
 from datetime import datetime
 
+from airflow.decorators import task
+from airflow.models import Connection
 from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.transfers.sheets_to_gcs import GoogleSheetsToGCSOperator
+from airflow.providers.google.suite.operators.sheets import GoogleSheetsCreateSpreadsheetOperator
+from airflow.settings import Session
 from airflow.utils.trigger_rule import TriggerRule
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
 DAG_ID = "example_sheets_to_gcs"
 
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-SPREADSHEET_ID = os.environ.get("SPREADSHEET_ID", "1234567890qwerty")
+SPREADSHEET = {
+    "properties": {"title": "Test1"},
+    "sheets": [{"properties": {"title": "Sheet1"}}],
+}
+CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
+
 
 with DAG(
     DAG_ID,
@@ -43,25 +54,58 @@ with DAG(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
     )
 
+    @task
+    def create_temp_sheets_connection():
+        conn = Connection(
+            conn_id=CONNECTION_ID,
+            conn_type="google_cloud_platform",
+        )
+        conn_extra = {
+            "scope": 
"https://www.googleapis.com/auth/spreadsheets,https://www.googleapis.com/auth/cloud-platform";,
+            "project": PROJECT_ID,
+            "keyfile_dict": "",  # Override to match your needs
+        }
+        conn_extra_json = json.dumps(conn_extra)
+        conn.set_extra(conn_extra_json)
+
+        session: Session = Session()
+        session.add(conn)
+        session.commit()
+
+    create_temp_sheets_connection_task = create_temp_sheets_connection()
+
+    create_spreadsheet = GoogleSheetsCreateSpreadsheetOperator(
+        task_id="create_spreadsheet", spreadsheet=SPREADSHEET, 
gcp_conn_id=CONNECTION_ID
+    )
+
     # [START upload_sheet_to_gcs]
     upload_sheet_to_gcs = GoogleSheetsToGCSOperator(
         task_id="upload_sheet_to_gcs",
         destination_bucket=BUCKET_NAME,
-        spreadsheet_id=SPREADSHEET_ID,
+        spreadsheet_id="{{ 
task_instance.xcom_pull(task_ids='create_spreadsheet', "
+        "key='spreadsheet_id') }}",
+        gcp_conn_id=CONNECTION_ID,
     )
     # [END upload_sheet_to_gcs]
 
+    delete_temp_sheets_connection_task = BashOperator(
+        task_id="delete_temp_sheets_connection",
+        bash_command=f"airflow connections delete {CONNECTION_ID}",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
     delete_bucket = GCSDeleteBucketOperator(
         task_id="delete_bucket", bucket_name=BUCKET_NAME, 
trigger_rule=TriggerRule.ALL_DONE
     )
 
     (
         # TEST SETUP
-        create_bucket
+        [create_bucket, create_temp_sheets_connection_task]
+        >> create_spreadsheet
         # TEST BODY
         >> upload_sheet_to_gcs
         # TEST TEARDOWN
-        >> delete_bucket
+        >> [delete_bucket, delete_temp_sheets_connection_task]
     )
 
     from tests.system.utils.watcher import watcher
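
One more aside, outside the commit: every file touched here keeps the get_test_run tail, which is what lets pytest execute the DAG as a system test. A hedged sketch of how that tail is used, assuming the repository layout shown in the diff:

# At the bottom of the example DAG module, after the `with DAG(...) as dag:` block:
from tests.system.utils import get_test_run  # noqa: E402

# get_test_run wraps the `dag` object so pytest can run it end to end, e.g.
#   pytest tests/system/providers/google/cloud/gcs/example_sheets_to_gcs.py
test_run = get_test_run(dag)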
diff --git a/tests/system/providers/google/cloud/sql_to_sheets/example_sql_to_sheets.py b/tests/system/providers/google/cloud/sql_to_sheets/example_sql_to_sheets.py
index a360a7de42..6d25c506a9 100644
--- a/tests/system/providers/google/cloud/sql_to_sheets/example_sql_to_sheets.py
+++ b/tests/system/providers/google/cloud/sql_to_sheets/example_sql_to_sheets.py
@@ -17,12 +17,6 @@
 # under the License.
 
 """
-Required environment variables:
-```
-DB_CONNECTION = os.environ.get("DB_CONNECTION")
-SPREADSHEET_ID = os.environ.get("SPREADSHEET_ID", "test-id")
-```
-
 First, you need a db instance that is accessible from the Airflow environment.
 You can, for example, create a Cloud SQL instance and connect to it from
 within breeze with Cloud SQL proxy:
@@ -66,18 +60,117 @@ DROP DATABASE test_db;
 """
 from __future__ import annotations
 
+import json
+import logging
 import os
 from datetime import datetime
 
+from airflow.decorators import task
+from airflow.models import Connection
 from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
+from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
+from airflow.providers.google.cloud.hooks.compute import ComputeEngineHook
+from airflow.providers.google.cloud.hooks.compute_ssh import ComputeEngineSSHHook
+from airflow.providers.google.cloud.operators.compute import (
+    ComputeEngineDeleteInstanceOperator,
+    ComputeEngineInsertInstanceOperator,
+)
+from airflow.providers.google.suite.operators.sheets import GoogleSheetsCreateSpreadsheetOperator
 from airflow.providers.google.suite.transfers.sql_to_sheets import SQLToGoogleSheetsOperator
-
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DB_CONNECTION = os.environ.get("DB_CONNECTION")
-SPREADSHEET_ID = os.environ.get("SPREADSHEET_ID", "test-id")
+from airflow.providers.ssh.operators.ssh import SSHOperator
+from airflow.settings import Session
+from airflow.utils.trigger_rule import TriggerRule
 
 DAG_ID = "example_sql_to_sheets"
-SQL = "select col2 from test_table"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+
+REGION = "europe-west2"
+ZONE = REGION + "-a"
+NETWORK = "default"
+
+SHEETS_CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
+SPREADSHEET = {
+    "properties": {"title": "Test1"},
+    "sheets": [{"properties": {"title": "Sheet1"}}],
+}
+
+DB_NAME = f"{DAG_ID}-{ENV_ID}-db".replace("-", "_")
+DB_PORT = 5432
+DB_USER_NAME = "demo_user"
+DB_USER_PASSWORD = "demo_password"
+DB_CONNECTION_ID = f"postgres_{DAG_ID}_{ENV_ID}".replace("-", "_")
+
+SHORT_MACHINE_TYPE_NAME = "n1-standard-1"
+DB_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
+GCE_INSTANCE_BODY = {
+    "name": DB_INSTANCE_NAME,
+    "machine_type": f"zones/{ZONE}/machineTypes/{SHORT_MACHINE_TYPE_NAME}",
+    "disks": [
+        {
+            "boot": True,
+            "device_name": DB_INSTANCE_NAME,
+            "initialize_params": {
+                "disk_size_gb": "10",
+                "disk_type": f"zones/{ZONE}/diskTypes/pd-balanced",
+                "source_image": 
"projects/debian-cloud/global/images/debian-11-bullseye-v20220621",
+            },
+        }
+    ],
+    "network_interfaces": [
+        {
+            "access_configs": [{"name": "External NAT", "network_tier": 
"PREMIUM"}],
+            "stack_type": "IPV4_ONLY",
+            "subnetwork": f"regions/{REGION}/subnetworks/default",
+        }
+    ],
+}
+DELETE_PERSISTENT_DISK = f"""
+if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
+ gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
+fi;
+gcloud compute disks delete {DB_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet
+"""
+
+SETUP_POSTGRES = f"""
+sudo apt update &&
+sudo apt install -y docker.io &&
+sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
+    -e POSTGRES_USER={DB_USER_NAME} \
+    -e POSTGRES_PASSWORD={DB_USER_PASSWORD} \
+    -e POSTGRES_DB={DB_NAME} \
+    postgres
+"""
+
+FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}"
+CREATE_FIREWALL_RULE = f"""
+if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
+ gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
+fi;
+gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \
+  --project={PROJECT_ID} \
+  --direction=INGRESS \
+  --priority=100 \
+  --network={NETWORK} \
+  --action=ALLOW \
+  --rules=tcp:{DB_PORT} \
+  --source-ranges=0.0.0.0/0
+"""
+DELETE_FIREWALL_RULE = f"""
+if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
+ gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
+fi;
+gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet
+"""
+
+SQL_TABLE = "test_table"
+SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 
VARCHAR(8))"
+SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 
'two')"
+SQL_SELECT = f"SELECT * FROM {SQL_TABLE}"
+
+log = logging.getLogger(__name__)
+
 
 with DAG(
     DAG_ID,
@@ -86,16 +179,167 @@ with DAG(
     catchup=False,
     tags=["example", "sql"],
 ) as dag:
+    create_instance = ComputeEngineInsertInstanceOperator(
+        task_id="create_instance",
+        project_id=PROJECT_ID,
+        zone=ZONE,
+        body=GCE_INSTANCE_BODY,
+    )
+
+    create_firewall_rule = BashOperator(
+        task_id="create_firewall_rule",
+        bash_command=CREATE_FIREWALL_RULE,
+    )
+
+    setup_postgres = SSHOperator(
+        task_id="setup_postgres",
+        ssh_hook=ComputeEngineSSHHook(
+            user="username",
+            instance_name=DB_INSTANCE_NAME,
+            zone=ZONE,
+            project_id=PROJECT_ID,
+            use_oslogin=False,
+            use_iap_tunnel=False,
+            cmd_timeout=180,
+        ),
+        command=SETUP_POSTGRES,
+        retries=2,
+        retry_delay=30,
+    )
+
+    @task
+    def get_public_ip() -> str:
+        hook = ComputeEngineHook()
+        address = hook.get_instance_address(resource_id=DB_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
+        return address
+
+    get_public_ip_task = get_public_ip()
+
+    @task
+    def setup_postgres_connection(**kwargs) -> None:
+        public_ip = kwargs["ti"].xcom_pull(task_ids="get_public_ip")
+        connection = Connection(
+            conn_id=DB_CONNECTION_ID,
+            description="Example PostgreSQL connection",
+            conn_type="postgres",
+            host=public_ip,
+            login=DB_USER_NAME,
+            password=DB_USER_PASSWORD,
+            schema=DB_NAME,
+            port=DB_PORT,
+        )
+        session: Session = Session()
+        if session.query(Connection).filter(Connection.conn_id == DB_CONNECTION_ID).first():
+            log.warning("Connection %s already exists", DB_CONNECTION_ID)
+            return None
+
+        session.add(connection)
+        session.commit()
+
+    setup_postgres_connection_task = setup_postgres_connection()
+
+    @task
+    def setup_sheets_connection():
+        conn = Connection(
+            conn_id=SHEETS_CONNECTION_ID,
+            conn_type="google_cloud_platform",
+        )
+        conn_extra = {
+            "scope": 
"https://www.googleapis.com/auth/spreadsheets,https://www.googleapis.com/auth/cloud-platform";,
+            "project": PROJECT_ID,
+            "keyfile_dict": "",  # Override to match your needs
+        }
+        conn_extra_json = json.dumps(conn_extra)
+        conn.set_extra(conn_extra_json)
+
+        session: Session = Session()
+        session.add(conn)
+        session.commit()
+
+    setup_sheets_connection_task = setup_sheets_connection()
+
+    create_sql_table = SQLExecuteQueryOperator(
+        task_id="create_sql_table",
+        conn_id=DB_CONNECTION_ID,
+        sql=SQL_CREATE,
+    )
+
+    insert_data = SQLExecuteQueryOperator(
+        task_id="insert_data",
+        conn_id=DB_CONNECTION_ID,
+        sql=SQL_INSERT,
+    )
+
+    create_spreadsheet = GoogleSheetsCreateSpreadsheetOperator(
+        task_id="create_spreadsheet", spreadsheet=SPREADSHEET, 
gcp_conn_id=SHEETS_CONNECTION_ID
+    )
+
     # [START upload_sql_to_sheets]
-    upload_gcs_to_sheet = SQLToGoogleSheetsOperator(
+    upload_sql_to_sheet = SQLToGoogleSheetsOperator(
         task_id="upload_sql_to_sheet",
-        sql=SQL,
-        sql_conn_id=DB_CONNECTION,
-        database="test_db",
-        spreadsheet_id=SPREADSHEET_ID,
+        sql=SQL_SELECT,
+        sql_conn_id=DB_CONNECTION_ID,
+        database=DB_NAME,
+        spreadsheet_id="{{ 
task_instance.xcom_pull(task_ids='create_spreadsheet', "
+        "key='spreadsheet_id') }}",
+        gcp_conn_id=SHEETS_CONNECTION_ID,
     )
     # [END upload_sql_to_sheets]
 
+    delete_postgres_connection = BashOperator(
+        task_id="delete_postgres_connection",
+        bash_command=f"airflow connections delete {DB_CONNECTION_ID}",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_sheets_connection = BashOperator(
+        task_id="delete_temp_sheets_connection",
+        bash_command=f"airflow connections delete {SHEETS_CONNECTION_ID}",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_instance = ComputeEngineDeleteInstanceOperator(
+        task_id="delete_instance",
+        resource_id=DB_INSTANCE_NAME,
+        zone=ZONE,
+        project_id=PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_firewall_rule = BashOperator(
+        task_id="delete_firewall_rule",
+        bash_command=DELETE_FIREWALL_RULE,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_persistent_disk = BashOperator(
+        task_id="delete_persistent_disk",
+        bash_command=DELETE_PERSISTENT_DISK,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    # TEST SETUP
+    create_instance >> setup_postgres
+    (create_instance >> get_public_ip_task >> setup_postgres_connection_task)
+    (
+        [setup_postgres, setup_postgres_connection_task, create_firewall_rule]
+        >> create_sql_table
+        >> insert_data
+    )
+    (
+        [insert_data, setup_sheets_connection_task >> create_spreadsheet]
+        # TEST BODY
+        >> upload_sql_to_sheet
+        # TEST TEARDOWN
+        >> [delete_instance, delete_postgres_connection, 
delete_sheets_connection, delete_firewall_rule]
+    )
+    delete_instance >> delete_persistent_disk
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
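
Final aside, not part of the patch: for debugging, the Postgres connection written by setup_postgres_connection can be reconstructed as a plain Connection object and rendered as a URI. The host below is a placeholder for whatever get_public_ip returns at runtime, and the other values mirror the DAG constants for ENV_ID="default":

from airflow.models import Connection

conn = Connection(
    conn_id="postgres_example_sql_to_sheets_default",  # DB_CONNECTION_ID
    conn_type="postgres",
    host="203.0.113.10",  # placeholder for the GCE instance's public IP
    login="demo_user",
    password="demo_password",
    schema="example_sql_to_sheets_default_db",  # DB_NAME
    port=5432,
)
# Prints roughly: postgres://demo_user:demo_password@203.0.113.10:5432/example_sql_to_sheets_default_db
print(conn.get_uri())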
 
