amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1926828616


##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -479,12 +481,39 @@ def run(ti: RuntimeTaskInstance, log: Logger):
 
         _push_xcom_if_needed(result, ti)
 
+        task_outlets = []
+        outlet_events = []
+        events = context["outlet_events"]
+
+        for obj in ti.task.outlets or []:
+            # Lineage can have other types of objects besides assets
+            asset_type = type(obj).__name__
+            if isinstance(obj, Asset):
+                task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, 
asset_type=asset_type))
+                outlet_events.append(attrs.asdict(events[obj]))  # type: ignore
+            elif isinstance(obj, AssetNameRef):
+                task_outlets.append(AssetProfile(name=obj.name, 
asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetUriRef):
+                task_outlets.append(AssetProfile(uri=obj.uri, 
asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetAlias):
+                task_outlets.append(AssetProfile(asset_type=asset_type))
+                for asset_alias_event in events[obj].asset_alias_events:
+                    outlet_events.append(attrs.asdict(asset_alias_event))

Review Comment:
   Yeah let me do that. I have similar logic in task runner too. 
   
   Wondering if I can move it to lets say `definitions/assets/utils.py`? 
Definitions doesn't look to be a good place to me honestly, but I cant think of 
some other location. Any suggestions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to