amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1926463681


##########
airflow/api_fastapi/execution_api/datamodels/asset.py:
##########
@@ -34,3 +34,18 @@ class AssetAliasResponse(BaseModel):
 
     name: str
     group: str
+
+
+class AssetProfile(BaseModel):
+    """
+    Profile of an Asset.
+
+    Asset will have name, uri and asset_type defined.
+    AssetNameRef will have name and asset_type defined.
+    AssetUriRef will have uri and asset_type defined.
+
+    """
+
+    name: str | None = None
+    uri: str | None = None
+    asset_type: str

Review Comment:
   I realised there was a bug in my earlier code, where I was defining 
"asset_type" at the payload level. The problem is that if more than one outlet 
is passed, the payload always carries the "asset_type" of the last outlet, so 
events are not registered for the earlier ones.
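
A minimal sketch of the difference, using plain dataclasses and dicts rather than the real Airflow models. `Outlet`, `payload_level` and `per_outlet` are hypothetical names used only for illustration; they are not part of this PR.

```python
from dataclasses import dataclass


@dataclass
class Outlet:
    """Hypothetical stand-in for a task outlet; not an Airflow class."""

    name: str
    asset_type: str


def payload_level(outlets):
    # Earlier shape: one "asset_type" for the whole payload.
    # Each iteration overwrites it, so only the last outlet's type survives.
    payload = {"names": [], "asset_type": None}
    for outlet in outlets:
        payload["names"].append(outlet.name)
        payload["asset_type"] = outlet.asset_type
    return payload


def per_outlet(outlets):
    # Shape used by AssetProfile: every outlet carries its own "asset_type".
    return [{"name": o.name, "asset_type": o.asset_type} for o in outlets]


outlets = [Outlet("a", "Asset"), Outlet("b", "AssetAlias")]
buggy = payload_level(outlets)
fixed = per_outlet(outlets)
```

With two outlets of different types, `buggy` keeps a single type for both, while `fixed` keeps one type per outlet.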



##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -479,12 +481,39 @@ def run(ti: RuntimeTaskInstance, log: Logger):
 
         _push_xcom_if_needed(result, ti)
 
+        task_outlets = []
+        outlet_events = []
+        events = context["outlet_events"]
+
+        for obj in ti.task.outlets or []:
+            # Lineage can have other types of objects besides assets
+            asset_type = type(obj).__name__
+            if isinstance(obj, Asset):
+                task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, asset_type=asset_type))
+                outlet_events.append(attrs.asdict(events[obj]))  # type: ignore
+            elif isinstance(obj, AssetNameRef):
+                task_outlets.append(AssetProfile(name=obj.name, asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetUriRef):
+                task_outlets.append(AssetProfile(uri=obj.uri, asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetAlias):
+                task_outlets.append(AssetProfile(asset_type=asset_type))
+                for asset_alias_event in events[obj].asset_alias_events:
+                    outlet_events.append(attrs.asdict(asset_alias_event))

Review Comment:
   Each of these branches has an asset type defined, even though name and uri 
are optional.
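
A rough sketch of what each branch produces, assuming simplified stand-in classes. The dataclasses below only mimic the names in the diff and are not the real Airflow or Pydantic models, and `to_profile` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AssetProfile:
    # asset_type is always required; name and uri depend on the outlet kind.
    asset_type: str
    name: Optional[str] = None
    uri: Optional[str] = None


# Simplified stand-ins for the outlet classes referenced in the diff.
@dataclass
class Asset:
    name: str
    uri: str


@dataclass
class AssetNameRef:
    name: str


@dataclass
class AssetUriRef:
    uri: str


@dataclass
class AssetAlias:
    name: str


def to_profile(obj) -> Optional[AssetProfile]:
    asset_type = type(obj).__name__
    if isinstance(obj, Asset):
        return AssetProfile(asset_type, name=obj.name, uri=obj.uri)
    if isinstance(obj, AssetNameRef):
        return AssetProfile(asset_type, name=obj.name)
    if isinstance(obj, AssetUriRef):
        return AssetProfile(asset_type, uri=obj.uri)
    if isinstance(obj, AssetAlias):
        return AssetProfile(asset_type)
    # Other lineage objects are skipped, as in the diff.
    return None


outlets = [
    Asset("a", "s3://bucket/a"),
    AssetNameRef("a"),
    AssetUriRef("s3://bucket/a"),
    AssetAlias("alias"),
]
profiles = [p for p in map(to_profile, outlets) if p is not None]
```

Every profile in `profiles` has `asset_type` set, while `name` and `uri` are filled in only where the outlet provides them.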



##########
airflow/models/taskinstance.py:
##########
@@ -352,7 +353,26 @@ def _run_raw_task(
         if not test_mode:
             _add_log(event=ti.state, task_instance=ti, session=session)
             if ti.state == TaskInstanceState.SUCCESS:
-                ti._register_asset_changes(events=context["outlet_events"], session=session)
+                task_outlets = []
+                outlet_events = []
+                events = context["outlet_events"]
+                for obj in ti.task.outlets or []:
+                    # Lineage can have other types of objects besides assets
+                    asset_type = type(obj).__name__
+                    if isinstance(obj, Asset):
+                        task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, asset_type=asset_type))
+                        outlet_events.append(attrs.asdict(events[obj]))  # type: ignore
+                    elif isinstance(obj, AssetNameRef):
+                        task_outlets.append(AssetProfile(name=obj.name, asset_type=asset_type))
+                        outlet_events.append(attrs.asdict(events))  # type: ignore
+                    elif isinstance(obj, AssetUriRef):
+                        task_outlets.append(AssetProfile(uri=obj.uri, asset_type=asset_type))
+                        outlet_events.append(attrs.asdict(events))  # type: ignore
+                    elif isinstance(obj, AssetAlias):
+                        task_outlets.append(AssetProfile(asset_type=asset_type))
+                        for asset_alias_event in events[obj].asset_alias_events:
+                            outlet_events.append(attrs.asdict(asset_alias_event))
+                TaskInstance._register_asset_changes_int(ti, task_outlets, outlet_events, session=session)

Review Comment:
   This makes it possible to remove 
https://github.com/apache/airflow/pull/45924/files#diff-62f7d8a52fefdb8e05d4f040c6d3459b4a56fe46976c24f68843dbaeb5a98487L2736-L2742
   I'm not a big fan of having to iterate here to compute the types and 
populate:
   ```
                   task_outlets = []
                   outlet_events = []
   ```
   
   and then iterate again here: 
   
https://github.com/apache/airflow/pull/45924/files#diff-62f7d8a52fefdb8e05d4f040c6d3459b4a56fe46976c24f68843dbaeb5a98487R2781-R2795
   
   But it should be OK for now.
   
   


