Hi Jarek,

Thanks for the review - good to have some more eyes on this.

One important thing that was not explicitly stated in the AIP (only by 
“side-effect” in the diagram, or the airflow.sdk code references) is that the 
Coordinator is entirely a worker-side construct. The scheduler is entirely 
unaware of how a task runs, just that it does. 

Point 2: queue_to_sdk is per-worker deployment config. The scheduler never sees 
or touches coordinator selection -- it only knows about queue in the same way 
it always has. The CeleryKubernetesExecutor comparison doesn't quite apply here 
because that was a scheduler/executor-level conflation; here the scheduler is 
entirely unaware coordinators exist. Adding coordinator="jdk-17" to @task.stub 
would require the scheduler to carry and propagate that to the worker, which is 
exactly the coupling we're trying to avoid. The coordinator choice is a 
deployment operator decision (not 100% only, but mostly), not a DAG authoring 
decision.

This is especially true for one of the use cases I had in my presentation at 
Airflow Sumit where you have a separate team define the task, and you just 
“call it like a function”. You as the dag author don’t need, and shouldn’t know 
if it’s run in idk-8 or jdk-25.

Point 3: Since it's per-worker config there's no shared registry for 
coordinators to compete in. jdk-11 vs jdk-17 is just two differently-configured 
workers with different queue mappings. First-match within a single worker's 
config is deterministic and unambiguous.

Point 6: Falls out naturally -- different teams' workers have different 
configs. No scheduler-level shared config concern.

-ash


