ramitkataria commented on code in PR #66608:
URL: https://github.com/apache/airflow/pull/66608#discussion_r3245400157
##########
airflow-core/src/airflow/triggers/callback.py:
##########
@@ -48,13 +48,63 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
{attr: getattr(self, attr) for attr in ("callback_path", "callback_kwargs")},
)
+ async def _build_context(
+ self, dag_id: str, run_id: str, deadline_id: str | None, deadline_time: str | None
+ ) -> dict[str, Any]:
+ """
+ Fetch the DagRun via the Execution API and build a context dict for the callback.
+
+ This replaces the previous approach of storing a serialized context in the database
+ at scheduling time. Fetching at execution time ensures the context is fresh and avoids
+ DB bloat from large serialized payloads.
+ """
+ from airflow.sdk.execution_time.comms import DagRunResult, GetDagRun
+ from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+
+ response = await SUPERVISOR_COMMS.asend(GetDagRun(dag_id=dag_id, run_id=run_id))
Review Comment:
I don't think a trigger is supposed to interact with SUPERVISOR_COMMS directly.
That's the job of the trigger runner.
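A minimal sketch of that split, assuming a runner that owns the comms channel and hands the trigger an already-resolved context. `FetchDagRun`, `FakeSupervisorComms`, `CallbackTrigger`, `TriggerRunner`, and `notify` are hypothetical stand-ins, not Airflow APIs, and the in-memory dict replaces the real Execution API round trip.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass(frozen=True)
class FetchDagRun:
    """Hypothetical request message (a stand-in, not Airflow's GetDagRun)."""

    dag_id: str
    run_id: str


class FakeSupervisorComms:
    """Hypothetical in-memory stand-in for the supervisor channel."""

    def __init__(self, dag_runs: dict[tuple[str, str], dict[str, Any]]) -> None:
        self._dag_runs = dag_runs

    async def asend(self, msg: FetchDagRun) -> dict[str, Any]:
        await asyncio.sleep(0)  # pretend this is a round trip to the API server
        return self._dag_runs[(msg.dag_id, msg.run_id)]


class CallbackTrigger:
    """Runs the user callback; it never imports or touches the comms channel."""

    def __init__(self, callback: Callable[..., Awaitable[Any]], callback_kwargs: dict[str, Any]) -> None:
        self.callback = callback
        self.callback_kwargs = callback_kwargs

    async def run(self, context: dict[str, Any]) -> Any:
        return await self.callback(**self.callback_kwargs, context=context)


class TriggerRunner:
    """Owns the comms channel and resolves context before invoking the trigger."""

    def __init__(self, comms: FakeSupervisorComms) -> None:
        self.comms = comms

    async def run_callback(self, trigger: CallbackTrigger, dag_id: str, run_id: str) -> Any:
        dag_run = await self.comms.asend(FetchDagRun(dag_id=dag_id, run_id=run_id))
        return await trigger.run(context={"dag_run": dag_run})


async def notify(message: str, context: dict[str, Any]) -> str:
    return f"{message}: {context['dag_run']['state']}"


runner = TriggerRunner(FakeSupervisorComms({("example_dag", "manual__1"): {"state": "running"}}))
trigger = CallbackTrigger(notify, {"message": "deadline missed"})
result = asyncio.run(runner.run_callback(trigger, "example_dag", "manual__1"))
```

With this shape the trigger is testable without any supervisor, and only the runner needs to know how context is fetched.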
##########
airflow-core/src/airflow/triggers/callback.py:
##########
@@ -48,13 +48,63 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
{attr: getattr(self, attr) for attr in ("callback_path", "callback_kwargs")},
)
+ async def _build_context(
Review Comment:
Ideally, we should minimize callback-specific context code and use the same
context type as the one used for tasks. So I think we should remove this
function entirely.
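A rough sketch of what "same type for context" could look like, assuming one builder serves both tasks and callbacks and a callback simply receives the subset of keys that has no task instance. `DagRunInfo`, `RuntimeContext`, and `build_runtime_context` are hypothetical names for illustration, not the existing task `Context` type.

```python
from typing import Any, Optional, TypedDict


class DagRunInfo(TypedDict):
    dag_id: str
    run_id: str
    state: str


class RuntimeContext(TypedDict, total=False):
    """Hypothetical shared context type; callbacks get a subset of the task keys."""

    dag_run: DagRunInfo
    run_id: str
    ti: dict[str, Any]  # only present when a task instance is running


def build_runtime_context(dag_run: DagRunInfo, ti: Optional[dict[str, Any]] = None) -> RuntimeContext:
    """Single builder for tasks and callbacks, so no callback-only variant is needed."""
    ctx: RuntimeContext = {"dag_run": dag_run, "run_id": dag_run["run_id"]}
    if ti is not None:
        ctx["ti"] = ti
    return ctx


run: DagRunInfo = {"dag_id": "example_dag", "run_id": "manual__1", "state": "running"}
task_ctx = build_runtime_context(run, ti={"task_id": "extract"})
callback_ctx = build_runtime_context(run)  # a deadline callback has no task instance
```

Because both paths share one type, a callback author can rely on the same keys a task author sees, minus anything tied to a task instance.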
##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -215,30 +215,21 @@ def prune_deadlines(cls, *, session: Session, conditions: dict[Mapped, Any]) ->
def handle_miss(self, session: Session):
"""Handle a missed deadline by queueing the callback."""
-
- def get_simple_context():
- from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
- from airflow.models import DagRun
-
- # TODO: Use the TaskAPI from within Triggerer to fetch full context instead of sending this context
- # from the scheduler
-
- # Fetch the DagRun from the database again to avoid errors when self.dagrun's relationship fields
- # are not in the current session.
- dagrun = session.get(DagRun, self.dagrun_id)
-
- return {
- "dag_run": DAGRunResponse.model_validate(dagrun).model_dump(mode="json"),
- "deadline": {"id": self.id, "deadline_time": self.deadline_time},
- }
+ # Store only identifiers in kwargs; the callback executor (triggerer or executor subprocess)
+ # fetches the full DagRun context via the Execution API at runtime. This avoids DB bloat
+ # from serialized context and ensures context is fresh at execution time.
+ context_identifiers = {
Review Comment:
Let's remove all these identifiers and have the triggerer supervisor fetch
them like it does for tasks. I would like to keep
`self.callback.data["kwargs"]` as minimal as possible, apart from the
user-specified kwargs.
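A minimal sketch of keeping the stored kwargs down to user input, assuming the supervisor can derive the DagRun from the deadline row it already holds (much as the removed code used `self.dagrun_id`). `DagRunRow`, `DeadlineRow`, `DAG_RUNS`, `queue_callback`, and `resolve_context` are hypothetical; the dict stands in for the metadata DB or Execution API.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class DagRunRow:
    id: int
    dag_id: str
    run_id: str


@dataclass
class DeadlineRow:
    id: str
    dagrun_id: int
    callback_data: dict[str, Any] = field(default_factory=dict)


# Hypothetical in-memory table standing in for the metadata DB / Execution API.
DAG_RUNS = {7: DagRunRow(id=7, dag_id="example_dag", run_id="manual__1")}


def queue_callback(deadline: DeadlineRow, user_kwargs: dict[str, Any]) -> None:
    """Scheduler side: persist only the user-specified kwargs, nothing else."""
    deadline.callback_data = {"kwargs": dict(user_kwargs)}


def resolve_context(deadline: DeadlineRow) -> dict[str, Any]:
    """Supervisor side: derive identifiers from the row it already owns."""
    dag_run = DAG_RUNS[deadline.dagrun_id]
    return {
        "dag_run": {"dag_id": dag_run.dag_id, "run_id": dag_run.run_id},
        "deadline": {"id": deadline.id},
    }


deadline = DeadlineRow(id="d-1", dagrun_id=7)
queue_callback(deadline, {"channel": "#alerts"})
context = resolve_context(deadline)
```

The stored payload then contains nothing the user did not pass, and the identifiers never need to be kept in sync between the kwargs and the deadline row.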
--
This is an automated message from the Apache Git Service.