amoghrajesh opened a new pull request, #45924: URL: https://github.com/apache/airflow/pull/45924
closes: #45752

### Why?

When a task completes (moves into the success state), asset-related events for that task should be stored in the metadata DB. Currently this is done here: [airflow/airflow/models/taskinstance.py](https://github.com/apache/airflow/blob/051e617e0d7d0ebb995cb98063709350f279963c/airflow/models/taskinstance.py#L359). That code does a few things:

1. It extracts outlets from the task.
2. For every outlet and its asset events, it creates records in the database in various tables: `asset`, `asset_alias`, `asset_alias_asset`, `asset_alias_asset_event`, `asset_event`.
3. The support is not limited to the `Asset` type only; it also works with `AssetAlias` and asset refs such as name refs and URI refs.

This behaviour needs to be ported to the task SDK.

### Approach

The idea is to implement this logic in the `patch` TI state endpoint of the execution API server. The reasoning is that we needn't make an additional API call: when "finishing" a task from the task runner, we can send along the relevant details (the task outlets and the outlet events) and take care of the rest in the `ti_update_state` endpoint.

#### Interface changes

- We have a new payload, `TISuccessStatePayload`, to mark a state as success from the task runner in the execution API. The reason is that we do not want to clutter `TITerminalStatePayload` with additional information, needlessly slowing down the API request.
- The structure contains:
  - `task_outlets`: translates to `ti.task.outlets` at execution time, sent from the task runner.
  - `outlet_events`: the events for an outlet object. For example, for `Asset` it translates to `context["outlet_events"][<Asset object>]`.
  - `asset_type`: the type of object (class name) the execution API has to deal with. This avoids additional mental gymnastics on the server side to work out which kind of object we are dealing with, since it will arrive serialised.

#### Server Side

- In `ti_update_state`, added an additional branch for the success state where we call `register_asset_changes`, a function similar to https://github.com/apache/airflow/blob/051e617e0d7d0ebb995cb98063709350f279963c/airflow/models/taskinstance.py#L359 but adjusted for the execution API.
- For every `task_outlet`, this function registers the events for the different types (see the sketch after this list):
  1. For `Asset`, it receives events from the task runner, so it just registers those.
  2. For `AssetNameRef` and `AssetUriRef`, it finds the relevant `Asset` for each ref and registers the events.
  3. For `AssetAlias`, it builds a map of the number of events to be registered, keyed on unique pairs of `tuple[asset uri, extra]`, and generates events for those while handling some edge cases. Docs: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html#how-to-use-datasetalias
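For orientation, here is a condensed sketch of that dispatch. The function and field names follow the description above, but the persistence callable and the payload shapes are illustrative assumptions, not the actual implementation:

```python
# Hedged sketch of the server-side dispatch in register_asset_changes.
# `save_event` stands in for the real ORM writes to asset/asset_event etc.
from typing import Any


def register_asset_changes(
    task_outlets: list[dict[str, Any]],
    outlet_events: list[dict[str, Any]],
    asset_type: str,
    save_event,  # callable(key=..., extra=..., **kwargs) persisting one asset event
) -> None:
    if asset_type == "Asset":
        # The task runner already resolved these events; register them directly.
        for event in outlet_events:
            save_event(key=event["key"], extra=event["extra"])
    elif asset_type in {"AssetNameRef", "AssetUriRef"}:
        # Refs are resolved server-side (the worker has no DB access), so the
        # server matches each ref to the relevant Asset before registering.
        for outlet in task_outlets:
            save_event(key=outlet, extra={})
    elif asset_type == "AssetAlias":
        # Deduplicate alias events on (asset uri, extra) so each unique pair
        # yields one asset event linked to all of its source aliases.
        grouped: dict[tuple[str, str], tuple[dict, list[str]]] = {}
        for event in outlet_events:
            uri = event["dest_asset_key"]["uri"]
            extra = event["extra"]
            k = (uri, repr(sorted(extra.items())))
            grouped.setdefault(k, (extra, []))[1].append(event["source_alias_name"])
        for (uri, _), (extra, aliases) in grouped.items():
            save_event(key={"uri": uri}, extra=extra, source_aliases=aliases)
```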
#### Execution side

##### Task Runner

The task runner runs the task as usual and, before finishing, checks whether task outlets are defined. If they are, it populates `task_outlets`, `outlet_events` and `asset_type` for the outlets present.

For `Asset`:
1. Populates `task_outlets`, and populates `outlet_events` as `events[obj]`, where `events = context["outlet_events"]`.

For `AssetNameRef` and `AssetUriRef`:
1. Populates `task_outlets` as needed, and populates all the events possible, since we cannot access the DB to resolve the model being referenced.

For `AssetAlias`:
We don't care about `task_outlets` here; we only care about the `asset_alias_events`, so those are populated in `outlet_events`.

Once this is done, a `SucceedTask` message is sent to the supervisor.

##### Supervisor

The supervisor now treats the success state as one of `STATES_SENT_DIRECTLY`. Once it receives a `SucceedTask` message from the task runner, it calls:

```
self.client.task_instances.succeed(
    id=self.id,
    when=msg.end_date,
    task_outlets=msg.task_outlets,
    outlet_events=msg.outlet_events,
    asset_type=msg.asset_type,
)
```

##### HTTP client in the task SDK

Introduced a new method, `succeed`, which calls the `ti_patch_state` API with a `TISuccessStatePayload`; a hedged sketch follows.
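A minimal sketch of that client method, assuming `httpx` plumbing. The method name, payload fields, and overall flow follow the description above, while the class scaffolding and the exact route string are illustrative assumptions:

```python
# Hedged sketch of the new Task SDK client method; the real one lives on the
# task-instances section of the API client. The route string is an assumption.
from datetime import datetime
from uuid import UUID

import httpx


class TaskInstanceOperations:
    def __init__(self, client: httpx.Client) -> None:
        self.client = client

    def succeed(self, id: UUID, when: datetime, task_outlets, outlet_events, asset_type) -> None:
        """Mark the TI as successful, shipping outlet details for asset registration."""
        body = {
            "state": "success",
            "end_date": when.isoformat(),
            "task_outlets": task_outlets,
            "outlet_events": outlet_events,
            "asset_type": asset_type,
        }
        # PATCH the TI state endpoint; the server deserialises this as
        # TISuccessStatePayload and runs register_asset_changes.
        self.client.patch(f"task-instances/{id}/state", json=body)
```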
### Testing

Using the DAG: https://github.com/apache/airflow/blob/main/airflow/example_dags/example_asset_alias.py

DAGS:

#### Non-aliased assets

1. Unpause `asset_s3_bucket_consumer` and `asset_s3_bucket_producer`.
2. Run the producer DAG first.

Event:
```
{"json":"{\"state\":\"success\",\"end_date\":\"2025-01-22T10:01:24.675957Z\",\"task_outlets\":[{\"name\":\"s3://bucket/my-task\",\"uri\":\"s3://bucket/my-task\"}],\"outlet_events\":[{\"key\":{\"name\":\"s3://bucket/my-task\",\"uri\":\"s3://bucket/my-task\"},\"extra\":{},\"asset_alias_events\":[]}],\"asset_type\":\"Asset\",\"type\":\"SucceedTask\"}\n","timestamp":"2025-01-22T10:01:24.676014","logger":"task","event":"Sending request","level":"debug"}
```

Consumer DAG also gets triggered:
<img width="1713" alt="image" src="https://github.com/user-attachments/assets/01a993f2-b08d-495c-b5ea-7b7f706cdc5c" />

#### Aliased assets

1. Unpause `asset_alias_example_alias_producer` and `asset_alias_example_alias_consumer`.
2. Trigger the `asset_alias_example_alias_producer` DAG.

<img width="1713" alt="image" src="https://github.com/user-attachments/assets/5d93cfa2-ee1c-4a99-8bac-99163fc9bd79" />

Event:
```
{"json":"{\"state\":\"success\",\"end_date\":\"2025-01-22T10:05:01.268374Z\",\"task_outlets\":[],\"outlet_events\":[{\"source_alias_name\":\"example-alias\",\"dest_asset_key\":{\"name\":\"s3://bucket/my-task\",\"uri\":\"s3://bucket/my-task\"},\"extra\":{}}],\"asset_type\":\"AssetAlias\",\"type\":\"SucceedTask\"}\n","timestamp":"2025-01-22T10:05:01.268428","logger":"task","event":"Sending request","level":"debug"}
```

This now triggers two DAGs, `asset_alias_example_alias_consumer` and `asset_s3_bucket_consumer`, since the aliased asset was updated.

The consumer run fails because `inlet_events` has not been ported yet:
<img width="1713" alt="image" src="https://github.com/user-attachments/assets/ec7d09c6-0e5f-4006-b93f-15a265a4b479" />
<img width="1713" alt="image" src="https://github.com/user-attachments/assets/26833ef5-c564-4517-9757-7c8ec4f8aa5b" />

#### DB level checks

Asset created:
Asset Alias:
Asset Alias Asset mapping:
Asset event (first one triggered by the asset, second by the alias):
Alias Event mapping:
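For context, the producer patterns exercised above look roughly like the following. This is a hedged paraphrase of the example DAG, not a verbatim copy; the import paths and decorator module are assumptions and may differ by Airflow version:

```python
# Hedged paraphrase of the producers in example_asset_alias.py.
# Import paths are assumptions and may differ on the main branch.
from airflow.decorators import dag, task
from airflow.sdk.definitions.asset import Asset, AssetAlias

s3_asset = Asset("s3://bucket/my-task")


@dag(schedule=None)
def asset_s3_bucket_producer():
    @task(outlets=[s3_asset])
    def produce():
        # Succeeding with this outlet emits an Asset event; the new
        # TISuccessStatePayload carries it to the execution API server.
        ...

    produce()


@dag(schedule=None)
def asset_alias_example_alias_producer():
    @task(outlets=[AssetAlias("example-alias")])
    def produce_via_alias(*, outlet_events):
        # Attach a concrete asset to the alias at runtime; the task runner
        # forwards this as an AssetAlias event in outlet_events.
        outlet_events[AssetAlias("example-alias")].add(s3_asset, extra={})

    produce_via_alias()


asset_s3_bucket_producer()
asset_alias_example_alias_producer()
```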