ashb commented on code in PR #46677:
URL: https://github.com/apache/airflow/pull/46677#discussion_r1958030934
##########
airflow/models/trigger.py:
##########
@@ -360,3 +364,82 @@ def get_sorted_triggers(cls, capacity: int, alive_triggerer_ids: list[int] | Sel
# Add triggers associated to assets after triggers associated to tasks
# It prioritizes DAGs over event driven scheduling which is fair
return ti_triggers + asset_triggers
+
+
+@singledispatch
+def handle_event_submit(event: events.TriggerEvent, *, task_instance: TaskInstance, session: Session) -> None:
+ """
+ Handle the submit event for a given task instance.
+
+ This function sets the next method and next kwargs of the task instance,
+ as well as its state to scheduled. It also adds the event's payload
+ into the kwargs for the task.
+
+ :param task_instance: The task instance to handle the submit event for.
+ :param session: The session to be used for the database callback sink.
+ """
+ from airflow.utils.state import TaskInstanceState
+
+ # Get the next kwargs of the task instance, or an empty dictionary if it doesn't exist
+ next_kwargs = task_instance.next_kwargs or {}
+
+ # Add the event's payload into the kwargs for the task
+ next_kwargs["event"] = event.payload
+
+ # Update the next kwargs of the task instance
+ task_instance.next_kwargs = next_kwargs
+
+ # Remove ourselves as its trigger
+ task_instance.trigger_id = None
+
+ # Set the state of the task instance to scheduled
+ task_instance.state = TaskInstanceState.SCHEDULED
+ task_instance.scheduled_dttm = timezone.utcnow()
+ session.flush()
Review Comment:
The task instance is already attached to the session, so the session is
already tracking changes made to the object.
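To illustrate the point, here is a minimal sketch of SQLAlchemy's change tracking, not code from this PR. The `Task` model, the in-memory SQLite engine, and the state strings are hypothetical stand-ins for `TaskInstance` and the Airflow metadata database. It assumes SQLAlchemy 1.4 or later.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Task(Base):
    """Hypothetical stand-in for TaskInstance; not the Airflow model."""

    __tablename__ = "task"
    id = Column(Integer, primary_key=True)
    state = Column(String)


# In-memory SQLite database, used only for this illustration.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Task(id=1, state="deferred"))
    session.commit()

    # Loading the object through the session attaches it to that session.
    task = session.get(Task, 1)
    attached = task in session

    # Mutating an attached object is tracked automatically;
    # no extra session.add() or session.merge() call is needed.
    task.state = "scheduled"
    tracked = task in session.dirty

    # flush() emits the UPDATE for the pending change.
    session.flush()
    flushed = task not in session.dirty
    session.commit()

# A fresh session confirms the change reached the database.
with Session(engine) as check:
    persisted_state = check.get(Task, 1).state
```

Under these assumptions, `attached`, `tracked`, and `flushed` are all true, and `persisted_state` is `"scheduled"`, even though the modified object was never re-added to the session.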