The AIP has been updated to emphasise how the coordinator works with the
(non-Python) SDK artefact, and to note that process management (if present) and
message communication are not specified in the proposal, since they are a
contract only between the two.
I also removed all references to parsing a Java dag file to avoid any potential
conflicts with AIP-82. AIP-108 now only defines how the coordinator should find
an executable target for a stub task, and the process it goes through to fulfil
execution.
Moving to other topics mentioned previously…
>
> 1. The important topic already discussed is the wire protocol. While I agree
> it's not a "blocker" for this AIP, it's 100% a blocker for releasing the
> coordinator. Until we ship the first version of the coordinator we can
> **easily** change the wire protocol. Once we ship it, it's basically a done
> deal and it will be difficult to take a different path. But I do not see it
> as a blocker for this AIP. I saw early versions (not public yet—I saw
> messages from Confluence and read some early drafts) showing that the current
> choice was a well-thought-out decision. I have my own sentiments and
> experience with protobufs—they're pretty bad, and I really wouldn't like to
> use them here—but I agree this is a different discussion and shouldn't
> prevent acceptance of this AIP. This is contingent on us agreeing that we
> will not ship the coordinator until AIP-109 (which I saw was the proposal) is
> discussed and agreed upon. That discussion should happen **after** we
> complete this one and vote on this AIP; otherwise, it might get stuck in
> endless bikeshedding. I'm glad the proposal exists, and I'm even more glad it's
> now gated by permissions (I saw that TP, Ash, and Jason did that). Once we
> accept this one, we will open AIP-109 and discuss and agree on it well before
> we actually "ship" the first coordinator.
I disagree that this is a blocker for releasing the coordinator. Since a
language-specific coordinator is its own distribution, it does not follow
Airflow Core and the SDK’s major release cycles. It can be made experimental,
and/or bump its own major version for breaking changes.
However, in the meantime, some changes in Airflow Core and SDK are needed for
them to know the coordinator exists and route workload to it. The best approach
IMO is to release the base coordinator interface in Airflow 3.2 with a
provisional Java SDK and coordinator implementation (maybe using a
zero-prefixed version). This would be most helpful for us to understand
potential issues, rather than trying to make decisions in a vacuum.
> 3. First match coordinator choice: Choosing the first matching coordinator
> is, I think, a bad choice (paired with the queue_to_coordinator mapping). It is
> simplistic and has potential problems, such as when "similar" coordinators
> effectively "compete" for Dags/Tasks. Even the very example of two "jdk"
> coordinators shows the problem. I can very easily see say "jdk-11" and
> "jdk-17" coordinators configured in the same setup. The sequence of setting
> those coordinators will determine which one is used, which shows a bit of
> asymmetry. If you want to use "jdk-17", you must specify it, rather than
> having it matched automatically. However, I can easily imagine a slightly
> more complicated interface where the matcher would return a tuple -
> ("matches": bool, "priority": int) + some deterministic tie-break or conflict
> error if we have two identical matches. This could be a much more flexible
> solution. It would handle a much more versatile set of cases. For example,
> you could have "jdk-11/" and "jdk-17" folders in a bundle and have two
> coordinators pick which one handles the dags based on the folder name -
> without needing to specify the coordinator by "queue" or "name". In my
> opinion, that is a much better solution to the problem that
> "queue_to_coordinator" tried to solve by proposing an indirection layer in
> the Dag definition. Performance-wise, the impact should be negligible. I
> hardly imagine we'd have to worry about more than a few coordinators, or about
> the complexity of the "can_handle_dag" implementation, even though matching
> would now always run "can_handle_dag" for all coordinators rather than stopping
> at the first match.
I’m not sure where the “first matching” understanding comes from. (Please point
out the relevant text on this; it is most likely out of date.) A task specifies a
queue, and then the queue is mapped to a specific coordinator. There can’t be
multiple to choose from. You can map multiple queues to use the same
coordinator, but only one coordinator can be assigned to a queue.
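To make the resolution concrete, here is a minimal sketch of the queue-to-coordinator lookup described above. The mapping keys, coordinator ids, and the function name are all illustrative, not the actual AIP-108 configuration or API:

```python
# Hypothetical sketch only: keys and coordinator ids are illustrative,
# not the actual AIP-108 configuration.
QUEUE_TO_COORDINATOR = {
    "jvm-tasks": "jdk",
    "jvm-legacy": "jdk",  # multiple queues may map to the same coordinator
}

def resolve_coordinator(queue: str) -> str:
    """Return the single coordinator assigned to a queue; there is no matching step."""
    try:
        return QUEUE_TO_COORDINATOR[queue]
    except KeyError:
        raise ValueError(f"no coordinator mapped for queue {queue!r}") from None

# Each queue resolves to exactly one coordinator, so there is nothing to compete over.
assert resolve_coordinator("jvm-tasks") == "jdk"
assert resolve_coordinator("jvm-legacy") == "jdk"
```

Since the mapping is a plain key lookup, the "jdk-11 vs jdk-17" ambiguity cannot arise: a queue either names exactly one coordinator or is an error.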
> 4. JVM process model -> unlike in Python, we do not know if we are running
> new processes or running tasks within the JVM process. This has implications
> for memory, startup time overhead, JIT, isolation (especially for multi-team
> environments), and restart-on-failure. I think we should spell out the
> proposal clearly. Possibly (if my queue/coordinator split is accepted), we
> could use the queue to determine the task's execution mode.
I think these are implementation aspects. Managing a JVM process is no
different from managing a Python one, and all the characteristics a Python task
possesses apply directly to a Java one. There are really no differences that
need to be specified here. (Maybe this point was based on the understanding
from the next point, that Java tasks can reuse the same process? But we’re not
doing that.)
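To illustrate the point that the process model is the same as Python's, here is a sketch (not SDK code) of a coordinator spawning one fresh process per task. The child command is a placeholder; a real Java coordinator would exec the `java` launcher with the resolved target class instead:

```python
# Illustrative sketch only: one fresh process per task, exactly as Python tasks
# run. Memory, startup, and restart-on-failure behaviour therefore follow the
# same per-process model; tasks never share a long-lived JVM here.
import subprocess
import sys

def run_task(argv: list[str]) -> int:
    """Spawn an isolated process for one task and wait for it to finish."""
    return subprocess.run(argv).returncode  # non-zero return code = task failure

# Stand-in "task" process (in reality something like ["java", "-cp", ...]).
assert run_task([sys.executable, "-c", "print('task done')"]) == 0
assert run_task([sys.executable, "-c", "raise SystemExit(1)"]) == 1
```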
> 6. Multi-team: I guess coordinator definition should not be shared between
> teams (unless they want it). The current proposal puts them all in a single
> cfg, but Multi-team has this format, which I think the coordinators should
> also follow:
>
> [core]
> executor = GlobalExecutor;team1=Team1Executor;team2=Team2Executor
The executor config is formatted this way since it is a default value, and each
team can have a different default. The coordinators config specifies what
should exist in Airflow instead, and each team should choose which coordinator
they want to use (by specifying the queues their tasks use). It is not
necessary (and honestly, IMO, extremely confusing in practice) for each team to
have a different set of coordinators.
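As a sketch of the contrast (section and key names here are hypothetical, not the AIP's actual config schema): coordinators would be declared once per instance, and teams would opt in via the queues their tasks use, rather than via per-team coordinator lists like the executor example above.

```ini
; Hypothetical shape only; section and key names are illustrative.
; Coordinators are defined once for the whole Airflow instance.
[coordinators]
jdk = airflow_jdk_coordinator.JdkCoordinator

; Teams opt in by pointing their tasks' queues at a coordinator,
; not by defining a per-team coordinator set.
[coordinator_queues]
jvm-tasks = jdk
```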
> 7. How will logging / tracing be handled? - Currently there is no mention of
> it in the AIP, I am sure it has already been considered and resolved
> (including wire protocol and Java logger integration). But it's not specified
> in the design - and I think describing it is quite important. Since we are
> using StructLog, we want to use a similar tool in Java and integrate it with
> existing struct-logging - but it's not clear how. Java has its own logging
> system and plenty of logging "wrappers" - but here I think we have similar
> problems with "wire protocol." Which one are we going to use? Where do we
> convert what Java produces into StructLog's JSON? On which side does this
> conversion happen? Will the wire protocol be the same for all languages?
> Similar to the wire protocol above, details and choices might be discussed in
> a separate AIP. However, at least the "Java choices" regarding logging and
> how we envision communication for logs should be documented. Similarly with
> OpenTelemetry (OTEL) tracing: perhaps we should skip it for now and assume no
> tracing exists. More likely, however, we will integrate the Java side with
> the Java OTEL implementation and pass enough information to the task so it
> can provide sufficient tracing information via its own OTEL configuration.
> I think we should spell out the choices we made at the very least here.
The task (user code) simply prints to stdout and stderr, and the output is
funnelled to the parent process and recorded with standard Airflow logging. How
the messages are actually printed is up to the user. Or you can do your own
logging; we don’t care. (This is actually the same in Python: Airflow itself
uses structured logging, but user-generated task logs are just strings; Airflow
does not enforce a format there.)
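The stdout relay described above can be sketched like this (under assumptions: this is not the actual coordinator or SDK code, and the child command is a placeholder for the `java` invocation):

```python
# Sketch only: the parent relays each stdout line of the child task process
# into standard Python logging, which is how user task output gets recorded.
import logging
import subprocess
import sys

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("task")

# Stand-in child process; a Java coordinator would run the `java` command here.
proc = subprocess.Popen(
    [sys.executable, "-c", "print('hello from the task')"],
    stdout=subprocess.PIPE,
    text=True,
)
captured = []
for line in proc.stdout:  # each plain-text line becomes one log record
    captured.append(line.rstrip("\n"))
    log.info("%s", line.rstrip("\n"))
proc.wait()
assert captured == ["hello from the task"]
```

The key property is that the user code only ever writes plain text; the parent decides how those lines are stored, so no log format is imposed on the task author.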
The Java SDK actually has a separate channel for sending internal logs so they
are not mixed with user logs, but again this is specific to the coordinator-SDK
combination and doesn’t need to be specified. Each coordinator can do this in
whatever way best fits its SDK.
Tracing is currently not implemented, but IMO this is more of a standalone
feature of the Java SDK. The Java SDK can simply be configured to send to the
same OTEL endpoint, and we can probably build some utility functions to
simplify things for task authors, but ultimately there’s not really anything to
specify in the AIP, since Airflow doesn’t read tracing information (it only
emits it).
TP