amoghrajesh commented on code in PR #67839:
URL: https://github.com/apache/airflow/pull/67839#discussion_r3338985312


##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########


Review Comment:
   The `_handle_request` should be handling this in here?



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1266,6 +1269,16 @@ async def create_triggers(self):
             trigger_instance.triggerer_job_id = self.job_id
             trigger_instance.timeout_after = workload.timeout_after
 
+            if isinstance(trigger_instance, BaseEventTrigger) and 
workload.watched_assets:
+                # Reconstruct AssetStateAccessors from watched_assets
+                from airflow.sdk.definitions.asset import Asset
+                from airflow.sdk.execution_time.context import 
AssetStateAccessors

Review Comment:
   Already a top level import



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -741,11 +737,18 @@ def _create_workload(
         render_log_fname: Callable[..., str],
         session: Session,
     ) -> workloads.RunTrigger | None:
+        # Pass the "watched" Assets through for downstream use in 
BaseEventTrigger
         if trigger.task_instance is None:
+            watched_assets: dict[str, str] | None = None
+
+            if trigger.asset_watchers:
+                watched_assets = {a.name: a.uri for a in trigger.assets}

Review Comment:
   Either use trigger.assets or trigger.asset_watchers for if check and 
iteration



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to