This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 33a14198c1 Updates to Teradata Provider (#40378) 33a14198c1 is described below commit 33a14198c106be4b0a89d67bd711b560b0f4251d Author: Satish Chinthanippu <satishchinthani...@gmail.com> AuthorDate: Sat Jun 22 02:50:41 2024 -0700 Updates to Teradata Provider (#40378) Added support of teradata authorization object for cloud transfer operators to teradata. (#46) 1. Added teradata authorization object for authorization in transfer operators 2. Added security token support in s3toteradata transfer operator --- airflow/providers/teradata/operators/teradata.py | 2 +- .../teradata/transfers/azure_blob_to_teradata.py | 46 +++++++++---- .../providers/teradata/transfers/s3_to_teradata.py | 28 +++++--- .../operators/azure_blob_to_teradata.rst | 67 +++++++++++++++++- .../operators/s3_to_teradata.rst | 10 ++- .../example_azure_blob_to_teradata_transfer.py | 80 +++++++++++++++++++++- .../teradata/example_s3_to_teradata_transfer.py | 79 +++++++++++++++++++-- 7 files changed, 276 insertions(+), 36 deletions(-) diff --git a/airflow/providers/teradata/operators/teradata.py b/airflow/providers/teradata/operators/teradata.py index 00cd7a86c7..c15fc29038 100644 --- a/airflow/providers/teradata/operators/teradata.py +++ b/airflow/providers/teradata/operators/teradata.py @@ -31,7 +31,7 @@ class TeradataOperator(SQLExecuteQueryOperator): """ General Teradata Operator to execute queries on Teradata Database. - Executes sql statements in the Teradata SQL Database using teradatasql jdbc driver + Executes sql statements in the Teradata SQL Database using Teradata Python SQL Driver .. 
seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/teradata/transfers/azure_blob_to_teradata.py b/airflow/providers/teradata/transfers/azure_blob_to_teradata.py index 416b4e7136..8fc95122f1 100644 --- a/airflow/providers/teradata/transfers/azure_blob_to_teradata.py +++ b/airflow/providers/teradata/transfers/azure_blob_to_teradata.py @@ -48,10 +48,17 @@ class AzureBlobStorageToTeradataOperator(BaseOperator): The URI format is `/az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/YOUR-BLOB-LOCATION`. Refer to https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US + :param public_bucket: Specifies whether the provided blob container is public. If the blob container is public, + it means that anyone can access the objects within it via a URL without requiring authentication. + If the bucket is private and authentication is not provided, the operator will throw an exception. :param azure_conn_id: The Airflow WASB connection used for azure blob credentials. :param teradata_table: The name of the teradata table to which the data is transferred.(templated) :param teradata_conn_id: The connection ID used to connect to Teradata :ref:`Teradata connection <howto/connection:Teradata>` + :param teradata_authorization_name: The name of Teradata Authorization Database Object, + is used to control who can access an Azure Blob object store. + Refer to + https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Controlling-Foreign-Table-Access-with-an-AUTHORIZATION-Object Note that ``blob_source_key`` and ``teradata_table`` are templated, so you can use variables in them if you wish. 
@@ -64,37 +71,48 @@ class AzureBlobStorageToTeradataOperator(BaseOperator): self, *, blob_source_key: str, + public_bucket: bool = False, azure_conn_id: str = "azure_default", teradata_table: str, teradata_conn_id: str = "teradata_default", + teradata_authorization_name: str = "", **kwargs, ) -> None: super().__init__(**kwargs) self.blob_source_key = blob_source_key + self.public_bucket = public_bucket self.azure_conn_id = azure_conn_id self.teradata_table = teradata_table self.teradata_conn_id = teradata_conn_id + self.teradata_authorization_name = teradata_authorization_name def execute(self, context: Context) -> None: self.log.info( "transferring data from %s to teradata table %s...", self.blob_source_key, self.teradata_table ) - azure_hook = WasbHook(wasb_conn_id=self.azure_conn_id) - conn = azure_hook.get_connection(self.azure_conn_id) - # Obtaining the Azure client ID and Azure secret in order to access a specified Blob container - access_id = conn.login if conn.login is not None else "" - access_secret = conn.password if conn.password is not None else "" teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id) + credentials_part = "ACCESS_ID= '' ACCESS_KEY= ''" + if not self.public_bucket: + # Accessing data directly from the Azure Blob Storage and creating permanent table inside the + # database + if self.teradata_authorization_name: + credentials_part = f"AUTHORIZATION={self.teradata_authorization_name}" + else: + # Obtaining the Azure client ID and Azure secret in order to access a specified Blob container + azure_hook = WasbHook(wasb_conn_id=self.azure_conn_id) + conn = azure_hook.get_connection(self.azure_conn_id) + access_id = conn.login + access_secret = conn.password + credentials_part = f"ACCESS_ID= '{access_id}' ACCESS_KEY= '{access_secret}'" sql = dedent(f""" - CREATE MULTISET TABLE {self.teradata_table} AS - ( - SELECT * FROM ( - LOCATION = '{self.blob_source_key}' - ACCESS_ID= '{access_id}' - ACCESS_KEY= '{access_secret}' - ) AS d - 
) WITH DATA - """).rstrip() + CREATE MULTISET TABLE {self.teradata_table} AS + ( + SELECT * FROM ( + LOCATION = '{self.blob_source_key}' + {credentials_part} + ) AS d + ) WITH DATA + """).rstrip() try: teradata_hook.run(sql, True) except Exception as ex: diff --git a/airflow/providers/teradata/transfers/s3_to_teradata.py b/airflow/providers/teradata/transfers/s3_to_teradata.py index f7998ea861..d0bca09165 100644 --- a/airflow/providers/teradata/transfers/s3_to_teradata.py +++ b/airflow/providers/teradata/transfers/s3_to_teradata.py @@ -53,6 +53,10 @@ class S3ToTeradataOperator(BaseOperator): :param aws_conn_id: The Airflow AWS connection used for AWS credentials. :param teradata_conn_id: The connection ID used to connect to Teradata :ref:`Teradata connection <howto/connection:Teradata>`. + :param teradata_authorization_name: The name of Teradata Authorization Database Object, + is used to control who can access an S3 object store. + Refer to + https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Controlling-Foreign-Table-Access-with-an-AUTHORIZATION-Object Note that ``s3_source_key`` and ``teradata_table`` are templated, so you can use variables in them if you wish. 
@@ -69,6 +73,7 @@ class S3ToTeradataOperator(BaseOperator): teradata_table: str, aws_conn_id: str = "aws_default", teradata_conn_id: str = "teradata_default", + teradata_authorization_name: str = "", **kwargs, ) -> None: super().__init__(**kwargs) @@ -77,6 +82,7 @@ class S3ToTeradataOperator(BaseOperator): self.teradata_table = teradata_table self.aws_conn_id = aws_conn_id self.teradata_conn_id = teradata_conn_id + self.teradata_authorization_name = teradata_authorization_name def execute(self, context: Context) -> None: self.log.info( @@ -84,20 +90,26 @@ class S3ToTeradataOperator(BaseOperator): ) s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) - access_key = "" - access_secret = "" - if not self.public_bucket: - credentials = s3_hook.get_credentials() - access_key = credentials.access_key - access_secret = credentials.secret_key teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id) + credentials_part = "ACCESS_ID= '' ACCESS_KEY= ''" + if not self.public_bucket: + # Accessing data directly from the S3 bucket and creating permanent table inside the database + if self.teradata_authorization_name: + credentials_part = f"AUTHORIZATION={self.teradata_authorization_name}" + else: + credentials = s3_hook.get_credentials() + access_key = credentials.access_key + access_secret = credentials.secret_key + credentials_part = f"ACCESS_ID= '{access_key}' ACCESS_KEY= '{access_secret}'" + token = credentials.token + if token: + credentials_part = credentials_part + f" SESSION_TOKEN = '{token}'" sql = dedent(f""" CREATE MULTISET TABLE {self.teradata_table} AS ( SELECT * FROM ( LOCATION = '{self.s3_source_key}' - ACCESS_ID= '{access_key}' - ACCESS_KEY= '{access_secret}' + {credentials_part} ) AS d ) WITH DATA """).rstrip() diff --git a/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst b/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst index 0ee9a7bb32..194eabd0cd 100644 --- 
a/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst +++ b/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst @@ -26,8 +26,69 @@ AzureBlobStorageToTeradataOperator The purpose of ``AzureBlobStorageToTeradataOperator`` is to define tasks involving CSV, JSON and Parquet format data transfer from an Azure Blob Storage to Teradata table. Use the :class:`AzureBlobStorageToTeradataOperator <airflow.providers.teradata.transfers.azure_blob_to_teradata>` -to transfer data from an Azure Blob Storage to Teradata. +to transfer data from an Azure Blob Storage to Teradata. This operator leverages the Teradata +`READ_NOS <https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Welcome-to-Native-Object-Store>`_ feature +to import data in CSV, JSON, and Parquet formats from Azure Blob Storage into Teradata. +This operator accesses data directly from the object store and generates permanent tables +within the database using READ_NOS and CREATE TABLE AS functionalities with below SQL statement. +.. code-block:: sql + + CREATE MULTISET TABLE multiset_table_name AS ( + SELECT * + FROM ( + LOCATION='YOUR-OBJECT-STORE-URI' + AUTHORIZATION=authorization_object + ) AS d + ) WITH DATA; + +It facilitates data loading from both public and private object storage. For private object storage, access to the object +store can be granted via either Teradata Authorization database object or Object Store Login and Object Store Key +defined with Azure Blob Storage connection in Airflow. Conversely, for data transfer from public object storage, +no authorization or access credentials are required.
+ +* Teradata Authorization database object access type can be used with ``teradata_authorization_name`` parameter of ``AzureBlobStorageToTeradataOperator`` +* Object Store Access Key ID and Access Key Secret access type can be used with ``azure_conn_id`` parameter of ``AzureBlobStorageToTeradataOperator`` + +https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Setting-Access-Privileges + +.. note:: + Teradata Authorization database object takes precedence if both access types defined. + +Transferring data from public Azure Blob Storage to Teradata +------------------------------------------------------------ + +An example usage of the AzureBlobStorageToTeradataOperator to transfer CSV data format from public Azure Blob Storage to teradata table is as follows: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py + :language: python + :start-after: [START azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv] + :end-before: [END azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv] + +Transferring data from private Azure Blob Storage to Teradata with Azure connection +--------------------------------------------------------------------------------- + +An example usage of the AzureBlobStorageToTeradataOperator to transfer CSV data format from private Azure Blob Storage to teradata with Azure credentials defined as +Azure WASB connection: + +..
exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py + :language: python + :start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_access_blob_to_teradata_csv] + :end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_access_blob_to_teradata_csv] + +Transferring data from private Azure Blob Storage to Teradata with Teradata Authorization Object +------------------------------------------------------------------------------------------------ +Teradata authorization database object is used to control who can access an external object store. Teradata authorization +database object should exist in Teradata database to use it in transferring data from Azure Blob Storage to Teradata. Refer +`Authentication for External Object Stores in Teradata <https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Authentication-for-External-Object-Stores>`_ + +An example usage of the AzureBlobStorageToTeradataOperator to transfer CSV data format from private Azure Blob Storage to teradata with +Authorization database object defined in Teradata. + +.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py + :language: python + :start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_blob_to_teradata_csv] + :end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_blob_to_teradata_csv] Transferring data in CSV format from Azure Blob Storage to Teradata ------------------------------------------------------------------- @@ -37,8 +98,8 @@ to teradata table is as follows: ..
exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py :language: python - :start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] - :end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] + :start-after: [START azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv] + :end-before: [END azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv] Transferring data in JSON format from Azure Blob Storage to Teradata -------------------------------------------------------------------- diff --git a/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst b/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst index a6ecbc6f14..da52e2841b 100644 --- a/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst +++ b/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst @@ -30,7 +30,11 @@ READ_NOS is a table operator in Teradata Vantage that allows users to list exter For more details, see `READ_NOS Functionality <https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Reading-Data/Examples-For-DBAs-and-Advanced-Users/Loading-External-Data-into-the-Database/Loading-External-Data-into-the-Database-Using-READ_NOS-and-CREATE-TABLE-AS>`_ Use the :class:`S3ToTeradataOperator <airflow.providers.teradata.transfers.s3_to_teradata>` -to transfer data from S3 to Teradata. +to transfer data from S3 to Teradata. This operator leverages the Teradata +`READ_NOS <https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Welcome-to-Native-Object-Store>`_ feature +to import data in CSV, JSON, and Parquet formats from S3 into Teradata. 
+This operator accesses data directly from the object store and generates permanent tables +within the database using READ_NOS and CREATE TABLE AS functionalities with below SQL statement. .. note:: The current version of ``S3ToTeradataOperator`` does not support accessing AWS S3 with Security Token Service (STS) temporary credentials. Instead, it exclusively supports accessing with long-term credentials. @@ -43,8 +47,8 @@ An example usage of the S3ToTeradataOperator to transfer CSV data format from S3 .. exampleinclude:: /../../tests/system/providers/teradata/example_s3_to_teradata_transfer.py :language: python - :start-after: [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] - :end-before: [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] + :start-after: [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv] + :end-before: [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv] Transferring data in JSON format from S3 to Teradata ---------------------------------------------------- diff --git a/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py index 5d961550de..bcb1dd2fe6 100644 --- a/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py +++ b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py @@ -53,15 +53,17 @@ with DAG( catchup=False, default_args={"teradata_conn_id": CONN_ID}, ) as dag: - # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] + # [START azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv] transfer_data_csv = AzureBlobStorageToTeradataOperator( task_id="transfer_data_blob_to_teradata_csv", 
blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/", + public_bucket=True, teradata_table="example_blob_teradata_csv", + teradata_conn_id="teradata_default", azure_conn_id="wasb_default", trigger_rule="all_done", ) - # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] + # [END azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv] # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv] read_data_table_csv = TeradataOperator( task_id="read_data_table_csv", @@ -74,11 +76,75 @@ with DAG( sql="DROP TABLE example_blob_teradata_csv;", ) # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_access_blob_to_teradata_csv] + transfer_key_data_csv = AzureBlobStorageToTeradataOperator( + task_id="transfer_key_data_blob_to_teradata_csv", + blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/", + teradata_table="example_blob_teradata_csv", + azure_conn_id="wasb_default", + teradata_conn_id="teradata_default", + trigger_rule="all_done", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_access_blob_to_teradata_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + read_key_data_table_csv = TeradataOperator( + task_id="read_key_data_table_csv", + conn_id=CONN_ID, + sql="SELECT count(1) from example_blob_teradata_csv;", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] + drop_key_table_csv = TeradataOperator( + task_id="drop_key_table_csv", + conn_id=CONN_ID, + sql="DROP TABLE example_blob_teradata_csv;", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] + # [START 
azure_blob_to_teradata_transfer_operator_howto_guide_create_authorization] + create_azure_authorization = TeradataOperator( + task_id="create_azure_authorization", + conn_id=CONN_ID, + sql="CREATE AUTHORIZATION azure_authorization USER '{{ var.value.get('AZURE_BLOB_ACCOUNTNAME') }}' PASSWORD '{{ var.value.get('AZURE_BLOB_ACCOUNT_SECRET_KEY') }}' ", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_create_authorization] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_blob_to_teradata_csv] + transfer_auth_data_csv = AzureBlobStorageToTeradataOperator( + task_id="transfer_auth_data_blob_to_teradata_csv", + blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/", + teradata_table="example_blob_teradata_csv", + teradata_authorization_name="azure_authorization", + teradata_conn_id="teradata_default", + trigger_rule="all_done", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_blob_to_teradata_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + read_auth_data_table_csv = TeradataOperator( + task_id="read_auth_data_table_csv", + conn_id=CONN_ID, + sql="SELECT count(1) from example_blob_teradata_csv;", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] + drop_auth_table_csv = TeradataOperator( + task_id="drop_auth_table_csv", + conn_id=CONN_ID, + sql="DROP TABLE example_blob_teradata_csv;", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_authorization] + drop_auth = TeradataOperator( + task_id="drop_auth", + conn_id=CONN_ID, + sql="DROP AUTHORIZATION azure_authorization;", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_authorization] # [START 
azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json] transfer_data_json = AzureBlobStorageToTeradataOperator( task_id="transfer_data_blob_to_teradata_json", blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/", teradata_table="example_blob_teradata_json", + public_bucket=True, + teradata_conn_id="teradata_default", azure_conn_id="wasb_default", trigger_rule="all_done", ) @@ -100,7 +166,7 @@ with DAG( task_id="transfer_data_blob_to_teradata_parquet", blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/", teradata_table="example_blob_teradata_parquet", - azure_conn_id="wasb_default", + public_bucket=True, teradata_conn_id="teradata_default", trigger_rule="all_done", ) @@ -128,6 +194,14 @@ with DAG( >> drop_table_csv >> drop_table_json >> drop_table_parquet + >> transfer_key_data_csv + >> read_key_data_table_csv + >> drop_key_table_csv + >> create_azure_authorization + >> transfer_auth_data_csv + >> read_auth_data_table_csv + >> drop_auth_table_csv + >> drop_auth ) # [END azure_blob_to_teradata_transfer_operator_howto_guide] diff --git a/tests/system/providers/teradata/example_s3_to_teradata_transfer.py b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py index fc5e262739..ae8b827c1e 100644 --- a/tests/system/providers/teradata/example_s3_to_teradata_transfer.py +++ b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py @@ -54,7 +54,7 @@ with DAG( catchup=False, default_args={"teradata_conn_id": CONN_ID}, ) as dag: - # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv] transfer_data_csv = S3ToTeradataOperator( task_id="transfer_data_s3_to_teradata_csv", s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/", @@ -63,7 +63,7 @@ with DAG( 
aws_conn_id="aws_default", trigger_rule="all_done", ) - # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] + # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv] # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv] read_data_table_csv = TeradataOperator( task_id="read_data_table_csv", @@ -78,6 +78,68 @@ with DAG( sql="DROP TABLE example_s3_teradata_csv;", ) # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_access_s3_to_teradata_csv] + transfer_key_data_csv = S3ToTeradataOperator( + task_id="transfer_key_data_s3_to_teradata_key_csv", + s3_source_key="/s3/airflowteradatatest.s3.ap-southeast-2.amazonaws.com/", + teradata_table="example_s3_teradata_csv", + aws_conn_id="aws_default", + teradata_conn_id="teradata_default", + trigger_rule="all_done", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_access_s3_to_teradata_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + read_key_data_table_csv = TeradataOperator( + task_id="read_key_data_table_csv", + conn_id=CONN_ID, + sql="SELECT * from example_s3_teradata_csv;", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv] + drop_key_table_csv = TeradataOperator( + task_id="drop_key_table_csv", + conn_id=CONN_ID, + sql="DROP TABLE example_s3_teradata_csv;", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_create_authorization] + create_aws_authorization = TeradataOperator( + task_id="create_aws_authorization", + conn_id=CONN_ID, + sql="CREATE AUTHORIZATION aws_authorization USER '{{ var.value.get('AWS_ACCESS_KEY_ID') }}' PASSWORD '{{ var.value.get('AWS_SECRET_ACCESS_KEY') }}' ", + ) + # [END 
s3_to_teradata_transfer_operator_howto_guide_create_authorization] + # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_s3_to_teradata_csv] + transfer_auth_data_csv = S3ToTeradataOperator( + task_id="transfer_auth_data_s3_to_teradata_auth_csv", + s3_source_key="/s3/teradata-download.s3.us-east-1.amazonaws.com/DevTools/csv/", + teradata_table="example_s3_teradata_csv", + teradata_authorization_name="aws_authorization", + teradata_conn_id="teradata_default", + trigger_rule="all_done", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_s3_to_teradata_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + read_auth_data_table_csv = TeradataOperator( + task_id="read_auth_data_table_csv", + conn_id=CONN_ID, + sql="SELECT * from example_s3_teradata_csv;", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv] + drop_auth_table_csv = TeradataOperator( + task_id="drop_auth_table_csv", + conn_id=CONN_ID, + sql="DROP TABLE example_s3_teradata_csv;", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_drop_authorization] + drop_auth = TeradataOperator( + task_id="drop_auth", + conn_id=CONN_ID, + sql="DROP AUTHORIZATION aws_authorization;", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_drop_authorization] # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json] transfer_data_json = S3ToTeradataOperator( task_id="transfer_data_s3_to_teradata_json", @@ -116,12 +178,13 @@ with DAG( sql="SELECT * from example_s3_teradata_parquet;", ) # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet] - # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet] + # [START s3_to_teradata_transfer_operator_howto_guide_drop_table] drop_table_parquet = 
TeradataOperator( task_id="drop_table_parquet", sql="DROP TABLE example_s3_teradata_parquet;", ) - # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet] + + # [END s3_to_teradata_transfer_operator_howto_guide_drop_table] ( transfer_data_csv >> transfer_data_json @@ -132,6 +195,14 @@ with DAG( >> drop_table_csv >> drop_table_json >> drop_table_parquet + >> transfer_key_data_csv + >> read_key_data_table_csv + >> drop_key_table_csv + >> create_aws_authorization + >> transfer_auth_data_csv + >> read_auth_data_table_csv + >> drop_auth_table_csv + >> drop_auth ) # [END s3_to_teradata_transfer_operator_howto_guide]