amoghrajesh opened a new pull request, #45924:
URL: https://github.com/apache/airflow/pull/45924
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!--
Thank you for contributing! Please make sure that your code changes
are covered with tests. And in case of new features or big changes
remember to adjust the documentation.
Feel free to ping committers for the review!
In case of an existing issue, reference it using one of the following:
closes: #ISSUE
related: #ISSUE
How to write a good git commit message:
http://chris.beams.io/posts/git-commit/
-->
closes: #45752
### Why?
When a task completes (moves into the success state), asset-related events for
that task should be stored in the metadata DB. Currently it is done like this:
[airflow/airflow/models/taskinstance.py](https://github.com/apache/airflow/blob/051e617e0d7d0ebb995cb98063709350f279963c/airflow/models/taskinstance.py#L359).
This does a few things:
1. It extracts outlets from the task.
2. For every outlet and its asset events, it creates records in various
database tables:
`asset`
`asset_alias`
`asset_alias_asset`
`asset_alias_asset_event`
`asset_event`
3. Support is not limited to the `Asset` type: it also works with
`AssetAlias` and asset refs (name ref and uri ref).

This behaviour needs to be ported into the task SDK.
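A rough sketch of that flow (all names here are hypothetical stand-ins, not the actual `taskinstance.py` code):

```python
from dataclasses import dataclass


@dataclass
class AssetEventRow:
    # Hypothetical stand-in for a row in the `asset_event` table.
    asset_uri: str
    extra: dict


def register_outlet_events(task_outlets, outlet_events):
    """For every outlet, create one event row per attached event (sketch)."""
    rows = []
    for outlet in task_outlets:
        for event in outlet_events.get(outlet, []):
            rows.append(AssetEventRow(asset_uri=outlet, extra=event.get("extra", {})))
    return rows


rows = register_outlet_events(
    ["s3://bucket/my-task"],
    {"s3://bucket/my-task": [{"extra": {"producer": "task_a"}}]},
)
```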
### Approach
The idea is to implement this logic in the `patch` TI state endpoint in the
execution API server. The reasoning: we needn't make an additional API call;
when "finishing" a task from the task runner, we can send the relevant details
(task outlets and outlet events) and take care of the rest in the
`ti_update_state` endpoint.
#### Interface changes
- We have a new payload, `TISuccessStatePayload`, to mark a state as success
from the task runner in the execution API. The reason: we do not want to bloat
`TITerminalStatePayload` with additional information, needlessly slowing down
the API request.
- The structure contains:
  - `task_outlets`: translates to `ti.task.outlets` at execution time, sent from
the task runner
  - `outlet_events`: the events for an `outlet` object. For example, for
`Asset`s it translates to `context["outlet_events"][Asset Object]`
  - `asset_type`: the type (class name) of the object the execution API has
to deal with. This avoids any additional mental gymnastics on the server
side to figure out the kind of object we are dealing with, as it will be serialised.
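Shaped as a model, the payload described above might look roughly like this (an illustrative sketch built from the field descriptions, not the actual schema class):

```python
from dataclasses import dataclass, field
from typing import Any, Literal


@dataclass
class TISuccessStatePayload:
    # Illustrative sketch of the new success payload (not the real definition).
    state: Literal["success"]
    end_date: str  # ISO-8601 timestamp
    task_outlets: list[Any] = field(default_factory=list)   # ti.task.outlets at execution time
    outlet_events: list[Any] = field(default_factory=list)  # events per outlet object
    asset_type: str = ""  # class name, e.g. "Asset" or "AssetAlias"


# Mirrors the shape of the "Sending request" log line shown in the Testing section.
payload = TISuccessStatePayload(
    state="success",
    end_date="2025-01-22T10:01:24.675957Z",
    task_outlets=[{"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"}],
    outlet_events=[{"key": {"name": "s3://bucket/my-task"}, "extra": {}, "asset_alias_events": []}],
    asset_type="Asset",
)
```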
#### Server Side
- In `ti_update_state`, an additional branch was added for the success state,
which calls `register_asset_changes`, a function similar to
https://github.com/apache/airflow/blob/051e617e0d7d0ebb995cb98063709350f279963c/airflow/models/taskinstance.py#L359
but adjusted for the execution API.
- This function does a few things: for every `task_outlet`, it registers the
events for the different types:
  1. For `Asset`, it receives events from the task runner, so it just registers
those.
  2. For `AssetNameRef` and `AssetUriRef`, it finds the relevant `Asset` for those
and registers the events.
  3. For `AssetAlias`, it builds a map of the events to be registered, keyed by
unique `(asset uri, extra)` pairs, and generates events for those by handling
some cases. Docs:
https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html#how-to-use-datasetalias
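The per-type dispatch can be summarised as follows (a minimal sketch returning labels for illustration; the real `register_asset_changes` works against ORM sessions and models):

```python
def register_asset_changes(asset_type: str) -> str:
    """Minimal sketch of the per-outlet-type branching described above."""
    if asset_type == "Asset":
        # Events arrive fully formed from the task runner; just register them.
        return "register-directly"
    if asset_type in ("AssetNameRef", "AssetUriRef"):
        # Look up the referenced Asset in the DB, then register its events.
        return "resolve-then-register"
    if asset_type == "AssetAlias":
        # De-duplicate by (asset uri, extra) pairs, then generate events.
        return "dedupe-then-generate"
    raise ValueError(f"unsupported outlet type: {asset_type}")
```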
#### Execution side
##### Task Runner
The task runner runs the task as usual and, before finishing, checks for
defined task outlets. If any are defined, it populates `task_outlets`,
`outlet_events` and `asset_type` for the outlets present.
For `Asset`:
1. Populates `task_outlets`, and `outlet_events` as `events[obj]` where
`events = context["outlet_events"]`
For `AssetNameRef` and `AssetUriRef`:
1. Populates `task_outlets` as needed and populates all the possible events,
since we cannot access the DB to get the model being referenced
For `AssetAlias`:
We don't care about `task_outlets`; we only care about the
`asset_alias_events`, so those are populated in `outlet_events`
Once this is done, a `SucceedTask` message is sent to the supervisor.
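Put together, the per-type handling in the task runner might look like this (a simplified sketch with stand-in classes, not the actual task runner code):

```python
class Asset:
    """Stand-in for the real Asset class."""
    def __init__(self, uri):
        self.uri = uri


class AssetAlias:
    """Stand-in for the real AssetAlias class."""
    def __init__(self, name):
        self.name = name


def build_success_payload(outlets, events):
    # `events` plays the role of context["outlet_events"].
    task_outlets, outlet_events, asset_type = [], [], None
    for obj in outlets:
        asset_type = type(obj).__name__
        if isinstance(obj, Asset):
            # Send the outlet and its events as-is.
            task_outlets.append(obj)
            outlet_events.append(events[obj])
        elif type(obj).__name__ in ("AssetNameRef", "AssetUriRef"):
            # No DB access at execution time, so send every possible event.
            task_outlets.append(obj)
            outlet_events.extend(events.values())
        elif isinstance(obj, AssetAlias):
            # Only the alias events matter; task_outlets stays empty.
            outlet_events.extend(events[obj])
    return task_outlets, outlet_events, asset_type


asset = Asset("s3://bucket/my-task")
task_outlets, outlet_events, asset_type = build_success_payload(
    [asset], {asset: {"key": asset.uri, "extra": {}}}
)
```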
##### Supervisor
Supervisor starts treating the success state as `STATES_SENT_DIRECTLY` from
now. Once it receives a `SuceedTask` message from the task runner, it calls
```python
self.client.task_instances.succeed(
id=self.id,
when=msg.end_date,
task_outlets=msg.task_outlets,
outlet_events=msg.outlet_events,
asset_type=msg.asset_type,
)
```
##### HTTP client in task sdk
Introduced a new method, `succeed`, which calls the `ti_patch_state` API with
a `TISuccessStatePayload`.
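A sketch of what the client method could look like (the endpoint path and body shape are assumptions based on the payload described above, shown with a tiny fake client to illustrate the call):

```python
class TaskInstanceOperations:
    """Illustrative sketch of the new client method (names approximate)."""

    def __init__(self, client):
        self.client = client  # any object exposing .patch(path, json=...)

    def succeed(self, id, when, task_outlets, outlet_events, asset_type):
        # Calls the ti_patch_state endpoint with a TISuccessStatePayload-shaped body.
        body = {
            "state": "success",
            "end_date": when,
            "task_outlets": task_outlets,
            "outlet_events": outlet_events,
            "asset_type": asset_type,
        }
        return self.client.patch(f"/task-instances/{id}/state", json=body)


class _RecordingClient:
    # Fake client that just echoes the request, for illustration.
    def patch(self, path, json=None):
        return (path, json)


path, body = TaskInstanceOperations(_RecordingClient()).succeed(
    "ti-123", "2025-01-22T10:01:24Z", [], [], "Asset"
)
```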
### Testing
Using the DAG:
https://github.com/apache/airflow/blob/main/airflow/example_dags/example_asset_alias.py
DAGS:

#### Non-aliased assets
1. Unpause: asset_s3_bucket_consumer and asset_s3_bucket_producer
2. Run the producer DAG first:

Event:
```
{"json":"{\"state\":\"success\",\"end_date\":\"2025-01-22T10:01:24.675957Z\",\"task_outlets\":[{\"name\":\"s3://bucket/my-task\",\"uri\":\"s3://bucket/my-task\"}],\"outlet_events\":[{\"key\":{\"name\":\"s3://bucket/my-task\",\"uri\":\"s3://bucket/my-task\"},\"extra\":{},\"asset_alias_events\":[]}],\"asset_type\":\"Asset\",\"type\":\"SucceedTask\"}\n","timestamp":"2025-01-22T10:01:24.676014","logger":"task","event":"Sending
request","level":"debug"}
```
Consumer DAG also gets triggered:
<img width="1713" alt="image"
src="https://github.com/user-attachments/assets/01a993f2-b08d-495c-b5ea-7b7f706cdc5c"
/>
#### Aliased assets
1. Unpause: asset_alias_example_alias_producer and
asset_alias_example_alias_consumer
2. Trigger the asset_alias_example_alias_producer DAG
<img width="1713" alt="image"
src="https://github.com/user-attachments/assets/5d93cfa2-ee1c-4a99-8bac-99163fc9bd79"
/>
Event:
```
{"json":"{\"state\":\"success\",\"end_date\":\"2025-01-22T10:05:01.268374Z\",\"task_outlets\":[],\"outlet_events\":[{\"source_alias_name\":\"example-alias\",\"dest_asset_key\":{\"name\":\"s3://bucket/my-task\",\"uri\":\"s3://bucket/my-task\"},\"extra\":{}}],\"asset_type\":\"AssetAlias\",\"type\":\"SucceedTask\"}\n","timestamp":"2025-01-22T10:05:01.268428","logger":"task","event":"Sending
request","level":"debug"}
```
This now triggers two DAGs, asset_alias_example_alias_consumer and
asset_s3_bucket_consumer, since the aliased asset was updated.
Fails due to `inlet_events` not yet being ported:
<img width="1713" alt="image"
src="https://github.com/user-attachments/assets/ec7d09c6-0e5f-4006-b93f-15a265a4b479"
/>
<img width="1713" alt="image"
src="https://github.com/user-attachments/assets/26833ef5-c564-4517-9757-7c8ec4f8aa5b"
/>
#### DB level checks
Asset created:

Asset Alias:

Asset Alias Asset mapping:

Asset event: (first one triggered by asset, second by alias)

Alias Event mapping:
<!-- Please keep an empty line above the dashes. -->
---
**^ Add meaningful description above**
Read the **[Pull Request
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
for more information.
In case of fundamental code changes, an Airflow Improvement Proposal
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party
License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in a
newsfragment file, named `{pr_number}.significant.rst` or
`{issue_number}.significant.rst`, in
[newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).