Hello,

*Ash*,

> One important thing not explicitly stated in the AIP (only implied by a
"side-effect” in the diagram or airflow.sdk code references) is that the
Coordinator is entirely a worker-side construct. The scheduler is entirely
unaware of how a task runs, only that it does.

That change in scope really changes things a LOT.

Because when I looked at it a few days ago, it was not. Back then (and at
the dev call) it looked like a generic solution that should eventually
handle selection of different languages and introduce a coordinator
standard and interface that will be implemented across many different
languages. It also involved part of DagFile Processing, making it a much
more "shared" construct between Worker and DagFileProcess. It also appeared
to be configured the same way in Scheduler, and Scheduler makes the choice
of which coordinator to select for which task.

After removing the Dag File Processor, it can now truly be thought of only
as a "worker" (i.e., a task execution process). And that indeed changes and
simplifies a lot and I think many more decisions can now be better
justified.

But with this scope, to be honest that starts to feel much less like a
coordinator and more like a "Bridge to run Java Tasks." It's very specific
to Java. Is this something we want to use in exactly this form - or even
reuse anything of in different language implementations?
Removing this "first matching" mechanism from the AIP changes its
"genericness" and applicability for other languages quite dramatically.
Basically, all that remains is something like a* "Python <> Java bridge to
start the process and pass through the task-SDK."* And I am not even sure
if the "coordinator" concept fits it any more - I would rather call it "
*bridge*" or something like that to make it clearer :). And in my opinion,
it should be an optional feature of "task-sdk," not a separate
distribution. I am not sure if separate "coordinator" distributions are
justified in this case.

Paraphrasing it: If I understand correctly, the current scope is to allow
the Python task process to run a subprocess specifically in a
JDK-interface-specific way, send it all the starting data, pass-through
Task-SDK messages and send back responses - without any ambition to become
the standard approach for any other language? It's mostly a `hack" to do it
this way and use queue to pass "dssired execution context - specific and
implemented only in Java".

And it is nothing bad, to be perfectly honest, - this is fine and simple -
has less coupling (for example we can indeed immediately drop the whole
discussion which protocol we should use). But it looked like much more than
that during the dev call and in the discussion :)

> This is especially true for one of the use cases I had in my presentation
at Airflow Sumit where you have a separate team define the task, and you
just “call it like a function”. You as the dag author don’t need, and
shouldn’t know if it’s run in idk-8 or jdk-25.

Oh, absolutely—that makes total sense  - now after removing the Dag Parsing
part. I understand we simply want to simplify it now (comparing to what we
saw on the Dev Call as well?) - for the sake of "delivering it on time" and
focusing only on that part. I just want to be crystal clear about the
current scope :)


> Point 2: queue_to_sdk is per-worker deployment config. The scheduler
never sees or touches coordinator selection -- it only knows about queue in
the same way it always has. The CeleryKubernetesExecutor comparison doesn't
quite apply here because that was a scheduler/executor-level conflation;
here the scheduler is entirely unaware coordinators exist. Adding
coordinator="jdk-17" to @task.stub would require the scheduler to carry and
propagate that to the worker, which is exactly the coupling we're trying to
avoid. The coordinator choice is a deployment operator decision (not 100%
only, but mostly), not a DAG authoring decision.

That was very unclear. This also necessitates clearly defining which
configuration part each component uses. For 3.2, we merely mentioned in the
security model which items should be configured where. If we now introduce
different configurations for different components, I think that should also
be reflected in how we treat config. Likely, we need at least a
documentation split there, or most likely "airflow config" should have
options to generate, list etc. things per-component. And yeah, spelling it
out in the AIP would be great—even now it says "airflow.cfg" and does not
mention that it is configured specifically in each worker.

So as I understand it now, we aren't introducing a reusable coordinator
concept. Instead, tasks should specify which "*Java bridge*" (for example)
to use in a queue. The task stub should provide a way to pass which
specific Java bridge implementation to use if more than one exists. We
might use the queue similarly if we have another language bridge, but that
isn't certain. Is this a good statement?

*TP:*

On Mon, May 11, 2026 at 1:29 AM Tzu-ping Chung <[email protected]> wrote:

> The AIP has been updated to emphasise how to coordinator works with the
> (non-Python) SDK artefact, and that process management (if present) and
> message communication is not specified in the proposal since it is a
> contract only between the two.
>
> I also removed all references to parsing a Java dag file to avoid any
> potential conflicts to AIP-82. AIP-108 now only defines how the coordinator
> should find an executable target for a stub task, and the process it goes
> through to fulfil execution.
>

