borismo opened a new pull request, #42218: URL: https://github.com/apache/airflow/pull/42218
Recently, AWS [added](https://aws.amazon.com/about-aws/whats-new/2024/09/session-reuse-amazon-redshift-data-api/) [session reuse](https://docs.aws.amazon.com/redshift/latest/mgmt/data-api.html#data-api-calling-considerations-session-reuse) to the Redshift Data API. It allows you to, for example, create a temporary table in one statement and select from it in a subsequent one (a bare boto3 sketch of the feature is included below). I think it would be useful for `RedshiftDataOperator` to support this new feature.

# Decisions

- The hook's and operator's `database` arguments are now optional, because boto3 doesn't allow a database to be specified when a session ID is provided. Because `database` now has a default value, I had to move it after `sql`; users of the operator or hook who rely on argument position to pass it could break ⚠️. Note that [boto3 only requires `Sql`](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-data/client/execute_statement.html).
- The hook now returns an object containing the statement ID and session ID instead of just a statement ID. I chose a `dataclass` over a dict to make it harder to mistype a key (illustrated below). If users call the hook in their tasks or custom operators, this could also break things ⚠️
- Fixed the typo in the `parse_statement_resposne` method. It's a public method, so again there is a risk of a breaking change ⚠️
- boto3's validation error messages are not very useful when the session ID is a non-UUID string, or when no database, workgroup, or session ID is provided, so I added checks in the hook (sketched below).

# To do

- [ ] update docs
- [ ] update changelog
- [ ] test transfer operators in Data API mode in a local DAG
- [x] test creating a temporary table and selecting from it in two `RedshiftDataOperator` tasks:

```python
from airflow.decorators import dag
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator


@dag(
    "Foo",
)
def _():
    RedshiftDataOperator(
        task_id="create_temp_table",
        aws_conn_id="redshift_data",
        cluster_identifier="my_data",
        db_user="airflow",
        database="my_db",
        sql="""CREATE TEMPORARY TABLE tmp_foo AS SELECT 'Alice' AS first_name, TRUE AS is_online;""",
        deferrable=True,
        wait_for_completion=True,
        session_keep_alive_seconds=600,
    ) >> RedshiftDataOperator(
        task_id="select_temp_table",
        aws_conn_id="redshift_data",
        sql="""SELECT * FROM tmp_foo;""",
        deferrable=True,
        wait_for_completion=True,
        session_id="{{ task_instance.xcom_pull(task_ids='create_temp_table', key='session_id') }}",
        return_sql_result=True,
    )


_()
```
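For context, here is roughly what session reuse looks like against bare boto3, without Airflow. This is a minimal sketch: the cluster, database, and user names are placeholders, and real code would also handle the `FAILED` status instead of just waiting.

```python
import time

import boto3

client = boto3.client("redshift-data")

# Statement 1: create a temp table and ask Redshift to keep the session
# alive for 10 minutes after the statement finishes.
create = client.execute_statement(
    ClusterIdentifier="my-cluster",  # placeholder
    Database="my_db",                # placeholder
    DbUser="airflow",                # placeholder
    Sql="CREATE TEMPORARY TABLE tmp_foo AS SELECT 'Alice' AS first_name;",
    SessionKeepAliveSeconds=600,
)

# The session can only be reused once the first statement has finished.
while client.describe_statement(Id=create["Id"])["Status"] not in (
    "FINISHED",
    "FAILED",
    "ABORTED",
):
    time.sleep(1)

# Statement 2: reuse the session. Passing Database (or a cluster/workgroup
# identifier) together with SessionId is rejected by the API, which is why
# the operator's `database` argument had to become optional.
select = client.execute_statement(
    SessionId=create["SessionId"],
    Sql="SELECT * FROM tmp_foo;",
)
```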
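On the hook's new return value: it is a small dataclass along these lines (the field names here are illustrative, not necessarily the exact ones in the hook):

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class QueryExecutionOutput:
    """Returned by the hook's execute method instead of a bare statement ID."""

    statement_id: str
    session_id: str | None = None
```

Concretely, callers that previously did `statement_id = hook.execute_query(...)` would now read `hook.execute_query(...).statement_id`.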
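The validation added to the hook amounts to something like the following. This is a standalone sketch for illustration; the real checks live inside the hook and the exact messages differ.

```python
from __future__ import annotations

import uuid


def _validate_session_args(
    session_id: str | None,
    database: str | None,
    workgroup_name: str | None,
) -> None:
    # Hypothetical helper mirroring the checks added to the hook: fail fast
    # with a clear message instead of boto3's opaque validation errors.
    if session_id is not None:
        try:
            uuid.UUID(session_id)
        except ValueError:
            raise ValueError(
                f"session_id must be a valid UUID string, got {session_id!r}"
            ) from None
    elif database is None and workgroup_name is None:
        raise ValueError(
            "Provide either a database, a workgroup_name, or a session_id to reuse."
        )
```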