First of all, the "coordinator" to "bridge" rename I proposed is not
something I strongly favor; it simply allows me to build a better mental
model, because I now see it really as a "Java bridge". I will use
"coordinator/bridge" below; we can decide on the actual name and packaging
approach once we fully understand the scope.
>
> There is still a concept of parsing in JARs since we still need a
> mechanism for the Java code to define which task is being implemented. This
> is still called Dag and Task in the Java SDK to match the terminology in
> Python. But Airflow does not find Airflow dags in JARs. I’ll edit out the
> DAG File reference you mentioned (and some other use of “DAG parsing” to
> avoid confusion since this is not the same concept in the Airflow dag
> processor sense).
>
Yeah - but if I understand correctly, the "coordinator/bridge" simply
passes the "context" and "task" information to run. From Airflow's
perspective, the "runner" simply starts the JAR and tells it to "execute
that task". The JAR contains all the code necessary to map the task name to
the actual class that should run. The Scheduler, the Parser, and even the
Python worker running on any executor know nothing about the mapping of
task names to the specific Java class to run. How this code works, how the
bridge works, and how it passes task-SDK messages are purely implementation
details of the "Java Bridge". Do we have any plans to use this same
architecture for other languages now, or is that something that **may**
happen later but is not the goal for now? Does the presence of "Dag" and
"Task" in Java merely serve to identify which class should run the task?
Is that the correct understanding?
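Just to check my own understanding, the only thing that would cross the
Python-to-Java boundary is something like this (a minimal sketch; the class
and field names are mine, not from the AIP):

```python
# Hypothetical illustration only - names are invented for this email.
from dataclasses import dataclass, field

@dataclass
class BridgeRequest:
    """Everything the coordinator/bridge hands to the JAR, nothing more."""
    dag_id: str                                  # which Dag (as declared in the Java code)
    task_id: str                                 # which Task; the JAR maps this to a class
    context: dict = field(default_factory=dict)  # logical date, try number, ...
    # Deliberately absent: class names, JAR internals, classpath details.
    # Scheduler, dag processor and Python worker never see that mapping.
```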
> The JAR is not stored in a serialised dag (nor does a JAR contain
> serialised dags). It is configured in the worker environment. (JARs are
> similar to executing Python packages; the 'java' command knows where to
> look, or you can specify it in the coordinator with arguments.)
>
I was referring more to the JAR path, not the JAR itself? In the previous
(now removed) version of the AIP we had this:
┌──────────────▼───────────────┐
│ Metadata DB                  │
│                              │
│ serialized_dag: {            │  Stored as-is from the language runtime's
│ "relative_fileloc":          │  SDK Serialized DAG JSON
│ "path/to/example.jar"        │
│ }                            │
│ task_instance.queue          │
└──────────────┬───────────────┘
So my question about the JAR path stemmed from that. But I understand now
that it's gone entirely: we completely remove any JAR reference and
decouple from how compiled Java classes are made available to the "java"
command executed by subprocess.Popen (or however we call it from the Python
coordinator/bridge code)?
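If that is the case, I would expect the launch to look roughly like the
sketch below. To be clear, this is my own illustration: the environment
variable, classpath location and main class are invented; only the "java"
command and subprocess.Popen come from the discussion above.

```python
# Sketch only: the worker environment (not the serialized dag) tells the
# bridge where the compiled classes live. All names below are hypothetical.
import os
import subprocess

def launch_java_task(task_id: str) -> subprocess.Popen:
    # Configured per deployment (env var, coordinator argument, ...), the same
    # way the 'java' command normally resolves a classpath.
    classpath = os.environ.get("JAVA_TASKS_CLASSPATH", "/opt/airflow/java-tasks/*")
    cmd = ["java", "-cp", classpath, "org.example.TaskRunner", task_id]
    # Note: no JAR path ever goes through the metadata DB; it is purely a
    # property of the execution environment.
    return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
```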
>
> The above only happens at execution time, so the JARs (technically)
> don’t need to be in the dag bundle, only somewhere in the execution
> environment that the coordinator can locate. In practice maybe it should be
> included in the bundle since the implementation affects dag versioning, but
> on the other hand a JAR may be difficult to version since it is
> fundamentally a derived binary (technically an extension to ZIP).
> Personally I think we need some real world user experience to know the best
> approach here, so the AIP does not provide an opinion on this topic. The
> best practice needs to be researched and codified later.
>
So, does this mean users will have to distribute and share their Java code
completely independently from what they are used to with DAGs now? I think
this should be explicitly spelled out as a design decision, because it's
not something our users are used to. It's more akin to distributing
packaged plugins today, and I think we need to help our users map this onto
their mental model. So far, users had the "DAG" in a "bundle", and
everything they needed to run was in that "bundle". Now they will need both
the "Python" code for the Dag in the bundle and the "compiled Java JARs"
elsewhere, distributed differently during deployment. I have no problem
with that as long as we clearly define it as a deployment model change for
Dags.
> I don’t agree with putting the package under the airflow.sdk namespace
> since it complicates distribution a lot down the road. We already have
> interest for other languages, and the airflow.sdk namespace already
> contains a lot of Python SDK things as-is, it is not a good idea to also
> open it up for potential language runtimes. Furthermore, the coordinator
> layer is not a part of the DAG authoring interface, which airflow.sdk is
> supposed to contain. You could maybe argue it can go into a nested package
> such as airflow.sdk.execution_time.coordinators, but that is just too
> unnecessarily long. A separate package i.e. airflow.coordinators is the way
> to go in my opinion.
>
To be honest, the Python package name is secondary. More important are the
questions of compatibility and the related coupling, and whether increasing
operational complexity (by introducing new types of distributions **today**
as opposed to **when needed**) is justified. We should first agree on the
relation between "task-sdk" and the "java coordinator/bridge" code before
deciding on "distribution names" and "python packages". Let me simplify it
with an example:
* task-sdk 2.1 <-> bridge/coordinator 1.1
* task-sdk 2.2 <-> bridge/coordinator 1.2 (is 1.1 possible as well?)
What's the relation? Do we assume that the bridge/coordinator and the
task-sdk will always need to be released and pinned together?
Or should we handle the situation where "task-sdk 2.2" is used with
"bridge/coordinator 1.1", or perhaps where "task-sdk 2.1" is used with
"bridge/coordinator 1.2"?
In other words: how strongly coupled is the bridge/coordinator with
task-sdk? Do we have a compatibility mechanism, or (for now) do we assume
we always release them in sync and pin with ==? Or do we have any plans to
add one (like we do with Cadwyn on airflow-core <-> task-sdk)?
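Just to make the two options concrete (purely illustrative; the
distribution names, versions and function are invented):

```python
# Option A: lockstep releases - the bridge/coordinator distribution simply
# pins the task-sdk it was released with, e.g. "apache-airflow-task-sdk==2.2.*".
#
# Option B: a real compatibility window, checked at runtime (sketch only):
from packaging.version import Version

TASK_SDK_MIN = Version("2.1")   # hypothetical supported range
TASK_SDK_MAX = Version("3.0")

def check_task_sdk_compatible(task_sdk_version: str) -> None:
    v = Version(task_sdk_version)
    if not (TASK_SDK_MIN <= v < TASK_SDK_MAX):
        raise RuntimeError(
            f"This bridge/coordinator release does not support task-sdk {task_sdk_version}"
        )
```

Option B is real work (the Cadwyn-style effort); Option A is essentially
what we get for free by embedding.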
I am not against separate distributions (as you might be aware); I just
want to understand what coupling we have and whether splitting it out
offers any benefits. Yes, it might make task-sdk smaller, but not by much:
if the coupling is that strong and there is no plan to decouple in the
mid-term, we are only saving a bit of disk space on unused Python classes.
And by not splitting we save the operational complexity of having to build
and release yet more kinds of packages.
Also, quoting your own words:
> The best practice needs to be researched and codified later.
Precisely - embedding the "coordinator/bridge" in task-sdk allows us to do
exactly that, without introducing new concepts that are not yet well
defined. Unless we want to build compatibility and allow installing
different versions of those distributions, there is virtually no benefit to
splitting, IMHO. We can still gain the same learning and see what works and
what we need to fix, much faster, with fewer distributions and less
operational complexity. We can then implement the "coordinator" concept
learning from that experience, while already handling multiple languages
(which can also be added more simply by following the same "embedded" way).
I often have this saying: "In order to make things reusable, we have to
make them usable first." This is a classic example of it: we seem to be
trying to implement a new concept with full-blown separate distributions,
naming etc. without actually knowing how it will work, especially when
generalized to different languages.
This is exactly what was done "right" with providers: we kept them in the
main "apache-airflow" package as a single monolith distribution for quite
some time, only separating them into "provider distributions" after the
concept matured and we had many providers in the single distribution. That
split happened exactly when we had figured out how to decouple them, what
compatibility we would provide, and how. I have a feeling that in this case
we are trying too hard to generalize things that we do not even have yet.
We can make it "airflow.sdk._bridge.jdk" and call it a day, and refactor it
later into a coordinator distribution once we also have
"airflow.sdk._bridge.golang" and "airflow.sdk._bridge.typescript". We can
change it all at any time in the future, even changing the name, etc.,
because this will be coupled with the "sdk" version anyway, and it's an
internal detail for the users.
> The supervisor process does use structured logging, but logs emitted from
> USER CODE are basically plain strings. Technically it’s wrapped in JSON to
> include some extra information (file location, stdout or stderr, etc.) but
> ultimately the user can only emit plain strings to Airflow.
> Java has its own logging architecture, but Airflow can only care about
> what Airflow understands. We can discuss what the best practice should be
> when you write Java tasks, but this AIP shouldn’t go beyond stdout and
> stderr streams.
>
I think we need the same source code/location information from day one to
be able to operationalize it. However, that does not change the concept and
can be seen as an implementation detail. Adding a Java logger that emits
JSON in the same format as the Python code should be **easy**. I am simply
afraid we are trying to rediscover the same stdout/stderr buffering issues
that caused us to switch stdout/stderr handling to line-delimited JSON in
"task/supervisor". We had, I believe, very good reasons for doing it, and I
think it does not add more complexity. But yes, this might also be seen as
an internal detail of the "Java bridge" implementation, so I agree it does
not have to be part of the AIP.
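To illustrate what I mean by "the same format" (the field names below are
just my guess at the shape of the records, not the actual supervisor
protocol):

```python
# Sketch: the supervisor reads line-delimited JSON records from the child's
# stdout/stderr, regardless of whether the child is CPython or a JVM.
import json

def read_log_records(stream):
    for raw in stream:              # exactly one JSON object per line
        try:
            # e.g. {"stream": "stderr", "loc": "MyTask.java:42", "message": "..."}
            yield json.loads(raw)
        except json.JSONDecodeError:
            # Anything printed outside the protocol is still captured as-is.
            yield {"stream": "stdout", "message": raw.rstrip("\n")}
```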
>
> Even in the pure Python scenario, the supervisor layer does not receive
> OTel messages; they are sent directly from the worker process. The question
> would be entirely about the interface, which we can always add later.
>
Correct. I agree it's not something the bridge should "handle". However, I
think the "worker" process receives some information from the supervisor
(span IDs?) that allows the emitted metrics to be properly matched with
spans, so the bridge is more of a "pass-through". I think all the AIP needs
is to state: "Yes, we thought about it; we will pass all the necessary
information for OTEL to work well, and here are the items we are passing:
......".
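For example, something along these lines would already answer it (the
names are made up; I do not know what the supervisor actually forwards
today):

```python
# Sketch only: the bridge forwards tracing identifiers unchanged to the Java
# child process so that metrics/spans emitted there can be correlated.
def otel_passthrough(trace_id: str, span_id: str) -> dict[str, str]:
    return {
        "AIRFLOW_OTEL_TRACE_ID": trace_id,   # hypothetical variable names
        "AIRFLOW_OTEL_SPAN_ID": span_id,     # parent span for task-level spans
    }
```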
>
> OL is mostly not an issue since Airflow’s integration is mostly at the
> operator and hook level, but the Java SDK does not have those concepts. We
> can introduce interfaces to help users emit their own lineage info from Java
> code, but again that should be added later.
>
Similarly, I'm unsure whether OpenLineage requires specific metadata to be
passed, as all the necessary information might already be present in the
context sent through the bridge. All I'm saying is that we should talk to
Maciej and Kacper and drop in a paragraph on what is needed (or explicitly
state that we considered it and here is the answer).
>
>
> >
> > Sorry for being such a pain in the neck - I think it's good we are
> > simplifying things as a result of these discussions and narrowing the
> > scope - but I think there are still some important questions ^^ to solve.
> >
> > J.
> >
> >