Note. There are still some references to parsing and jars. For example
there is "DAG File (Python or JAR)" - but if I understand correctly, we no
longer have "Jar Dags." We will still have purely Python DAGs, and we also
need to specify the "jar" file to use on the execution side. I see that we
are going to store the jar in a serialized DAG—so it should be specified
somewhere in "task.stub()" - am I right?
Currently the "Foreign Language Process" chapter seems to still describe
the case where the Dag is defined in Java, not in Python, and defers to the
internal "coordinator" on how to pass it. Since AIP-108 now only concerns
the "Java bridge," should we describe in detail how we specify the jar to
use? And in this case I assume it's up to worker to place the ".jar" there
- do we make any assumptions if the "."jar" files should be distributed via
shared Dag Bundles? Or some other ways? I think it's important to define
how "jar" files are distributed—especially in the context of GitDagBundle
and versioned Dag. It seems to be appropriate to include JAR files as part
of the "Dag Bundle/Git" and reference them using a relative path from the
"Dag Bundle" root. I think this part is a little vague, especially since
AIP-108 specifically concerns the JDK "Bridge" implementation. Details
cannot be deferred to the "implementation detail of each coordinator" -
because this is the only one we are discussing (no matter what we call it).
So I think it should be a bit more specific.



> I disagree this is a blocker from releasing the coordinator. Since a
> language-specific coordinator is its own distribution, it does not follow
> Airflow Core and SDK’s major release cycles. It can be made experimental,
> and/or bump the major version for breaking changes on its own.
>
>
Yeah, If the goal is not to define the "coordinator" concept that will be
implemented in other languages as well - but specifically about the Java
Bridge—then I absolutely agree. But then introducing the concept of a new
"coordinator" distribution type is a bit far-fetched. It seems more like a
"task-sdk" Python -to-Java bridge that should be an optional "task-sdk"
feature. Alternatively, we could make it a separate extension, perhaps
named "airflow-sdk-java-bridge" because it essentially becomes the
"Task-sdk Java bridge". And a side comment: it looks like our "task-sdk"
will not have the use we anticipated. This "Task-SDK bridge," along with
all the "decoupling" we've done, isn't really used. Essentially, we will
have to hard-pin the Bridge's implementation to the task-sdk version
because we are not defining any API level compatibility mechanism. Even if
we separate it from "task-sdk", we will always have to pin those two
distributions together (unless we provide cadwyn-like compatibility for
task-sdk). So I woudl rather opt for "built-in" feature in "task-sdk" not
separate distribution I think.


> I’m not sure where the “first matching” understanding is from. (Please
> point out relevant text on this; it is most likely out-of-date.) A task
> specifies a queue, and then the queue is mapped to a specific coordinator.
> There can’t be multiple to choose from. You can map multiple queues to use
> the same coordinator, but only one coordinator can be assigned to a queue.
>

>From this (now removed):

        ┌──────────────▼───────────────┐
 ┌──────────────────────────────┐
    │  DAG File Processor          │                    │  Runtime
Subprocess (Java)   │
    │                              │  can_handle_dag    │
           │
    │  For each file in bundle:    │  _file() == True   │
 dag_parsing_cmd()           │
    │  ┌ coordinator handles it? ──┼───────────────────►│
           │
    │  │  Yes ──► delegate parse   │                    │  Java SDK parses
JAR, builds │
    │  │  No  ──► Python path      │  SDK Serialized    │  SDK-compatible
Serialized   │
    │  │                           │◄─── DAG JSON ──────┤  DAG JSON (sdk,
tasks, etc.) │
    │  └                           │                    │
           │
    └──────────────┬───────────────┘
 └──────────────────────────────┘

Since we can have multiple coordinators (as noted in even existing example
below - sometimes multiple coordinators handle it.java), the first
coordinator that returns "can_handle_dags" will win. (That was on Saturday
when I reviewed it)
And I understand why it was done that way—because, obviously, you would
have to create a method to determine which coordinator should handle which
files without hard-coding it. But since in this version—but since we
removed that whole section, and we only explicitly rely on choosing
coordinator based on what "task" defines - and can use explicitly
"coordinator name"—this whole part is gone now.
But yes. Since this part is removed, it is no longer a concern.


*4. JVM process model -> *unlike in Python, we do not know if we are
> running new processes or running tasks within the JVM process. This has
> implications for memory, startup time overhead, JIT, isolation (especially
> for multi-team environments), and restart-on-failure. I think we should
> spell out the proposal clearly. Possibly (if my queue/coordinator split is
> accepted), we could use the queue to determine the task's execution mode.
>
>
> I think this are implementation aspects. Managing a JVM process is no
> different from Python, and all the characteristics that a Python task
> possess apply directly to a Java one. There are really no differences that
> need specifications here. (Maybe this point was based on the understanding
> of the next point where Java tasks can reuse the same process? But we’re
> not doing that.)
>

