sjyangkevin commented on code in PR #54568:
URL: https://github.com/apache/airflow/pull/54568#discussion_r2295139378
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -2538,6 +2564,118 @@ def execute(self, context):
assert listener.state == [TaskInstanceState.RUNNING,
TaskInstanceState.FAILED]
assert listener.error == error
+ def test_listener_access_outlet_event_on_running_and_success(self,
mocked_parse, mock_supervisor_comms):
+ """Test listener can access outlet events through invoking
get_template_context() while task running and success"""
+ listener = self.CustomOutletEventsListener()
+ get_listener_manager().add_listener(listener)
+
+ test_asset = Asset("test-asset")
+ test_key = AssetUniqueKey(name="test-asset", uri="test-asset")
+ test_extra = {"name1": "value1", "nested_obj": {"name2": "value2"}}
+
+ class Producer(BaseOperator):
+ def execute(self, context):
+ outlet_events = context["outlet_events"]
+ outlet_events[test_asset].extra = {"name1": "value1",
"nested_obj": {"name2": "value2"}}
+
+ task = Producer(
+
task_id="test_listener_access_outlet_event_on_running_and_success",
outlets=[test_asset]
+ )
+ dag = get_inline_dag(dag_id="test_dag", task=task)
+ ti = TaskInstance(
+ id=uuid7(),
+ task_id=task.task_id,
+ dag_id=dag.dag_id,
+ run_id="test_run",
+ try_number=1,
+ dag_version_id=uuid7(),
+ )
+
+ runtime_ti = RuntimeTaskInstance.model_construct(
+ **ti.model_dump(exclude_unset=True), task=task,
start_date=timezone.utcnow()
+ )
+
+ log = mock.MagicMock()
+ context = runtime_ti.get_template_context()
+
+ with mock.patch(
+
"airflow.sdk.execution_time.task_runner._validate_task_inlets_and_outlets"
Review Comment:
Actually, I wasn't quite sure why we need this patch here. Without this
patch, the listener will not be triggered when `outlets` is set. Would
appreciate if anyone could provide some context about it. Probably because the
`test_asset` is somehow considered as an invalid asset, like not written into
the DB?
```python
task = Producer(
task_id="test_listener_access_outlet_event_on_running_and_success",
outlets=[test_asset]
)
```
##########
scripts/ci/prek/check_template_context_variable_in_sync.py:
##########
Review Comment:
the update to cache the context is not compatible with this pre-commit (or
prek) check. I wasn't sure whether it is fine to update this logic. Would
appreciate any feedback.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]