ferruzzi commented on code in PR #66608:
URL: https://github.com/apache/airflow/pull/66608#discussion_r3229667447


##########
airflow-core/src/airflow/triggers/callback.py:
##########
@@ -48,13 +48,64 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
             {attr: getattr(self, attr) for attr in ("callback_path", "callback_kwargs")},
         )
 
+    async def _build_context(
+        self, dag_id: str, run_id: str, deadline_id: str | None, deadline_time: str | None
+    ) -> dict[str, Any]:
+        """
+        Fetch the DagRun via the Execution API and build a context dict for the callback.
+
+        This replaces the previous approach of storing a serialized context in the database
+        at scheduling time. Fetching at execution time ensures the context is fresh and avoids
+        DB bloat from large serialized payloads.
+        """
+        from airflow.sdk.execution_time.comms import DagRunResult, GetDagRun
+        from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+
+        response = await SUPERVISOR_COMMS.asend(GetDagRun(dag_id=dag_id, run_id=run_id))
+        if not isinstance(response, DagRunResult):
+            log.warning("Unexpected response type from GetDagRun: %s", type(response))
+            return {}
+
+        context: dict[str, Any] = {
+            "dag_run": response.model_dump(mode="json"),
+            "dag_id": dag_id,
+            "run_id": run_id,
+            "logical_date": response.logical_date.isoformat() if response.logical_date else None,
+            "data_interval_start": (
+                response.data_interval_start.isoformat() if response.data_interval_start else None
+            ),
+            "data_interval_end": (
+                response.data_interval_end.isoformat() if response.data_interval_end else None
+            ),
+            "conf": response.conf,
+        }
+
+        if deadline_id or deadline_time:
+            context["deadline"] = {
+                "id": deadline_id,
+                "deadline_time": deadline_time,
+            }

Review Comment:
   Non-blocking, but this smells a little funny. We check
   `deadline_id or deadline_time`, then add both keys anyway, so if only one
   is set the user gets a None value for the other. Is that intentional, or
   should we be using prune_dict() to drop the "empty" keys?
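
   To make the question concrete, here is a rough sketch of the pruning
   option. It is not the PR's code: `drop_none` is a hypothetical stand-in
   for prune_dict(), and `add_deadline` is a made-up helper name, used only
   so the snippet runs on its own.

```python
from __future__ import annotations

from typing import Any


def drop_none(d: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical stand-in for prune_dict(): drop keys whose value is None."""
    return {key: value for key, value in d.items() if value is not None}


def add_deadline(
    context: dict[str, Any], deadline_id: str | None, deadline_time: str | None
) -> dict[str, Any]:
    """Attach a "deadline" entry containing only the fields that were provided."""
    deadline = drop_none({"id": deadline_id, "deadline_time": deadline_time})
    # An empty dict is falsy, so nothing is added when both values are None.
    if deadline:
        context["deadline"] = deadline
    return context
```

   With this shape, a callback can test `"deadline_time" in context["deadline"]`
   instead of checking for None. One difference to be aware of: this filters
   on `is not None`, so an empty string would be kept, whereas the truthiness
   check in the diff treats it as missing.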



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]