Hi all,

As mentioned in the latest dev call, we have been developing a Java SDK, along
with supporting changes to Airflow, in a separate fork[1]. We now plan to start
merging this work back into the OSS repository.

We see this as a natural follow-up to the initial work in AIP-72[2], which
created “a clean language agnostic interface for task execution, with support
for multiple language bindings” (quoted from the proposal).

The Java SDK also uses @task.stub[3], which Ash added for the Go SDK, to
declare a task in a DAG as “implemented elsewhere” (i.e. not in the decorated
function). As with the Go SDK, we also created a Java library for writing task
implementations that Airflow executes at runtime.

[1]: https://github.com/astronomer/airflow/tree/feature/java-all
[2]: https://cwiki.apache.org/confluence/x/xgmTEg
[3]: https://github.com/apache/airflow/pull/56055

The user-facing syntax for a stub task is the same as in the Go SDK:

    from airflow.sdk import task

    @task.stub(queue="java-tasks")
    def my_task(): ...

This is paired with a new configuration option that maps each queue to the SDK
that executes its tasks:

    [sdk]
    queue_to_sdk = {"java-tasks": "java"}

The configuration is needed to support some executors that the Go SDK currently
does not. The Go SDK relies on each executor worker process to declare which
queues it listens to, but this is not always viable, since some executors, such
as LocalExecutor, do not have the concept of worker processes.
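A minimal sketch of how `queue_to_sdk` might be resolved, assuming the option
value is a JSON object (as in the example above) and that tasks on unmapped
queues fall back to the Python SDK. The `parse_queue_to_sdk` and `resolve_sdk`
helpers and the `"python"` default are hypothetical, not the proposed
implementation.

```python
import json

# Hypothetical fallback: tasks on queues not listed in queue_to_sdk run in Python.
DEFAULT_SDK = "python"


def parse_queue_to_sdk(raw: str) -> dict[str, str]:
    """Parse the raw [sdk] queue_to_sdk value (assumed to be a JSON object)."""
    if not raw.strip():
        return {}
    mapping = json.loads(raw)
    if not isinstance(mapping, dict):
        raise ValueError("queue_to_sdk must be a JSON object mapping queue -> SDK")
    return {str(queue): str(sdk) for queue, sdk in mapping.items()}


def resolve_sdk(queue: str, mapping: dict[str, str]) -> str:
    """Return the SDK that should execute a task submitted to `queue`."""
    return mapping.get(queue, DEFAULT_SDK)


mapping = parse_queue_to_sdk('{"java-tasks": "java"}')
print(resolve_sdk("java-tasks", mapping))  # java
print(resolve_sdk("default", mapping))     # python
```

Because the mapping comes from configuration rather than from worker processes,
this lookup does not depend on the executor having per-queue workers.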

The Coordinator Layer
=====================

When the Go SDK was implemented, runtime Airflow plugins were left as a future
topic. These include custom XCom backends, secrets backend lookups for
connections and variables, etc. These components are implemented in Python, and
a Java task cannot easily use them unless we also implement the lookup logic in
Java. We don’t want to do that, since it adds significant overhead to writing
plugins, and that overhead multiplies with each new language SDK.

Fortunately, the current execution-time task runner already uses a two-layer
design. When an executor wants to run a task, it starts a (Python) task runner
process that talks to Airflow Core through the Execution API. The task runner
then *forks* another (Python) process to run the actual task code, and the two
communicate over TCP. Airflow plugins simply run in the task runner process.
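To make the two-layer design concrete, here is a standard-library-only sketch:
a parent "task runner" opens a TCP server on localhost, starts a child "task"
process, and answers a variable request from it. The newline-delimited JSON
messages, the `GetVariable` message name, and the in-memory `variables` dict
(standing in for a secrets backend) are illustrative assumptions, not the actual
Airflow protocol. Airflow forks the task process; this sketch starts a fresh
interpreter with `subprocess` instead, for portability.

```python
import json
import socket
import subprocess
import sys

# Code for the hypothetical child "task" process. It connects back to the runner,
# requests a variable, and prints whatever value the runner returns.
CHILD_CODE = r"""
import json, socket, sys

port = int(sys.argv[1])
with socket.create_connection(("127.0.0.1", port)) as conn, conn.makefile("rw") as stream:
    stream.write(json.dumps({"type": "GetVariable", "key": sys.argv[2]}) + "\n")
    stream.flush()
    reply = json.loads(stream.readline())
    print(reply["value"])
"""


def run_task(key: str, variables: dict[str, str]) -> str:
    """Start the child task and serve its request from the parent process."""
    with socket.create_server(("127.0.0.1", 0)) as server:
        server.settimeout(30)  # avoid hanging forever if the child never connects
        port = server.getsockname()[1]
        child = subprocess.Popen(
            [sys.executable, "-c", CHILD_CODE, str(port), key],
            stdout=subprocess.PIPE,
            text=True,
        )
        conn, _ = server.accept()
        with conn, conn.makefile("rw") as stream:
            request = json.loads(stream.readline())
            # Plugin logic (e.g. a secrets backend lookup) stays in the parent, in Python.
            value = variables.get(request["key"], "")
            stream.write(json.dumps({"value": value}) + "\n")
            stream.flush()
        out, _ = child.communicate(timeout=30)
    return out.strip()


print(run_task("greeting", {"greeting": "hello from the runner"}))
```

The child never needs to know how the value was looked up, which is the
property that lets the child be written in any language.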

This design works well for us since it keeps all the Airflow plugins in Python. 
The only thing missing is an abstraction for the task runner process to run 
tasks in any language. We are calling this new layer the **Coordinator**.

When a DAG bundle is loaded, it not only tells Airflow how to find the DAGs 
(and the tasks in them), but also how to *run* each task. Existing Python tasks
use the Python Coordinator, which runs tasks by forking as described above. A
new JVM Coordinator will tell the task runner how to run tasks packaged in JAR
files.

Each coordinator implements a base interface (BaseRuntimeCoordinator) that 
handles three concerns:

- Discovery: determining whether a given file belongs to this coordinator (e.g. 
JAR files for Java).
- DAG parsing: returning a runtime-specific subprocess command to parse DAG 
files in the target language.
- Task execution: returning a runtime-specific subprocess command to execute 
tasks in the target runtime.

The base class owns the full bridge lifecycle (TCP servers, subprocess
management, and cleanup), so language providers only need to implement these
three methods.
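A rough sketch of the shape this interface could take. The class name
BaseRuntimeCoordinator comes from the proposal, but the method names
(`can_handle`, `parse_command`, `execute_command`), their arguments, the `run`
helper, the `JavaCoordinatorSketch` subclass, and its main class names and flags
are all hypothetical. The TCP bridge the real base class manages is omitted.

```python
import subprocess
from abc import ABC, abstractmethod
from pathlib import Path


class BaseRuntimeCoordinator(ABC):
    """Hypothetical sketch: subclasses supply three hooks, the base class runs them."""

    @abstractmethod
    def can_handle(self, path: Path) -> bool:
        """Discovery: does this file belong to this coordinator?"""

    @abstractmethod
    def parse_command(self, path: Path) -> list[str]:
        """DAG parsing: command that parses `path` in the target language."""

    @abstractmethod
    def execute_command(self, path: Path, task_id: str) -> list[str]:
        """Task execution: command that runs `task_id` from `path`."""

    def run(self, command: list[str], timeout: float = 60.0) -> int:
        # The base class owns the subprocess lifecycle, including cleanup on timeout.
        # The TCP servers the real coordinator sets up are left out of this sketch.
        proc = subprocess.Popen(command)
        try:
            return proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
            raise


class JavaCoordinatorSketch(BaseRuntimeCoordinator):
    """Hypothetical JVM coordinator that claims JAR files."""

    def can_handle(self, path: Path) -> bool:
        return path.suffix == ".jar"

    def parse_command(self, path: Path) -> list[str]:
        # "com.example.DagParser" is a made-up main class name.
        return ["java", "-classpath", str(path), "com.example.DagParser"]

    def execute_command(self, path: Path, task_id: str) -> list[str]:
        # "com.example.TaskRunner" and "--task-id" are made up for illustration.
        return ["java", "-classpath", str(path), "com.example.TaskRunner", "--task-id", task_id]
```

Keeping `run` in the base class and only the command builders in subclasses is
what lets a language provider stay small.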

The coordinator translates the DagFileParseRequest (for DAG parsing) and
StartupDetails (for task execution) data models, as declared in Airflow, into
the appropriate command for the target runtime. For Java, this is a “java
-classpath ... /path/to/MainClass ...” subprocess command that points to the
correct JAR file and main class.
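A sketch of that translation step for Java. `StartupDetailsSketch` is a
hypothetical, heavily simplified stand-in for Airflow's StartupDetails, and the
`--dag-id`/`--task-id`/`--run-id` arguments are invented. Reading the main class
from the JAR manifest's `Main-Class` attribute is also an assumption; the
proposal does not say how the main class is chosen.

```python
import tempfile
import zipfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class StartupDetailsSketch:
    """Hypothetical, heavily simplified stand-in for Airflow's StartupDetails."""

    jar_path: str
    dag_id: str
    task_id: str
    run_id: str


def read_main_class(jar_path: str) -> str:
    """Return the Main-Class attribute from a JAR's manifest."""
    with zipfile.ZipFile(jar_path) as jar:
        manifest = jar.read("META-INF/MANIFEST.MF").decode("utf-8")
    # Manifest lines are wrapped at 72 bytes; a continuation line starts with a space.
    unfolded = manifest.replace("\r\n", "\n").replace("\n ", "")
    for line in unfolded.splitlines():
        key, sep, value = line.partition(":")
        if sep and key.strip() == "Main-Class":
            return value.strip()
    raise ValueError(f"{jar_path} has no Main-Class entry")


def build_java_command(details: StartupDetailsSketch) -> list[str]:
    """Translate startup details into a `java -classpath <jar> <MainClass> ...` command."""
    return [
        "java",
        "-classpath",
        details.jar_path,
        read_main_class(details.jar_path),
        # Hypothetical arguments telling the Java side which task to run.
        "--dag-id", details.dag_id,
        "--task-id", details.task_id,
        "--run-id", details.run_id,
    ]


# Usage: build a throwaway JAR (a zip with a manifest) and translate a request.
with tempfile.TemporaryDirectory() as tmp:
    jar = Path(tmp) / "tasks.jar"
    with zipfile.ZipFile(jar, "w") as zf:
        zf.writestr("META-INF/MANIFEST.MF", "Manifest-Version: 1.0\r\nMain-Class: com.example.Main\r\n")
    cmd = build_java_command(StartupDetailsSketch(str(jar), "etl", "extract", "manual__1"))
    print(cmd)
```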

Coordinators as Airflow Providers
=================================

The base coordinator interface and the Python coordinator will live in 
“airflow.sdk.execution_time”. Other coordinators (for foreign languages) are 
registered through the existing Airflow provider mechanism. Each SDK provider 
declares its coordinator in its provider.yaml under a “coordinators” extension 
point. Both ProvidersManager (airflow-core) and ProvidersManagerTaskRuntime 
(task-sdk) discover coordinators through this extension point. This means 
adding a new language runtime requires only a provider package. No changes to 
Airflow Core are needed.
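As an illustration, assume each provider's parsed provider.yaml exposes a
`coordinators` list of dotted class paths (the exact schema is not specified in
this proposal). Discovery could then be as simple as collecting those paths and
importing them lazily. The helper names, the Java provider package name, and the
coordinator class path below are hypothetical; this is not the
ProvidersManager API.

```python
import importlib
from typing import Any


def collect_coordinator_paths(provider_infos: list[dict[str, Any]]) -> list[str]:
    """Gather coordinator class paths declared under the "coordinators" extension point."""
    paths: list[str] = []
    for info in provider_infos:
        for path in info.get("coordinators", []):
            if path not in paths:  # tolerate the same coordinator being listed twice
                paths.append(path)
    return paths


def import_string(dotted_path: str) -> Any:
    """Import "package.module.ClassName" and return the named attribute."""
    module_name, _, attr = dotted_path.rpartition(".")
    if not module_name:
        raise ImportError(f"{dotted_path!r} is not a dotted path")
    return getattr(importlib.import_module(module_name), attr)


# Parsed provider.yaml contents; the Java entries are hypothetical.
providers = [
    {
        "package-name": "apache-airflow-providers-sdk-java",
        "coordinators": ["airflow.providers.sdk.java.coordinator.JavaCoordinator"],
    },
    {"package-name": "apache-airflow-providers-http"},  # declares no coordinators
]
print(collect_coordinator_paths(providers))
```

Importing lazily means a coordinator's module is only loaded when a file it
handles is actually encountered.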

The new JVM-based coordinator will live in the namespace 
“airflow.providers.sdk.java”. This is not the most accurate name (technically
it should be “jvm”), but in practice most users will recognize it, and (from my
understanding) users of other JVM languages (e.g. Kotlin, Scala) are already
familiar enough with Java interoperability to understand that “java” means JVM
in this context.

Writing DAGs in Java
====================

This is not strictly connected to AIP-72, but we consider it a natural next
step now that tasks can be implemented in a foreign language. Being able to
define the DAG in the same language as the task implementation is useful, since
writing Python, even with minimal syntax, is still a hurdle for those who are
not familiar with it, or are not even allowed to run it. We need three main
things on top of the task implementation interface:

- DAG flags (e.g. schedule, max_active_tasks)
- Task flags (e.g. trigger_rule, weight_rule)
- Task dependencies
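To make the list concrete, here is one possible shape for what a Java DAG
definition would need to hand back to Airflow after parsing, written as plain
Python data, with a check that the dependencies form a valid DAG. The
`TaskSpec`/`DagSpec` layout is a hypothetical illustration, not Airflow's
serialized DAG format.

```python
from dataclasses import dataclass, field
from graphlib import CycleError, TopologicalSorter
from typing import Optional


@dataclass
class TaskSpec:
    """Task flags plus upstream dependencies for one task (hypothetical shape)."""

    task_id: str
    trigger_rule: str = "all_success"  # Airflow's default trigger rule
    weight_rule: str = "downstream"    # Airflow's default weight rule
    upstream: list[str] = field(default_factory=list)


@dataclass
class DagSpec:
    """DAG flags and tasks (hypothetical shape); fields left as None are not set."""

    dag_id: str
    schedule: Optional[str] = None
    max_active_tasks: Optional[int] = None
    tasks: list[TaskSpec] = field(default_factory=list)

    def execution_order(self) -> list[str]:
        """Check that dependencies are known and acyclic; return one valid order."""
        known = {t.task_id for t in self.tasks}
        graph: dict[str, set[str]] = {}
        for t in self.tasks:
            missing = set(t.upstream) - known
            if missing:
                raise ValueError(f"{t.task_id} depends on unknown tasks {sorted(missing)}")
            graph[t.task_id] = set(t.upstream)
        try:
            return list(TopologicalSorter(graph).static_order())
        except CycleError as exc:
            raise ValueError(f"dependency cycle: {exc.args[1]}") from exc


dag = DagSpec(
    dag_id="java_etl",
    schedule="@daily",
    tasks=[
        TaskSpec("extract"),
        TaskSpec("transform", upstream=["extract"]),
        TaskSpec("load", upstream=["transform"], trigger_rule="all_done"),
    ],
)
print(dag.execution_order())  # ['extract', 'transform', 'load']
```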

A proof-of-concept implementation is included with other changes proposed 
elsewhere in this document.

Lazy Consensus Topics
=====================

We’re calling for lazy consensus on the following topics:

- A new “queue_to_sdk” configuration option to route tasks to a specific
language SDK.
- A new coordinator layer in the SDK to route task implementations at execution
time.
- New providers under airflow.providers.sdk to provide additional language
support.
- Extend the Go SDK to support the proposed model, and add a provider package
for its coordinator. (Existing features stay as-is; no breaking changes.)
- Add the new Java SDK and the corresponding provider package.

TP