> On 11 May 2026, at 00:29, Tzu-ping Chung via dev <[email protected]> 
> wrote:
> 
> The AIP has been updated to emphasise how to coordinator works with the 
> (non-Python) SDK artefact, and that process management (if present) and 
> message communication is not specified in the proposal since it is a contract 
> only between the two.
> 
> I also removed all references to parsing a Java dag file to avoid any 
> potential conflicts to AIP-82. AIP-108 now only defines how the coordinator 
> should find an executable target for a stub task, and the process it goes 
> through to fulfil execution.
> 
> Moving to other topics mentioned previously…
> 
>> 
>> 1. The important topic already discussed is the wire protocol. While I agree 
>> it's not a "blocker" for this AIP, it's 100% a blocker for releasing the 
>> coordinator. Until we ship the first version of the coordinator we can 
>> **easily** change the wire protocol. Once we ship it, it's basically a done 
>> deal and it will be difficult to take a different path. But I do not see it 
>> as a blocker for this AIP. I saw early versions (not public yet—I saw 
>> messages from Confluence and read some early drafts) showing that the 
>> current choice was a well-thought-out decision. I have my own sentiments and 
>> experience with protobufs—they're pretty bad, and I really wouldn't like to 
>> use them here—but I agree this is a different discussion and shouldn't 
>> prevent acceptance of this AIP. This is contingent on us agreeing that we 
>> will not ship the coordinator until AIP-109 (which I saw was the proposal) 
>> is discussed and agreed upon. That discussion should happen **after** we 
>> complete this one and vote on this AIP; otherwise, it might get stuck in 
>> endless bikeshedding. I'm glad the proposal exists, and I even more glad 
>> it's now gated by permissions (I saw that TP, Ash, and Jason did that). Once 
>> we accept this one, we will open AIP-109 and discuss and agree on it well 
>> before we actually "ship" the first coordinator.
> 
> I disagree this is a blocker from releasing the coordinator. Since a 
> language-specific coordinator is its own distribution, it does not follow 
> Airflow Core and SDK’s major release cycles. It can be made experimental, 
> and/or bump the major version for breaking changes on its own.
> 
> However, in the meantime, some changes in Airflow Core and SDK are needed for 
> them to know the coordinator exists and route workload to it. The best 
> approach IMO is to release the base coordinator interface in Airflow 3.2 with 
> a provisional Java SDK and coordinator implementation (maybe using a 
> zero-prefixed version). This would be most helpful for us to understand 
> potential issues, rather than trying to make decisions in a vacuum.
> 
> 
>> 3. First match coordinator choice: Choosing the first matching coordinator 
>> is I think, a bad choice (paired with the queue_to_coordinator mapping). It 
>> is simplistic and has potential problems, such as when "similar" 
>> coordinators effectively "compete" for Dags/Tasks. Even the very example of 
>> two "jdk" coordinators shows the problem. I can very easily see say "jdk-11" 
>> and "jdk-17" coordinators configured in the same setup. The sequence of 
>> setting those coordinators will determine whch one is used, which shows a 
>> bit of asymmetry. If you want to use "jdk-17", you must specify it, rather 
>> than having it matched automatically. However, I can easily imagine a 
>> slightly more complicated interface where the matcher would return a tuple - 
>> ("matches": bool, "priority": int) + some deterministic tie-break or 
>> conflict error if we have two identical matches. This could be a much more 
>> flexible solution. It would handle a much more versatile set of cases. For 
>> example, you could have "jdk-11/" and "jdk-17" folders in a bundle and have 
>> two coordinators pick which one handles the dags based on the folder name - 
>> without needing to specify the coordinator by "queue" or "name. In my 
>> opinion, that is a much better solution than the one the 
>> "queue_to_coordinator" tried to solve by proposing an indirection layer in 
>> the Dag definition. Performance-wise, the impact should be negligible. I 
>> hardly imagine we'd have to worry about more than a few coordinators and the 
>> complexity of the "can_handle_dag" implementation, which would involve 
>> always running "can_handle_dag" for all coordinators rather than only when 
>> the last coordinator matches.
> 
> I’m not sure where the “first matching” understanding is from. (Please point 
> out relevant text on this; it is most likely out-of-date.) A task specifies a 
> queue, and then the queue is mapped to a specific coordinator. There can’t be 
> multiple to choose from. You can map multiple queues to use the same 
> coordinator, but only one coordinator can be assigned to a queue.
> 
> 
>> 4. JVM process model -> unlike in Python, we do not know if we are running 
>> new processes or running tasks within the JVM process. This has implications 
>> for memory, startup time overhead, JIT, isolation (especially for multi-team 
>> environments), and restart-on-failure. I think we should spell out the 
>> proposal clearly. Possibly (if my queue/coordinator split is accepted), we 
>> could use the queue to determine the task's execution mode.
> 
> I think this are implementation aspects. Managing a JVM process is no 
> different from Python, and all the characteristics that a Python task possess 
> apply directly to a Java one. There are really no differences that need 
> specifications here. (Maybe this point was based on the understanding of the 
> next point where Java tasks can reuse the same process? But we’re not doing 
> that.)
> 
> 
>> 6. Multi-team: I guess coordinator definition should not be shared between 
>> teams (unless they want it). The current proposal puts them all in single 
>> ,cfg but Multi-team has this format: Which I think the coordinators should 
>> also follow?
>> 
>> [core]
>> executor = GlobalExecutor;team1=Team1Executor;team2=Team2Executor
> 
> The executor config is formatted this way since it is a default value, and 
> each team can have a different default. The coordinators config specifies 
> what should exist in Airflow instead, and each team should choose which 
> coordinator they want to use (by specifying the queues their tasks use). It 
> is not necessary (and honestly IMO extremely confusing in practice) if each 
> team can have a different set of coordinators.
> 
> 
>> 7. How will logging / tracing be handled? -  Currently there is no mention 
>> of it in the AIP, I am sure it has already been considered and resolved 
>> (including wire protocol and Java logger integration). But it's not 
>> specified in the design - and I think describing it is quite important. 
>> Since we are using StructLog, we want to use a similar tool in Java and 
>> integrate it with existing struct-logging - but it's not clear how. Java has 
>> its own logging system and plenty of logging "wrappers" - but here I think 
>> we have similar problems with "wire protocol." Which one are we going to 
>> use? Where do we convert what Java produces into StructLog's JSON? On which 
>> side does this conversion happen? Will the wire protocol be the same for all 
>> languages? Similar to the wire protocol above, details and choices might be 
>> discussed in a separate AIP. However, at least the "Java choices" regarding 
>> logging and how we envision communication for logs should be documented. 
>> Similarly with OpenTelemetry (OTEL) tracing: perhaps we should skip it for 
>> now and assume no tracing exists. More likely, however, we will integrate 
>> the Java side with the Java OTEL implementation and pass enough information 
>> to the task so it can provide sufficient tracing information via its own 
>> OTEL configuration. ? I think we should spell out the choices we made at the 
>> very least here.
> 
> The task (user code) simply print to stdout and stderr, and it gets hurled to 
> the parent process and recorded with standard Airflow logging. How the 
> messages are actually print is up to the user. Or you can do your own 
> logging—we don’t care. (This is actually the same in Python; Airflow itself 
> uses structured logging, but user-generated task logs are just strings; 
> Airflow does not enforce a format there.)
> 
> The Java SDK actually has a separate channel to send internal logs so they 
> are not mixed with user logs, but again this is specific to the 
> coordinator-SDK combination and doesn’t need to be specified. Each 
> coordinator can do this in its own way best fit from the SDK.
> 
> Tracing is currently not implemented, but IMO this is more of a standalone 
> feature on the Java SDK,  The Java SDK can simply be configured to send to 
> the same OT endpoint, and we can probably build some utility functions to 
> simply things for task authors, but ultimately there’s not really anything to 
> specify in the AIP since Airflow doesn’t read tracing information (it only 
> emits).
> 
> TP
> 
> 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to