jason810496 opened a new issue, #66934:
URL: https://github.com/apache/airflow/issues/66934

   ### Background
   
   In `airflow-core/src/airflow/dag_processing/processor.py` (around lines 
582–590), the Dag Processor's `_on_child_started` has the coordinator-pinning 
block commented out behind a TODO:
   
   ```python
   # TODO(jason810496): Uncomment the coordinator-pinning block below once the
   # Dag Processor <-> Coordinator interaction pattern is settled and
   # ``CoordinatorManager.for_dag_file`` is reintroduced (it was removed in
   # astronomer/airflow#1578, which narrowed AIP-108 scope to task execution 
only).
   # When a coordinator handles this file, the pin lets the supervisor's IPC
   # migrator downgrade outgoing head-shape bodies and upgrade incoming
   # bodies against the lang-SDK's frozen message schema version.
   # if (coordinator := get_coordinator_manager().for_dag_file(bundle_name, 
path)) is not None:
   #     self.lang_sdk_msg_schema_version = 
coordinator.target_msg_schema_version(msg)
   self.send_msg(msg, request_id=0)
   ```
   
   The block was disabled because `CoordinatorManager.for_dag_file` was removed 
in astronomer/airflow#1578, which narrowed AIP-108's initial scope to task 
execution only. As a result, the Dag Processor currently cannot pin its 
outgoing IPC frames to a foreign-runtime (lang-SDK) message schema version, so 
the supervisor's migrator cannot downgrade outgoing head-shape bodies / upgrade 
incoming bodies for Dag-parsing exchanges with non-Python lang-SDK runtimes.
   
   This issue tracks the follow-up work so the deferred TODO has a stable link, 
per the "Tracking issues for deferred work" rule in `AGENTS.md`.
   
   ### What needs to happen
   
   1. Settle the Dag Processor ↔ Coordinator interaction pattern for the 
parsing path (which file extensions / queues route to which coordinator at 
parse time, and what `StartupDetails`-equivalent message the parser sends).
   2. Reintroduce a resolution entry point on `CoordinatorManager` (e.g. 
`for_dag_file(bundle_name, path)`) or its successor that returns the 
coordinator handling a given Dag file.
   3. Uncomment / re-wire the pinning block in `_on_child_started`, setting 
`self.lang_sdk_msg_schema_version` from 
`coordinator.target_msg_schema_version(msg)` before `send_msg`.
   4. Add unit tests covering the three cases the existing 
`TestDagFileParseRequestCoordinatorMigration` parametrization assumes (no 
coordinator → no pin; coordinator with `target_msg_schema_version` returning a 
known version → pin set; unknown / invalid manifest version → graceful failure 
path).
   5. Remove or rewrite the existing tests in `test_processor.py` that 
currently assert on `proc.lang_sdk_msg_schema_version` so they reflect the 
final behavior.
   
   ### Acceptance criteria
   
   - The TODO at `airflow-core/src/airflow/dag_processing/processor.py:582-590` 
is removed.
   - A Dag file routed to a non-Python coordinator results in 
`lang_sdk_msg_schema_version` being set on the Dag-processor subprocess before 
`StartupDetails` (or its parsing equivalent) is sent.
   - `CoordinatorManager` exposes a public method for Dag-file → coordinator 
resolution, used by both the parser and (where appropriate) the supervisor.
   - Tests in `airflow-core/tests/.../test_processor.py` cover the pinning path 
end-to-end.
   
   ### Context
   
   - Original PR where the TODO was introduced: astronomer/airflow#1577 
(mirrored upstream in the AIP-108 work)
   - Removal of `CoordinatorManager.for_dag_file`: astronomer/airflow#1578
   - Related review thread that requested this tracking issue: 
https://github.com/astronomer/airflow/pull/1577#discussion_r3240218006


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to