This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 1d7cfdb  Remove extra postgres dependency from AWS Provider (#18844)
1d7cfdb is described below

commit 1d7cfdbcd91705b2f88ef4ece503b7a072767e02
Author: Mario Taddeucci <mariotaddeu...@gmx.com>
AuthorDate: Sun Oct 10 17:59:18 2021 -0300

    Remove extra postgres dependency from AWS Provider (#18844)

    * Remove extra postgres dependency

    * Removed postgres cross-dependency on AWS provider
---
 CONTRIBUTING.rst                                   |  2 +-
 .../aws/example_dags/example_s3_to_redshift.py     |  8 +++-----
 airflow/providers/amazon/aws/hooks/redshift.py     | 24 ++++++++++++++++++++++
 .../amazon/aws/transfers/redshift_to_s3.py         |  6 +++---
 .../amazon/aws/transfers/s3_to_redshift.py         |  8 ++++----
 airflow/providers/amazon/provider.yaml             |  3 ---
 airflow/providers/dependencies.json                |  1 -
 .../amazon/aws/transfers/test_redshift_to_s3.py    |  8 ++++----
 .../amazon/aws/transfers/test_s3_to_redshift.py    | 14 ++++++-------
 9 files changed, 46 insertions(+), 28 deletions(-)

diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index d598742..14be13c 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -659,7 +659,7 @@ Here is the list of packages and their extras:
 Package                    Extras
 ========================== ===========================
 airbyte                    http
-amazon                     apache.hive,cncf.kubernetes,exasol,ftp,google,imap,mongo,mysql,postgres,salesforce,ssh
+amazon                     apache.hive,cncf.kubernetes,exasol,ftp,google,imap,mongo,mysql,salesforce,ssh
 apache.beam                google
 apache.druid               apache.hive
 apache.hive                amazon,microsoft.mssql,mysql,presto,samba,vertica
diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
index 9cec527..f095498 100644
--- a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
+++ b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
@@ -25,8 +25,8 @@ from airflow import DAG
 from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
 from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
-from airflow.providers.postgres.operators.postgres import PostgresOperator
 from airflow.utils.dates import days_ago

 # [START howto_operator_s3_to_redshift_env_variables]
@@ -54,9 +54,8 @@ with DAG(
 ) as dag:
     add_sample_data_to_s3 = add_sample_data_to_s3()

-    setup__task_create_table = PostgresOperator(
+    setup__task_create_table = RedshiftSQLOperator(
         sql=f'CREATE TABLE IF NOT EXISTS {REDSHIFT_TABLE}(Id int, Name varchar)',
-        postgres_conn_id='redshift_default',
         task_id='setup__create_table',
     )
     # [START howto_operator_s3_to_redshift_task_1]
@@ -69,9 +68,8 @@ with DAG(
         task_id='transfer_s3_to_redshift',
     )
     # [END howto_operator_s3_to_redshift_task_1]
-    teardown__task_drop_table = PostgresOperator(
+    teardown__task_drop_table = RedshiftSQLOperator(
         sql=f'DROP TABLE IF EXISTS {REDSHIFT_TABLE}',
-        postgres_conn_id='redshift_default',
         task_id='teardown__drop_table',
     )
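Note that the explicit postgres_conn_id='redshift_default' argument can be dropped above because RedshiftSQLOperator's own redshift_conn_id parameter already defaults to 'redshift_default'. A minimal standalone sketch of the replacement operator, assuming an Airflow connection named 'redshift_default' exists (the DAG id and table name below are illustrative, not part of this commit):

    from airflow import DAG
    from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
    from airflow.utils.dates import days_ago

    with DAG(
        dag_id="redshift_sql_example",  # illustrative DAG id
        start_date=days_ago(1),
        schedule_interval=None,
    ) as dag:
        # No connection id argument needed: redshift_conn_id defaults
        # to 'redshift_default'.
        create_table = RedshiftSQLOperator(
            task_id="create_table",
            sql="CREATE TABLE IF NOT EXISTS my_table(id int, name varchar)",
        )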
diff --git a/airflow/providers/amazon/aws/hooks/redshift.py b/airflow/providers/amazon/aws/hooks/redshift.py
index 63c4a04..e9fefc0 100644
--- a/airflow/providers/amazon/aws/hooks/redshift.py
+++ b/airflow/providers/amazon/aws/hooks/redshift.py
@@ -212,6 +212,30 @@ class RedshiftSQLHook(DbApiHook):

         return create_engine(self.get_uri(), **engine_kwargs)

+    def get_table_primary_key(self, table: str, schema: Optional[str] = "public") -> Optional[List[str]]:
+        """
+        Helper method that returns the table primary key
+
+        :param table: Name of the target table
+        :type table: str
+        :param schema: Name of the target schema, public by default
+        :type schema: Optional[str]
+        :return: Primary key columns list, or None if the table has no primary key
+        :rtype: Optional[List[str]]
+        """
+        sql = """
+            select kcu.column_name
+            from information_schema.table_constraints tco
+                join information_schema.key_column_usage kcu
+                    on kcu.constraint_name = tco.constraint_name
+                    and kcu.constraint_schema = tco.constraint_schema
+            where tco.constraint_type = 'PRIMARY KEY'
+                and kcu.table_schema = %s
+                and kcu.table_name = %s
+        """
+        pk_columns = [row[0] for row in self.get_records(sql, (schema, table))]
+        return pk_columns or None
+
     def get_conn(self) -> RedshiftConnection:
         """Returns a redshift_connector.Connection object"""
         conn_params = self._get_conn_params()
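The new helper can also be called directly on the hook. A quick sketch under the same assumptions (connection id, schema, and table names are illustrative):

    from airflow.providers.amazon.aws.hooks.redshift import RedshiftSQLHook

    hook = RedshiftSQLHook(redshift_conn_id="redshift_default")
    # Returns e.g. ['id'] for a table declared with PRIMARY KEY(id),
    # or None when no primary key constraint exists.
    primary_key = hook.get_table_primary_key(table="my_table", schema="public")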
diff --git a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
index a3d49c6..42ce2b8 100644
--- a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
@@ -19,9 +19,9 @@
 from typing import Iterable, List, Mapping, Optional, Union

 from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftSQLHook
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.utils.redshift import build_credentials_block
-from airflow.providers.postgres.hooks.postgres import PostgresHook


 class RedshiftToS3Operator(BaseOperator):
@@ -136,7 +136,7 @@ class RedshiftToS3Operator(BaseOperator):
     """

     def execute(self, context) -> None:
-        postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
+        redshift_hook = RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)
         conn = S3Hook.get_connection(conn_id=self.aws_conn_id)

         credentials_block = None
@@ -154,5 +154,5 @@ class RedshiftToS3Operator(BaseOperator):
         )

         self.log.info('Executing UNLOAD command...')
-        postgres_hook.run(unload_query, self.autocommit, parameters=self.parameters)
+        redshift_hook.run(unload_query, self.autocommit, parameters=self.parameters)
         self.log.info("UNLOAD command complete...")
diff --git a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
index 5149406..c0a1a7c 100644
--- a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
+++ b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
@@ -20,9 +20,9 @@ from typing import List, Optional, Union

 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftSQLHook
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.utils.redshift import build_credentials_block
-from airflow.providers.postgres.hooks.postgres import PostgresHook

 AVAILABLE_METHODS = ['APPEND', 'REPLACE', 'UPSERT']

@@ -131,7 +131,7 @@ class S3ToRedshiftOperator(BaseOperator):
     """

     def execute(self, context) -> None:
-        postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
+        redshift_hook = RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)
         conn = S3Hook.get_connection(conn_id=self.aws_conn_id)

         credentials_block = None
@@ -156,7 +156,7 @@ class S3ToRedshiftOperator(BaseOperator):
                 COMMIT
                 """
         elif self.method == 'UPSERT':
-            keys = self.upsert_keys or postgres_hook.get_table_primary_key(self.table, self.schema)
+            keys = self.upsert_keys or redshift_hook.get_table_primary_key(self.table, self.schema)
             if not keys:
                 raise AirflowException(
                     f"No primary key on {self.schema}.{self.table}. Please provide keys on 'upsert_keys'"
@@ -174,5 +174,5 @@ class S3ToRedshiftOperator(BaseOperator):
         sql = copy_statement

         self.log.info('Executing COPY command...')
-        postgres_hook.run(sql, self.autocommit)
+        redshift_hook.run(sql, autocommit=self.autocommit)
         self.log.info("COPY command complete...")
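With the hook swap in place, an UPSERT load resolves its merge keys through RedshiftSQLHook.get_table_primary_key() whenever upsert_keys is not supplied. A hedged usage sketch (bucket, key, and table names below are illustrative, not from this commit):

    from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

    # With method="UPSERT" and no upsert_keys, the operator now looks up
    # the target table's primary key via RedshiftSQLHook.
    transfer_s3_to_redshift = S3ToRedshiftOperator(
        task_id="upsert_from_s3",
        s3_bucket="my-bucket",     # illustrative bucket
        s3_key="data/sample.csv",  # illustrative key
        schema="public",
        table="my_table",          # illustrative table
        copy_options=["csv"],
        method="UPSERT",
    )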
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index 8715637..fbf4c1b 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -35,9 +35,6 @@ versions:
 additional-dependencies:
   - apache-airflow>=2.1.0

-additional-extras:
-  postgres: apache-airflow-providers-postgres>=2.3.0
-
 integrations:
   - integration-name: Amazon Athena
     external-doc-url: https://aws.amazon.com/athena/
diff --git a/airflow/providers/dependencies.json b/airflow/providers/dependencies.json
index 229e283..6779fec 100644
--- a/airflow/providers/dependencies.json
+++ b/airflow/providers/dependencies.json
@@ -11,7 +11,6 @@
     "imap",
     "mongo",
     "mysql",
-    "postgres",
     "salesforce",
     "ssh"
   ],
diff --git a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
index 880cc11..da7acf2 100644
--- a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
@@ -39,7 +39,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_table_unloading(
         self,
         table_as_file_name,
@@ -103,7 +103,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_execute_sts_token(
         self,
         table_as_file_name,
@@ -171,7 +171,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_custom_select_query_unloading(
         self,
         table,
@@ -234,7 +234,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_table_unloading_role_arn(
         self,
         table_as_file_name,
diff --git a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
index 0cf02b6..ff03165 100644
--- a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
+++ b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
@@ -33,7 +33,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_execute(self, mock_run, mock_session, mock_connection, mock_hook):
         access_key = "aws_access_key_id"
         secret_key = "aws_secret_access_key"
@@ -78,7 +78,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_execute_with_column_list(self, mock_run, mock_session, mock_connection, mock_hook):
         access_key = "aws_access_key_id"
         secret_key = "aws_secret_access_key"
@@ -125,7 +125,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_deprecated_truncate(self, mock_run, mock_session, mock_connection, mock_hook):
         access_key = "aws_access_key_id"
         secret_key = "aws_secret_access_key"
@@ -177,7 +177,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_replace(self, mock_run, mock_session, mock_connection, mock_hook):
         access_key = "aws_access_key_id"
         secret_key = "aws_secret_access_key"
@@ -229,7 +229,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_upsert(self, mock_run, mock_session, mock_connection, mock_hook):
         access_key = "aws_access_key_id"
         secret_key = "aws_secret_access_key"
@@ -284,7 +284,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_execute_sts_token(self, mock_run, mock_session, mock_connection, mock_hook):
         access_key = "ASIA_aws_access_key_id"
         secret_key = "aws_secret_access_key"
@@ -331,7 +331,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_execute_role_arn(self, mock_run, mock_session, mock_connection, mock_hook):
         access_key = "ASIA_aws_access_key_id"
         secret_key = "aws_secret_access_key"
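One downstream consequence worth flagging: with the additional-extras entry removed from provider.yaml, subsequent releases of apache-airflow-providers-amazon presumably no longer expose a postgres extra. Deployments that still need the Postgres hooks or operators should depend on the postgres provider directly, e.g. pip install apache-airflow-providers-postgres.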