sjyangkevin commented on code in PR #54568:
URL: https://github.com/apache/airflow/pull/54568#discussion_r2295139378


##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -2538,6 +2564,118 @@ def execute(self, context):
         assert listener.state == [TaskInstanceState.RUNNING, 
TaskInstanceState.FAILED]
         assert listener.error == error
 
+    def test_listener_access_outlet_event_on_running_and_success(self, 
mocked_parse, mock_supervisor_comms):
+        """Test listener can access outlet events through invoking 
get_template_context() while task running and success"""
+        listener = self.CustomOutletEventsListener()
+        get_listener_manager().add_listener(listener)
+
+        test_asset = Asset("test-asset")
+        test_key = AssetUniqueKey(name="test-asset", uri="test-asset")
+        test_extra = {"name1": "value1", "nested_obj": {"name2": "value2"}}
+
+        class Producer(BaseOperator):
+            def execute(self, context):
+                outlet_events = context["outlet_events"]
+                outlet_events[test_asset].extra = {"name1": "value1", 
"nested_obj": {"name2": "value2"}}
+
+        task = Producer(
+            
task_id="test_listener_access_outlet_event_on_running_and_success", 
outlets=[test_asset]
+        )
+        dag = get_inline_dag(dag_id="test_dag", task=task)
+        ti = TaskInstance(
+            id=uuid7(),
+            task_id=task.task_id,
+            dag_id=dag.dag_id,
+            run_id="test_run",
+            try_number=1,
+            dag_version_id=uuid7(),
+        )
+
+        runtime_ti = RuntimeTaskInstance.model_construct(
+            **ti.model_dump(exclude_unset=True), task=task, 
start_date=timezone.utcnow()
+        )
+
+        log = mock.MagicMock()
+        context = runtime_ti.get_template_context()
+
+        with mock.patch(
+            
"airflow.sdk.execution_time.task_runner._validate_task_inlets_and_outlets"

Review Comment:
   Actually, I wasn't quite sure why we need this patch here. Without this 
patch, the listener will not be triggered when `outlets` is set. Would 
appreciate if anyone could provide some context about it. Probably because the 
`test_asset` is somehow considered as an invalid asset, like not written into 
the DB?
   
   ```python
   task = Producer(
       task_id="test_listener_access_outlet_event_on_running_and_success", 
outlets=[test_asset]
   )
   ```



##########
scripts/ci/prek/check_template_context_variable_in_sync.py:
##########


Review Comment:
   the update to cache the context is not compatible with this pre-commit (or 
prek) check. I wasn't sure whether it is fine to update this logic. Would 
appreciate any feedback.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to