amoghrajesh opened a new pull request, #45924:
URL: https://github.com/apache/airflow/pull/45924

   <!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
   
      http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
    -->
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of an existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   
   closes: #45752
   
   
   ### Why?
   When a task completes (moves into the success state), asset-related events for that task should be stored in the metadata DB. Currently this is done here: [airflow/airflow/models/taskinstance.py](https://github.com/apache/airflow/blob/051e617e0d7d0ebb995cb98063709350f279963c/airflow/models/taskinstance.py#L359).
   
   This does a few things:
   1. It extracts outlets from the task.
   2. For every outlet and its asset events, it creates records in several database tables: `asset`, `asset_alias`, `asset_alias_asset`, `asset_alias_asset_event`, `asset_event`.
   3. Support is not limited to the `Asset` type: it also works with `AssetAlias` and with `Asset` refs (name ref and URI ref).
   
   This behaviour needs to be ported to the Task SDK.
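   For orientation, a condensed sketch of that existing scheduler-side flow is below. This is illustrative only: the helper names mirror the asset-manager calls in `airflow/models/taskinstance.py`, but the signatures and details are simplified assumptions, not the exact code.
   ```python
   # Condensed, illustrative sketch of the existing flow -- not the actual
   # implementation; helper names/signatures are simplified assumptions.
   def _register_asset_changes(ti, events, session):
       for outlet in ti.task.outlets:  # 1. extract outlets from the task
           if isinstance(outlet, Asset):
               # 2. writes the `asset` / `asset_event` rows
               asset_manager.register_asset_change(
                   task_instance=ti, asset=outlet, extra=events[outlet].extra, session=session
               )
           elif isinstance(outlet, AssetAlias):
               # 3. resolves the alias and also writes the `asset_alias*` rows
               for alias_event in events[outlet].asset_alias_events:
                   ...
   ```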
   
   ### Approach
   
   The idea is to implement this logic in the `patch` TI state endpoint in the execution API server. The reasoning is that we needn't make an additional API call: when "finishing" a task from the task runner, we can send along the relevant details (the task outlets and the outlet events) and take care of the rest in the `ti_update_state` endpoint.
   
   #### Interface changes
   - We have a new payload, `TISuccessStatePayload`, for marking a task as successful from the task runner via the execution API. The reason is that we do not want to bloat `TITerminalStatePayload` with additional information, needlessly slowing down the API request.
   - The structure contains:
   `task_outlets`: translates to `ti.task.outlets` at execution time, sent from the task runner
   `outlet_events`: the events for an `outlet` object; for example, for an `Asset` it translates to `context["outlet_events"][<asset object>]`
   `asset_type`: the type of object (class name) the execution API has to deal with. This avoids additional mental gymnastics on the server side to figure out the kind of object we are dealing with, since it will be serialised.
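   For illustration, the payload roughly has this shape (a sketch inferred from the fields above; the exact types, defaults and optionality in the real model may differ):
   ```python
   from datetime import datetime
   from typing import Any, Literal

   from pydantic import BaseModel, Field


   class TISuccessStatePayload(BaseModel):
       """Sketch of the success-state payload; field types here are assumptions."""

       state: Literal["success"] = "success"
       end_date: datetime
       task_outlets: list[dict[str, Any]] = Field(default_factory=list)
       outlet_events: list[dict[str, Any]] = Field(default_factory=list)
       asset_type: str | None = None
   ```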
   
   #### Server Side
   - In `ti_update_state`, added an additional branch for the success state where we call `register_asset_changes`, a function similar to https://github.com/apache/airflow/blob/051e617e0d7d0ebb995cb98063709350f279963c/airflow/models/taskinstance.py#L359 but adjusted for the execution API.
   - This function does a few things: for every `task_outlet`, it registers the events depending on the type (see the sketch after this list):
   1. For `Asset`, it receives events from the task runner, so it just registers those.
   2. For `AssetNameRef` and `AssetUriRef`, it finds the relevant `Asset` for those and registers the events.
   3. For `AssetAlias`, it builds a map of the events to be registered, keyed by unique `(asset uri, extra)` pairs, and generates events for those, handling a few cases. Docs: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html#how-to-use-datasetalias
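   A rough shape of that dispatch (illustrative only; `asset_manager.register_asset_change` follows the existing scheduler-side call, while the ref resolution and alias deduplication are elided as they involve DB lookups specific to this PR):
   ```python
   # Illustrative dispatch only -- not the exact code in this PR.
   def register_asset_changes(ti, task_outlets, outlet_events, asset_type, session):
       if asset_type == "Asset":
           # 1. Events arrive fully formed from the task runner; register them as-is.
           for event in outlet_events:
               asset_manager.register_asset_change(
                   task_instance=ti, asset=Asset(**event["key"]), extra=event["extra"], session=session
               )
       elif asset_type in ("AssetNameRef", "AssetUriRef"):
           # 2. Resolve the ref to a concrete Asset via a DB lookup, then register
           #    only the events matching the resolved asset.
           ...
       elif asset_type == "AssetAlias":
           # 3. Deduplicate by unique (asset uri, extra) pairs, then register one
           #    event per pair, attaching the source alias names.
           ...
   ```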
   
   #### Execution side
   
   ##### Task Runner
   The task runner runs the task as usual and, before finishing, checks whether task outlets are defined. If they are, it populates `task_outlets`, `outlet_events` and `asset_type` for the outlets present.
   
   For `Asset`:
   1. Populates `task_outlets`, and `outlet_events` as `events[obj]`, where `events = context["outlet_events"]`.

   For `AssetNameRef` and `AssetUriRef`:
   1. Populates `task_outlets` as needed and sends all the events possible, since the runner cannot access the DB to resolve the model being referenced.

   For `AssetAlias`:
   We don't care about `task_outlets` here; we only care about the `asset_alias_events`, so those are populated in `outlet_events`.
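   Sketched together, the population logic looks roughly like this (class names follow the description above; serialisation details, attribute access and the exact code structure are assumptions):
   ```python
   # Rough sketch of the population step in the task runner (illustrative).
   task_outlets, outlet_events, asset_type = [], [], None
   events = context["outlet_events"]

   for obj in task.outlets or []:
       asset_type = type(obj).__name__  # e.g. "Asset", "AssetNameRef", "AssetAlias"
       if isinstance(obj, Asset):
           task_outlets.append({"name": obj.name, "uri": obj.uri})
           outlet_events.append(events[obj])  # the accumulated event for this asset
       elif isinstance(obj, (AssetNameRef, AssetUriRef)):
           # No DB access here, so send every accumulated event and let the
           # server filter against the resolved asset.
           task_outlets.append({"name": getattr(obj, "name", None), "uri": getattr(obj, "uri", None)})
           outlet_events.extend(events)
       elif isinstance(obj, AssetAlias):
           # Only the alias events matter; task_outlets stays empty.
           outlet_events.extend(events[obj].asset_alias_events)
   ```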
   
   Once this is done, a `SucceedTask` message is sent to the supervisor.
   
   ##### Supervisor
   The supervisor now treats the success state as one of `STATES_SENT_DIRECTLY`. Once it receives a `SucceedTask` message from the task runner, it calls:
   ```python
   self.client.task_instances.succeed(
       id=self.id,
       when=msg.end_date,
       task_outlets=msg.task_outlets,
       outlet_events=msg.outlet_events,
       asset_type=msg.asset_type,
   )
   ```
   
   ##### HTTP client in task sdk
   Introduced a new method, `succeed`, which calls the `ti_patch_state` API with a `TISuccessStatePayload`.
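   A sketch of what that method might look like (it mirrors the supervisor call above; the endpoint path and serialisation follow the pattern of the SDK client's other state-change methods and are assumptions here):
   ```python
   # Illustrative sketch of the new client method on the task-instance
   # operations object; the exact signature in the SDK may differ.
   def succeed(self, id: uuid.UUID, when: datetime, task_outlets, outlet_events, asset_type):
       """Tell the API server the task finished successfully, including outlet info."""
       body = TISuccessStatePayload(
           end_date=when,
           task_outlets=task_outlets,
           outlet_events=outlet_events,
           asset_type=asset_type,
       )
       self.client.patch(f"task-instances/{id}/state", content=body.model_dump_json())
   ```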
   
   
   ### Testing
   Using the DAG: 
https://github.com/apache/airflow/blob/main/airflow/example_dags/example_asset_alias.py
   
   DAGs:
   
![image](https://github.com/user-attachments/assets/4824e373-6990-428b-bd14-305a5764636c)
   
   #### Non-aliased assets
   
   1. Unpause asset_s3_bucket_consumer and asset_s3_bucket_producer
   2. Run the producer DAG first:
   
![image](https://github.com/user-attachments/assets/38333dba-3d8c-4470-a61f-39a881dc5549)
   
   Event:
   ```
   {"json":"{\"state\":\"success\",\"end_date\":\"2025-01-22T10:01:24.675957Z\",\"task_outlets\":[{\"name\":\"s3://bucket/my-task\",\"uri\":\"s3://bucket/my-task\"}],\"outlet_events\":[{\"key\":{\"name\":\"s3://bucket/my-task\",\"uri\":\"s3://bucket/my-task\"},\"extra\":{},\"asset_alias_events\":[]}],\"asset_type\":\"Asset\",\"type\":\"SucceedTask\"}\n","timestamp":"2025-01-22T10:01:24.676014","logger":"task","event":"Sending request","level":"debug"}
   ```
   
   The consumer DAG also gets triggered:
   <img width="1713" alt="image" src="https://github.com/user-attachments/assets/01a993f2-b08d-495c-b5ea-7b7f706cdc5c" />
   
   #### Aliased assets
   
   1. Unpause asset_alias_example_alias_producer and asset_alias_example_alias_consumer
   2. Trigger the asset_alias_example_alias_producer DAG
   
   <img width="1713" alt="image" 
src="https://github.com/user-attachments/assets/5d93cfa2-ee1c-4a99-8bac-99163fc9bd79";
 />
   
   Event:
   ```
   {"json":"{\"state\":\"success\",\"end_date\":\"2025-01-22T10:05:01.268374Z\",\"task_outlets\":[],\"outlet_events\":[{\"source_alias_name\":\"example-alias\",\"dest_asset_key\":{\"name\":\"s3://bucket/my-task\",\"uri\":\"s3://bucket/my-task\"},\"extra\":{}}],\"asset_type\":\"AssetAlias\",\"type\":\"SucceedTask\"}\n","timestamp":"2025-01-22T10:05:01.268428","logger":"task","event":"Sending request","level":"debug"}
   ```
   
   This now triggers two DAGs, asset_alias_example_alias_consumer and asset_s3_bucket_consumer, since the aliased asset was updated.
   
   The consumer fails because `inlet_events` has not been ported yet:
   <img width="1713" alt="image" src="https://github.com/user-attachments/assets/ec7d09c6-0e5f-4006-b93f-15a265a4b479" />
   
   
   <img width="1713" alt="image" 
src="https://github.com/user-attachments/assets/26833ef5-c564-4517-9757-7c8ec4f8aa5b";
 />
   
   
   <img width="1713" alt="image" 
src="https://github.com/user-attachments/assets/26833ef5-c564-4517-9757-7c8ec4f8aa5b";
 />
   
   
   #### DB level checks
   Asset created:
   
![image](https://github.com/user-attachments/assets/71865be5-80b3-4d6f-9cee-40dad7ec764f)
   
   Asset Alias:
   
![image](https://github.com/user-attachments/assets/eb447b94-37cf-4279-8c73-d4b34e8b8f5c)
   
   Asset Alias Asset mapping:
   
![image](https://github.com/user-attachments/assets/9e2d5b58-fa1b-43ed-8f4a-37257d24f323)
   
   Asset event: (first one triggered by asset, second by alias)
   
![image](https://github.com/user-attachments/assets/1adcd328-336e-4219-8728-35196f418016)
   
   Alias Event mapping:
   
   
   
   <!-- Please keep an empty line above the dashes. -->
   ---
   **^ Add meaningful description above**
   Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
 for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a 
newsfragment file, named `{pr_number}.significant.rst` or 
`{issue_number}.significant.rst`, in 
[newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   

