borismo opened a new pull request, #42218:
URL: https://github.com/apache/airflow/pull/42218

   Recently, AWS [added](https://aws.amazon.com/about-aws/whats-new/2024/09/session-reuse-amazon-redshift-data-api/) [session reuse](https://docs.aws.amazon.com/redshift/latest/mgmt/data-api.html#data-api-calling-considerations-session-reuse) to the Redshift Data API. It lets you, for example, create a temporary table in one statement and select from it in a subsequent one.
   I think it would be useful for `RedshiftDataOperator` to support this new feature.
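   For context, session reuse at the boto3 level looks roughly like this (a minimal sketch; `SessionKeepAliveSeconds` and `SessionId` are the Data API parameters from the AWS docs linked above, while the workgroup, database, and SQL values are placeholders):
   ```python
   import boto3

   client = boto3.client("redshift-data")

   # First statement: ask the Data API to keep the session alive after it completes.
   first = client.execute_statement(
       WorkgroupName="my-workgroup",
       Database="my_db",
       Sql="CREATE TEMPORARY TABLE tmp_foo AS SELECT 'Alice' AS first_name;",
       SessionKeepAliveSeconds=600,
   )

   # Subsequent statement: reuse the session by ID. Note that Database must not
   # be passed together with SessionId.
   client.execute_statement(
       SessionId=first["SessionId"],
       Sql="SELECT * FROM tmp_foo;",
   )
   ```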
   
   # Decisions
   - the hook's and operator's `database` argument is now optional, because boto3 does not allow a database to be specified when a session ID is provided. Since `database` now has a default value, I had to move it after `sql`. Users of the operator or hook who pass `database` positionally could see breakage ⚠️ . Note that [boto3 only requires Sql](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-data/client/execute_statement.html).
   - the hook now returns an object containing both the statement ID and the session ID instead of just the statement ID. I chose a `dataclass` over a dict to make it harder to mistype a key. If users call the hook in their own tasks or custom operators, this could also break things ⚠️ 
   - fixed the typo in the name of the public `parse_statement_resposne` method (now `parse_statement_response`). So, again, there is a risk of a breaking change ⚠️ 
   - boto3's validation error messages are not very helpful when the session ID is not a valid UUID, or when no database, workgroup, or session ID is provided, so I added checks in the hook (see the sketch after this list).
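   To make the last two points concrete, here is a minimal sketch of the hook-side changes; the class, function, and field names are illustrative, not necessarily the exact ones in this PR:
   ```python
   from __future__ import annotations

   import uuid
   from dataclasses import dataclass


   @dataclass
   class QueryExecutionOutput:
       """What the hook now returns: the statement ID plus the session ID, if any."""

       statement_id: str
       session_id: str | None


   def validate_statement_target(database: str | None, session_id: str | None) -> None:
       """Fail early with a clear message instead of boto3's cryptic validation errors."""
       if session_id is not None:
           try:
               uuid.UUID(session_id)
           except ValueError:
               raise ValueError(
                   f"session_id must be a valid UUID string, got {session_id!r}"
               ) from None
       elif database is None:
           raise ValueError("Either session_id or database must be provided.")
   ```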
   
   # To do
   - [ ] update docs
   - [ ] update changelog
   - [ ] test transfer operators in data API mode in a local DAG
   - [x] test creating a temporary table and selecting from it in two 
`RedshiftDataOperator` tasks:
   ```python
   from airflow.decorators import dag
   from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
   
   @dag(
       "Foo",
   )
   def _():
   
       RedshiftDataOperator(
           task_id="create_temp_table",
           aws_conn_id="redshift_data",
           cluster_identifier="my_data",
           db_user="airflow",
           database="my_db",
           sql="""CREATE TEMPORARY TABLE tmp_foo AS
           SELECT 'Alice' AS first_name, TRUE AS is_online;""",
           deferrable=True,
           wait_for_completion=True,
           session_keep_alive_seconds=600,  # keep the session open so the next task can reuse it
       ) >> RedshiftDataOperator(
           task_id="select_temp_table",
           aws_conn_id="redshift_data",
           sql="""SELECT * FROM tmp_foo;""",
           deferrable=True,
           wait_for_completion=True,
           session_id="{{ task_instance.xcom_pull(task_ids='create_temp_table', 
key='session_id') }}",
           return_sql_result=True,
       )
   _()
   ```
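   
   Note how the session travels between the two tasks: the first task pushes the `session_id` returned by the Data API to XCom, and the second task consumes it through the templated `session_id` argument.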
   
   

