This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c251a0a18 Use redshift-data operators instead of SQL operators (#36113)
2c251a0a18 is described below

commit 2c251a0a1830020615039095f256c009ef4903bb
Author: Vincent <97131062+vincb...@users.noreply.github.com>
AuthorDate: Fri Dec 8 11:24:11 2023 -0500

    Use redshift-data operators instead of SQL operators (#36113)
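
    The change, as a minimal self-contained sketch (cluster name, database,
    user, and table below are hypothetical placeholders, not values from this
    commit): tasks that previously ran their SQL through SQLExecuteQueryOperator
    and an Airflow connection now use RedshiftDataOperator, which submits the
    statement to the Redshift Data API and can wait for it to complete, so no
    SQL conn_id is required.

        from datetime import datetime

        from airflow import DAG
        from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator

        # Placeholder values for illustration only.
        REDSHIFT_CLUSTER_IDENTIFIER = "example-redshift-cluster"
        DB_NAME = "dev"
        DB_LOGIN = "awsuser"
        SQL_CREATE_TABLE = "CREATE TABLE IF NOT EXISTS example_table (id INT, name VARCHAR)"

        with DAG(dag_id="example_redshift_data_sketch", start_date=datetime(2023, 1, 1), schedule=None):
            # Runs the statement via the Redshift Data API using the task's AWS
            # credentials; wait_for_completion blocks until the statement finishes.
            create_table = RedshiftDataOperator(
                task_id="create_table",
                cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
                database=DB_NAME,
                db_user=DB_LOGIN,
                sql=SQL_CREATE_TABLE,
                wait_for_completion=True,
            )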
---
 .../amazon/aws/example_redshift_s3_transfers.py    | 24 +++++++++++++++-------
 .../providers/amazon/aws/example_s3_to_sql.py      | 20 +++++++++++-------
 .../providers/amazon/aws/example_sql_to_s3.py      | 16 ++++++++++-----
 3 files changed, 41 insertions(+), 19 deletions(-)

diff --git a/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py b/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
index db6160d93f..4fbf728fa8 100644
--- a/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
+++ b/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
@@ -28,6 +28,7 @@ from airflow.providers.amazon.aws.operators.redshift_cluster import (
     RedshiftCreateClusterOperator,
     RedshiftDeleteClusterOperator,
 )
+from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
 from airflow.providers.amazon.aws.operators.s3 import (
     S3CreateBucketOperator,
     S3CreateObjectOperator,
@@ -37,7 +38,6 @@ from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftCluste
 from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
 from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
 from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
-from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
 
@@ -144,15 +144,22 @@ with DAG(
         replace=True,
     )
 
-    create_table_redshift_data = SQLExecuteQueryOperator(
+    create_table_redshift_data = RedshiftDataOperator(
         task_id="create_table_redshift_data",
-        conn_id=conn_id_name,
+        cluster_identifier=redshift_cluster_identifier,
+        database=DB_NAME,
+        db_user=DB_LOGIN,
         sql=SQL_CREATE_TABLE,
+        wait_for_completion=True,
     )
-    insert_data = SQLExecuteQueryOperator(
+
+    insert_data = RedshiftDataOperator(
         task_id="insert_data",
-        conn_id=conn_id_name,
+        cluster_identifier=redshift_cluster_identifier,
+        database=DB_NAME,
+        db_user=DB_LOGIN,
         sql=SQL_INSERT_DATA,
+        wait_for_completion=True,
     )
 
     # [START howto_transfer_redshift_to_s3]
@@ -196,10 +203,13 @@ with DAG(
     )
     # [END howto_transfer_s3_to_redshift_multiple_keys]
 
-    drop_table = SQLExecuteQueryOperator(
+    drop_table = RedshiftDataOperator(
         task_id="drop_table",
-        conn_id=conn_id_name,
+        cluster_identifier=redshift_cluster_identifier,
+        database=DB_NAME,
+        db_user=DB_LOGIN,
         sql=SQL_DROP_TABLE,
+        wait_for_completion=True,
         trigger_rule=TriggerRule.ALL_DONE,
     )
     delete_cluster = RedshiftDeleteClusterOperator(
diff --git a/tests/system/providers/amazon/aws/example_s3_to_sql.py b/tests/system/providers/amazon/aws/example_s3_to_sql.py
index 2312271ce8..6349ceed70 100644
--- a/tests/system/providers/amazon/aws/example_s3_to_sql.py
+++ b/tests/system/providers/amazon/aws/example_s3_to_sql.py
@@ -28,6 +28,7 @@ from airflow.providers.amazon.aws.operators.redshift_cluster import (
     RedshiftCreateClusterOperator,
     RedshiftDeleteClusterOperator,
 )
+from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
 from airflow.providers.amazon.aws.operators.s3 import (
     S3CreateBucketOperator,
     S3CreateObjectOperator,
@@ -36,7 +37,7 @@ from airflow.providers.amazon.aws.operators.s3 import (
 )
 from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor
 from airflow.providers.amazon.aws.transfers.s3_to_sql import S3ToSqlOperator
-from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator, SQLTableCheckOperator
+from airflow.providers.common.sql.operators.sql import SQLTableCheckOperator
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
 from tests.system.utils.watcher import watcher
@@ -132,15 +133,18 @@ with DAG(
         replace=True,
     )
 
-    create_table = SQLExecuteQueryOperator(
+    create_table = RedshiftDataOperator(
         task_id="create_sample_table",
-        conn_id=conn_id_name,
+        cluster_identifier=redshift_cluster_identifier,
+        database=DB_NAME,
+        db_user=DB_LOGIN,
         sql=f"""
             CREATE TABLE IF NOT EXISTS {SQL_TABLE_NAME} (
             cocktail_id INT NOT NULL,
             cocktail_name VARCHAR NOT NULL,
             base_spirit VARCHAR NOT NULL);
-          """,
+        """,
+        wait_for_completion=True,
     )
 
     # [START howto_transfer_s3_to_sql]
@@ -199,11 +203,13 @@ with DAG(
         },
     )
 
-    drop_table = SQLExecuteQueryOperator(
-        conn_id=conn_id_name,
-        trigger_rule=TriggerRule.ALL_DONE,
+    drop_table = RedshiftDataOperator(
         task_id="drop_table",
+        cluster_identifier=redshift_cluster_identifier,
+        database=DB_NAME,
+        db_user=DB_LOGIN,
         sql=f"DROP TABLE {SQL_TABLE_NAME}",
+        wait_for_completion=True,
     )
 
     delete_s3_objects = S3DeleteObjectsOperator(
diff --git a/tests/system/providers/amazon/aws/example_sql_to_s3.py b/tests/system/providers/amazon/aws/example_sql_to_s3.py
index 813420ca15..3b5e1220a6 100644
--- a/tests/system/providers/amazon/aws/example_sql_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_sql_to_s3.py
@@ -30,10 +30,10 @@ from airflow.providers.amazon.aws.operators.redshift_cluster import (
     RedshiftCreateClusterOperator,
     RedshiftDeleteClusterOperator,
 )
+from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
 from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
 from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor
 from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
-from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
 
@@ -128,16 +128,22 @@ with DAG(
 
     set_up_connection = create_connection(conn_id_name, cluster_id=redshift_cluster_identifier)
 
-    create_table_redshift_data = SQLExecuteQueryOperator(
+    create_table_redshift_data = RedshiftDataOperator(
         task_id="create_table_redshift_data",
-        conn_id=conn_id_name,
+        cluster_identifier=redshift_cluster_identifier,
+        database=DB_NAME,
+        db_user=DB_LOGIN,
         sql=SQL_CREATE_TABLE,
+        wait_for_completion=True,
     )
 
-    insert_data = SQLExecuteQueryOperator(
+    insert_data = RedshiftDataOperator(
         task_id="insert_data",
-        conn_id=conn_id_name,
+        cluster_identifier=redshift_cluster_identifier,
+        database=DB_NAME,
+        db_user=DB_LOGIN,
         sql=SQL_INSERT_DATA,
+        wait_for_completion=True,
     )
 
     # [START howto_transfer_sql_to_s3]
