[GitHub] [airflow] milton0825 commented on a change in pull request #6309: [AIRFLOW-3783][AIRFLOW-5395] Switch to HEADER arg to unload data from Redshift to S3
milton0825 commented on a change in pull request #6309: [AIRFLOW-3783][AIRFLOW-5395] Switch to HEADER arg to unload data from Redshift to S3 URL: https://github.com/apache/airflow/pull/6309#discussion_r334244845 ## File path: airflow/operators/redshift_to_s3_operator.py ## @@ -85,52 +93,16 @@ def __init__( self.autocommit = autocommit self.include_header = include_header -if self.include_header and 'PARALLEL OFF' not in [uo.upper().strip() for uo in self.unload_options]: -self.unload_options = list(self.unload_options) + ['PARALLEL OFF', ] +if self.include_header and 'HEADER' not in [uo.upper().strip() for uo in self.unload_options]: +self.unload_options = list(self.unload_options) + ['HEADER', ] def execute(self, context): -self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id) -self.s3 = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) -credentials = self.s3.get_credentials() -unload_options = '\n\t\t\t'.join(self.unload_options) - -if self.include_header: -self.log.info("Retrieving headers from %s.%s...", - self.schema, self.table) - -columns_query = """SELECT column_name -FROM information_schema.columns -WHERE table_schema = '{schema}' -AND table_name = '{table}' -ORDER BY ordinal_position -""".format(schema=self.schema, - table=self.table) - -cursor = self.hook.get_conn().cursor() -cursor.execute(columns_query) -rows = cursor.fetchall() -columns = [row[0] for row in rows] -column_names = ', '.join("{0}".format(c) for c in columns) -column_headers = ', '.join("\\'{0}\\'".format(c) for c in columns) -column_castings = ', '.join("CAST({0} AS text) AS {0}".format(c) -for c in columns) - -select_query = """SELECT {column_names} FROM -(SELECT 2 sort_order, {column_castings} - FROM {schema}.{table} -UNION ALL -SELECT 1 sort_order, {column_headers}) - ORDER BY sort_order"""\ -.format(column_names=column_names, -column_castings=column_castings, -column_headers=column_headers, -schema=self.schema, -table=self.table) -else: -select_query = "SELECT * 
 FROM {schema}.{table}"\ -.format(schema=self.schema, -table=self.table) +postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id) +s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) +credentials = s3_hook.get_credentials() +unload_options = '\n\t\t\t'.join(self.unload_options) +select_query = "SELECT * FROM {schema}.{table}".format(schema=self.schema, table=self.table) Review comment: Yup I think allowing users to specify `select_query` should work. We either have to deprecate the `schema` and `table` fields, or we need to have some validation to check that these fields cannot be used at the same time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] milton0825 commented on a change in pull request #6309: [AIRFLOW-3783][AIRFLOW-5395] Switch to HEADER arg to unload data from Redshift to S3
milton0825 commented on a change in pull request #6309: [AIRFLOW-3783][AIRFLOW-5395] Switch to HEADER arg to unload data from Redshift to S3 URL: https://github.com/apache/airflow/pull/6309#discussion_r334209505 ## File path: airflow/operators/redshift_to_s3_operator.py ## @@ -85,52 +93,16 @@ def __init__( self.autocommit = autocommit self.include_header = include_header -if self.include_header and 'PARALLEL OFF' not in [uo.upper().strip() for uo in self.unload_options]: -self.unload_options = list(self.unload_options) + ['PARALLEL OFF', ] +if self.include_header and 'HEADER' not in [uo.upper().strip() for uo in self.unload_options]: +self.unload_options = list(self.unload_options) + ['HEADER', ] def execute(self, context): -self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id) -self.s3 = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) -credentials = self.s3.get_credentials() -unload_options = '\n\t\t\t'.join(self.unload_options) - -if self.include_header: -self.log.info("Retrieving headers from %s.%s...", - self.schema, self.table) - -columns_query = """SELECT column_name -FROM information_schema.columns -WHERE table_schema = '{schema}' -AND table_name = '{table}' -ORDER BY ordinal_position -""".format(schema=self.schema, - table=self.table) - -cursor = self.hook.get_conn().cursor() -cursor.execute(columns_query) -rows = cursor.fetchall() -columns = [row[0] for row in rows] -column_names = ', '.join("{0}".format(c) for c in columns) -column_headers = ', '.join("\\'{0}\\'".format(c) for c in columns) -column_castings = ', '.join("CAST({0} AS text) AS {0}".format(c) -for c in columns) - -select_query = """SELECT {column_names} FROM -(SELECT 2 sort_order, {column_castings} - FROM {schema}.{table} -UNION ALL -SELECT 1 sort_order, {column_headers}) - ORDER BY sort_order"""\ -.format(column_names=column_names, -column_castings=column_castings, -column_headers=column_headers, -schema=self.schema, -table=self.table) -else: -select_query = "SELECT * 
 FROM {schema}.{table}"\ -.format(schema=self.schema, -table=self.table) +postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id) +s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) +credentials = s3_hook.get_credentials() +unload_options = '\n\t\t\t'.join(self.unload_options) +select_query = "SELECT * FROM {schema}.{table}".format(schema=self.schema, table=self.table) Review comment: Should we allow the user to choose which columns to select? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services