Hmm. I initially understood (with the coordinator concept) that we should
at least know how it will look in different cases for different
executors—specifically, whether a Python process runs under supervisor or
not. We already do this in Python. For example, the GoSDK (initially, at
least - not sure now) used the idea of a long running Go process that ran
tasks as subroutines (as far as I remember) - and it did not start a new
process for every task. Here it seems (and maybe I am wrong) what we are
proposing is that Python "supervisor" will start a Java process separately
for each task. Which is fine. As long as we are explicit about that. I see
three models here that might work:

a) Python process (started by any executor—local executor process pool,
forked Celery worker, or Python edge—the method is the same as today)
starts a new Java process for every task. Both are running and taking
memory and starting the Java Interpreter is an additional "startup"
overhead.
b) New Java process(es) start via the workers (either the Scheduler for the
Local Executor, a Celery worker, or the Edge Executor), and the executors
communicate with this Java process without needing to create a new Python
interpreter. That would require changes in each executor but results in
less "per task" overhead, no need to create a Python Interpreter and less
isolation (unless we can use some built-in Java isolation mechanism)
c) A long-running Java process that implements the Edge Executor interface
and starts tasks using whatever provides good isolation in Java.

But by simplifying it to merely the Java-specific "Task-SDK Bridge," I
understand we are talking about* a) only. - *Neither b) nor c) has been
considered at all. I think it would be great to confirm and spell it out,
as it has very concrete performance implications. When we discussed
multiple language support we cited both "Native API" and
"Performance/Overhead" as reasons to implement new languages. This
implementation essentially only addresses "Native API," in addition to the
Task SDK we already have. The entire TaskSDK premise was that it could be
implemented natively in any language. It might still be used this way, but
I understand it's not a goal of this AI any more.

*6. Multi-team: *I guess coordinator definition should not be shared
> between teams (unless they want it). The current proposal puts them all in
> single ,cfg but Multi-team has this format: Which I think the coordinators
> should also follow?
>
>
> [core]
> executor = GlobalExecutor;team1=Team1Executor;team2=Team2Executor
>
>
> The executor config is formatted this way since it is a default value, and
> each team can have a different default. The coordinators config specifies
> what should exist in Airflow instead, and each team should choose which
> coordinator they want to use (by specifying the queues their tasks use). It
> is not necessary (and honestly IMO extremely confusing in practice) if each
> team can have a different set of coordinators.
>
>
With the explanation that we are switching only to "Task-SDK Java Bridge"
then it does not need that indeed. But I think we should be explicit that
it's a worker-only configuration.



> The task (user code) simply print to stdout and stderr, and it gets hurled
> to the parent process and recorded with standard Airflow logging. How the
> messages are actually print is up to the user. Or you can do your own
> logging—we don’t care. (This is actually the same in Python; Airflow itself
> uses structured logging, but user-generated task logs are just strings;
> Airflow does not enforce a format there.)
>

Hmm. I do not know the details but as far as I know, we currently use our
own more complex interface between the Python task and the supervisor that
we implemented. As far as I remember it was StructLog with
newline-separated JSON (but I might be wrong). Some of the issues we
encountered were that stderr/stdout is too brittle to rely on without
structure (regarding flushing and similar). I would love others who know
more to chime in here - but I do not think we use stdout/stderr there; it's
far more sophisticated. Since the JDK has its own logging "standard"
mechanism, it would make sense to tap into it and use the same interface, I
guess. We know from the past that relying on stdout/stderr is generally a
bad idea. But Maybe I am wrong about that. I think this requires
clarification and consultation regarding the past stdin/stderr errors.


Tracing is currently not implemented, but IMO this is more of a standalone
> feature on the Java SDK,  The Java SDK can simply be configured to send to
> the same OT endpoint, and we can probably build some utility functions to
> simply things for task authors, but ultimately there’s not really anything
> to specify in the AIP since Airflow doesn’t read tracing information (it
> only emits).
>

I think that's a fine (and good) approach to let people configure
OpenTelemetry (OTEL) in the regular way they configure other Java
processes. As long as we clearly document this and pass sufficient
information when executing the task (e.g., some OTEL span-ids), I believe
that should be enough for the standalone Java process to properly emit its
metrics and traces. It's likely just agreement on the interface - what
should be passed to Java to make it possible, and whether any additional
exchange via Task-SDK (doubtful) is needed. We likely need to tell people
how to handle this. When each Java task is a separate Java interpreter, it
will very likely "just work" as OTEL libraries should handle the "soft" and
"harder" (signal-based) killing. Speaking of which, it's also a question
for OpenLineage: whether we need to do something similar there. In the
past, there were various issues with buffering OTEL and Open Lineage
information and prematurely killing processes, so including the
OTEL/Lineage people in the discussion is likely a good idea.

Sorry for being such pain in the neck - I thin it's good we are simplifying
things as a result of this discussions and narrowing the scope - but I thin
there are still some important questions ^^ to solve.

J.


>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
>
>

Reply via email to