This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d7cfdb  Remove extra postgres dependency from AWS Provider (#18844)
1d7cfdb is described below

commit 1d7cfdbcd91705b2f88ef4ece503b7a072767e02
Author: Mario Taddeucci <mariotaddeu...@gmx.com>
AuthorDate: Sun Oct 10 17:59:18 2021 -0300

    Remove extra postgres dependency from AWS Provider (#18844)
    
    * Remove extra postgres dependency
    
    * Removed postgres cross dependency on aws provider
---
 CONTRIBUTING.rst                                   |  2 +-
 .../aws/example_dags/example_s3_to_redshift.py     |  8 +++-----
 airflow/providers/amazon/aws/hooks/redshift.py     | 24 ++++++++++++++++++++++
 .../amazon/aws/transfers/redshift_to_s3.py         |  6 +++---
 .../amazon/aws/transfers/s3_to_redshift.py         |  8 ++++----
 airflow/providers/amazon/provider.yaml             |  3 ---
 airflow/providers/dependencies.json                |  1 -
 .../amazon/aws/transfers/test_redshift_to_s3.py    |  8 ++++----
 .../amazon/aws/transfers/test_s3_to_redshift.py    | 14 ++++++-------
 9 files changed, 46 insertions(+), 28 deletions(-)

diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index d598742..14be13c 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -659,7 +659,7 @@ Here is the list of packages and their extras:
 Package                    Extras
 ========================== ===========================
 airbyte                    http
-amazon                     apache.hive,cncf.kubernetes,exasol,ftp,google,imap,mongo,mysql,postgres,salesforce,ssh
+amazon                     apache.hive,cncf.kubernetes,exasol,ftp,google,imap,mongo,mysql,salesforce,ssh
 apache.beam                google
 apache.druid               apache.hive
 apache.hive                amazon,microsoft.mssql,mysql,presto,samba,vertica
diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
index 9cec527..f095498 100644
--- a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
+++ b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
@@ -25,8 +25,8 @@ from airflow import DAG
 from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
 from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
-from airflow.providers.postgres.operators.postgres import PostgresOperator
 from airflow.utils.dates import days_ago
 
 # [START howto_operator_s3_to_redshift_env_variables]
@@ -54,9 +54,8 @@ with DAG(
 ) as dag:
     add_sample_data_to_s3 = add_sample_data_to_s3()
 
-    setup__task_create_table = PostgresOperator(
+    setup__task_create_table = RedshiftSQLOperator(
         sql=f'CREATE TABLE IF NOT EXISTS {REDSHIFT_TABLE}(Id int, Name varchar)',
-        postgres_conn_id='redshift_default',
         task_id='setup__create_table',
     )
     # [START howto_operator_s3_to_redshift_task_1]
@@ -69,9 +68,8 @@ with DAG(
         task_id='transfer_s3_to_redshift',
     )
     # [END howto_operator_s3_to_redshift_task_1]
-    teardown__task_drop_table = PostgresOperator(
+    teardown__task_drop_table = RedshiftSQLOperator(
         sql=f'DROP TABLE IF EXISTS {REDSHIFT_TABLE}',
-        postgres_conn_id='redshift_default',
         task_id='teardown__drop_table',
     )
 
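For reference, a minimal sketch of the pattern the example DAG now follows: table setup and teardown go through RedshiftSQLOperator, so no postgres provider import is needed. The DAG id and table name below are illustrative assumptions, and the operator is assumed to default to the redshift_default connection.

    from airflow import DAG
    from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
    from airflow.utils.dates import days_ago

    with DAG(dag_id="redshift_sql_sketch", start_date=days_ago(1), schedule_interval=None) as dag:
        # Both tasks run plain SQL against the assumed redshift_default connection.
        create_table = RedshiftSQLOperator(
            task_id="setup__create_table",
            sql="CREATE TABLE IF NOT EXISTS my_table (id INT, name VARCHAR)",
        )
        drop_table = RedshiftSQLOperator(
            task_id="teardown__drop_table",
            sql="DROP TABLE IF EXISTS my_table",
        )
        create_table >> drop_table
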
diff --git a/airflow/providers/amazon/aws/hooks/redshift.py b/airflow/providers/amazon/aws/hooks/redshift.py
index 63c4a04..e9fefc0 100644
--- a/airflow/providers/amazon/aws/hooks/redshift.py
+++ b/airflow/providers/amazon/aws/hooks/redshift.py
@@ -212,6 +212,30 @@ class RedshiftSQLHook(DbApiHook):
 
         return create_engine(self.get_uri(), **engine_kwargs)
 
+    def get_table_primary_key(self, table: str, schema: Optional[str] = "public") -> List[str]:
+        """
+        Helper method that returns the table primary key
+        :param table: Name of the target table
+        :type table: str
+        :param schema: Name of the target schema, public by default
+        :type schema: str
+        :return: Primary key columns list
+        :rtype: List[str]
+        """
+        sql = """
+            select kcu.column_name
+            from information_schema.table_constraints tco
+                    join information_schema.key_column_usage kcu
+                        on kcu.constraint_name = tco.constraint_name
+                            and kcu.constraint_schema = tco.constraint_schema
+                            and kcu.constraint_name = tco.constraint_name
+            where tco.constraint_type = 'PRIMARY KEY'
+            and kcu.table_schema = %s
+            and kcu.table_name = %s
+        """
+        pk_columns = [row[0] for row in self.get_records(sql, (schema, table))]
+        return pk_columns or None
+
     def get_conn(self) -> RedshiftConnection:
         """Returns a redshift_connector.Connection object"""
         conn_params = self._get_conn_params()
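
A minimal sketch of calling the new helper directly, assuming a configured redshift_default connection and an existing public.my_table (both names are illustrative):

    from airflow.providers.amazon.aws.hooks.redshift import RedshiftSQLHook

    hook = RedshiftSQLHook(redshift_conn_id="redshift_default")
    # Returns the list of primary-key column names, or None when the table has no primary key.
    pk_columns = hook.get_table_primary_key(table="my_table", schema="public")
    print(pk_columns)
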
diff --git a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
index a3d49c6..42ce2b8 100644
--- a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
@@ -19,9 +19,9 @@
 from typing import Iterable, List, Mapping, Optional, Union
 
 from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftSQLHook
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.utils.redshift import build_credentials_block
-from airflow.providers.postgres.hooks.postgres import PostgresHook
 
 
 class RedshiftToS3Operator(BaseOperator):
@@ -136,7 +136,7 @@ class RedshiftToS3Operator(BaseOperator):
         """
 
     def execute(self, context) -> None:
-        postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
+        redshift_hook = RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)
         conn = S3Hook.get_connection(conn_id=self.aws_conn_id)
 
         credentials_block = None
@@ -154,5 +154,5 @@ class RedshiftToS3Operator(BaseOperator):
         )
 
         self.log.info('Executing UNLOAD command...')
-        postgres_hook.run(unload_query, self.autocommit, parameters=self.parameters)
+        redshift_hook.run(unload_query, self.autocommit, parameters=self.parameters)
         self.log.info("UNLOAD command complete...")
diff --git a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
index 5149406..c0a1a7c 100644
--- a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
+++ b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
@@ -20,9 +20,9 @@ from typing import List, Optional, Union
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftSQLHook
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.utils.redshift import build_credentials_block
-from airflow.providers.postgres.hooks.postgres import PostgresHook
 
 AVAILABLE_METHODS = ['APPEND', 'REPLACE', 'UPSERT']
 
@@ -131,7 +131,7 @@ class S3ToRedshiftOperator(BaseOperator):
         """
 
     def execute(self, context) -> None:
-        postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
+        redshift_hook = RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)
         conn = S3Hook.get_connection(conn_id=self.aws_conn_id)
 
         credentials_block = None
@@ -156,7 +156,7 @@ class S3ToRedshiftOperator(BaseOperator):
             COMMIT
             """
         elif self.method == 'UPSERT':
-            keys = self.upsert_keys or postgres_hook.get_table_primary_key(self.table, self.schema)
+            keys = self.upsert_keys or redshift_hook.get_table_primary_key(self.table, self.schema)
             if not keys:
                 raise AirflowException(
                     f"No primary key on {self.schema}.{self.table}. Please 
provide keys on 'upsert_keys'"
@@ -174,5 +174,5 @@ class S3ToRedshiftOperator(BaseOperator):
             sql = copy_statement
 
         self.log.info('Executing COPY command...')
-        postgres_hook.run(sql, self.autocommit)
+        redshift_hook.run(sql, autocommit=self.autocommit)
         self.log.info("COPY command complete...")
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index 8715637..fbf4c1b 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -35,9 +35,6 @@ versions:
 additional-dependencies:
   - apache-airflow>=2.1.0
 
-additional-extras:
-  postgres: apache-airflow-providers-postgres>=2.3.0
-
 integrations:
   - integration-name: Amazon Athena
     external-doc-url: https://aws.amazon.com/athena/
diff --git a/airflow/providers/dependencies.json b/airflow/providers/dependencies.json
index 229e283..6779fec 100644
--- a/airflow/providers/dependencies.json
+++ b/airflow/providers/dependencies.json
@@ -11,7 +11,6 @@
     "imap",
     "mongo",
     "mysql",
-    "postgres",
     "salesforce",
     "ssh"
   ],
diff --git a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
index 880cc11..da7acf2 100644
--- a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
@@ -39,7 +39,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_table_unloading(
         self,
         table_as_file_name,
@@ -103,7 +103,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_execute_sts_token(
         self,
         table_as_file_name,
@@ -171,7 +171,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_custom_select_query_unloading(
         self,
         table,
@@ -234,7 +234,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_table_unloading_role_arn(
         self,
         table_as_file_name,
diff --git a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
index 0cf02b6..ff03165 100644
--- a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
+++ b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
@@ -33,7 +33,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_execute(self, mock_run, mock_session, mock_connection, mock_hook):
         access_key = "aws_access_key_id"
         secret_key = "aws_secret_access_key"
@@ -78,7 +78,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_execute_with_column_list(self, mock_run, mock_session, mock_connection, mock_hook):
         access_key = "aws_access_key_id"
         secret_key = "aws_secret_access_key"
@@ -125,7 +125,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_deprecated_truncate(self, mock_run, mock_session, mock_connection, mock_hook):
         access_key = "aws_access_key_id"
         secret_key = "aws_secret_access_key"
@@ -177,7 +177,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_replace(self, mock_run, mock_session, mock_connection, mock_hook):
         access_key = "aws_access_key_id"
         secret_key = "aws_secret_access_key"
@@ -229,7 +229,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_upsert(self, mock_run, mock_session, mock_connection, mock_hook):
         access_key = "aws_access_key_id"
         secret_key = "aws_secret_access_key"
@@ -284,7 +284,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_execute_sts_token(self, mock_run, mock_session, mock_connection, mock_hook):
         access_key = "ASIA_aws_access_key_id"
         secret_key = "aws_secret_access_key"
@@ -331,7 +331,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
     def test_execute_role_arn(self, mock_run, mock_session, mock_connection, mock_hook):
         access_key = "ASIA_aws_access_key_id"
         secret_key = "aws_secret_access_key"
