This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 2c251a0a18 Use redshift-data operators instead of SQL operators (#36113) 2c251a0a18 is described below commit 2c251a0a1830020615039095f256c009ef4903bb Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Fri Dec 8 11:24:11 2023 -0500 Use redshift-data operators instead of SQL operators (#36113) --- .../amazon/aws/example_redshift_s3_transfers.py | 24 +++++++++++++++------- .../providers/amazon/aws/example_s3_to_sql.py | 20 +++++++++++------- .../providers/amazon/aws/example_sql_to_s3.py | 16 ++++++++++----- 3 files changed, 41 insertions(+), 19 deletions(-) diff --git a/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py b/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py index db6160d93f..4fbf728fa8 100644 --- a/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py +++ b/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py @@ -28,6 +28,7 @@ from airflow.providers.amazon.aws.operators.redshift_cluster import ( RedshiftCreateClusterOperator, RedshiftDeleteClusterOperator, ) +from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator from airflow.providers.amazon.aws.operators.s3 import ( S3CreateBucketOperator, S3CreateObjectOperator, @@ -37,7 +38,6 @@ from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftCluste from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator -from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator from airflow.utils.trigger_rule import TriggerRule from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder @@ -144,15 +144,22 @@ with DAG( replace=True, ) - create_table_redshift_data = SQLExecuteQueryOperator( + 
create_table_redshift_data = RedshiftDataOperator( task_id="create_table_redshift_data", - conn_id=conn_id_name, + cluster_identifier=redshift_cluster_identifier, + database=DB_NAME, + db_user=DB_LOGIN, sql=SQL_CREATE_TABLE, + wait_for_completion=True, ) - insert_data = SQLExecuteQueryOperator( + + insert_data = RedshiftDataOperator( task_id="insert_data", - conn_id=conn_id_name, + cluster_identifier=redshift_cluster_identifier, + database=DB_NAME, + db_user=DB_LOGIN, sql=SQL_INSERT_DATA, + wait_for_completion=True, ) # [START howto_transfer_redshift_to_s3] @@ -196,10 +203,13 @@ with DAG( ) # [END howto_transfer_s3_to_redshift_multiple_keys] - drop_table = SQLExecuteQueryOperator( + drop_table = RedshiftDataOperator( task_id="drop_table", - conn_id=conn_id_name, + cluster_identifier=redshift_cluster_identifier, + database=DB_NAME, + db_user=DB_LOGIN, sql=SQL_DROP_TABLE, + wait_for_completion=True, trigger_rule=TriggerRule.ALL_DONE, ) delete_cluster = RedshiftDeleteClusterOperator( diff --git a/tests/system/providers/amazon/aws/example_s3_to_sql.py b/tests/system/providers/amazon/aws/example_s3_to_sql.py index 2312271ce8..6349ceed70 100644 --- a/tests/system/providers/amazon/aws/example_s3_to_sql.py +++ b/tests/system/providers/amazon/aws/example_s3_to_sql.py @@ -28,6 +28,7 @@ from airflow.providers.amazon.aws.operators.redshift_cluster import ( RedshiftCreateClusterOperator, RedshiftDeleteClusterOperator, ) +from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator from airflow.providers.amazon.aws.operators.s3 import ( S3CreateBucketOperator, S3CreateObjectOperator, @@ -36,7 +37,7 @@ from airflow.providers.amazon.aws.operators.s3 import ( ) from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor from airflow.providers.amazon.aws.transfers.s3_to_sql import S3ToSqlOperator -from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator, SQLTableCheckOperator +from 
airflow.providers.common.sql.operators.sql import SQLTableCheckOperator from airflow.utils.trigger_rule import TriggerRule from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder from tests.system.utils.watcher import watcher @@ -132,15 +133,18 @@ with DAG( replace=True, ) - create_table = SQLExecuteQueryOperator( + create_table = RedshiftDataOperator( task_id="create_sample_table", - conn_id=conn_id_name, + cluster_identifier=redshift_cluster_identifier, + database=DB_NAME, + db_user=DB_LOGIN, sql=f""" CREATE TABLE IF NOT EXISTS {SQL_TABLE_NAME} ( cocktail_id INT NOT NULL, cocktail_name VARCHAR NOT NULL, base_spirit VARCHAR NOT NULL); - """, + """, + wait_for_completion=True, ) # [START howto_transfer_s3_to_sql] @@ -199,11 +203,13 @@ with DAG( }, ) - drop_table = SQLExecuteQueryOperator( - conn_id=conn_id_name, - trigger_rule=TriggerRule.ALL_DONE, + drop_table = RedshiftDataOperator( task_id="drop_table", + cluster_identifier=redshift_cluster_identifier, + database=DB_NAME, + db_user=DB_LOGIN, sql=f"DROP TABLE {SQL_TABLE_NAME}", + wait_for_completion=True, ) delete_s3_objects = S3DeleteObjectsOperator( diff --git a/tests/system/providers/amazon/aws/example_sql_to_s3.py b/tests/system/providers/amazon/aws/example_sql_to_s3.py index 813420ca15..3b5e1220a6 100644 --- a/tests/system/providers/amazon/aws/example_sql_to_s3.py +++ b/tests/system/providers/amazon/aws/example_sql_to_s3.py @@ -30,10 +30,10 @@ from airflow.providers.amazon.aws.operators.redshift_cluster import ( RedshiftCreateClusterOperator, RedshiftDeleteClusterOperator, ) +from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator -from airflow.providers.common.sql.operators.sql import 
SQLExecuteQueryOperator from airflow.utils.trigger_rule import TriggerRule from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder @@ -128,16 +128,22 @@ with DAG( set_up_connection = create_connection(conn_id_name, cluster_id=redshift_cluster_identifier) - create_table_redshift_data = SQLExecuteQueryOperator( + create_table_redshift_data = RedshiftDataOperator( task_id="create_table_redshift_data", - conn_id=conn_id_name, + cluster_identifier=redshift_cluster_identifier, + database=DB_NAME, + db_user=DB_LOGIN, sql=SQL_CREATE_TABLE, + wait_for_completion=True, ) - insert_data = SQLExecuteQueryOperator( + insert_data = RedshiftDataOperator( task_id="insert_data", - conn_id=conn_id_name, + cluster_identifier=redshift_cluster_identifier, + database=DB_NAME, + db_user=DB_LOGIN, sql=SQL_INSERT_DATA, + wait_for_completion=True, ) # [START howto_transfer_sql_to_s3]