uranusjr commented on code in PR #65956:
URL: https://github.com/apache/airflow/pull/65956#discussion_r3283976998


##########
java-sdk/adr/0001-java-sdk-airflow-integration.md:
##########
@@ -0,0 +1,343 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+# ADR-0001: Java SDK Airflow Integration
+
+## Status
+
+Accepted
+
+## Context
+
+Airflow's current execution model is Python-only: DAGs are Python files, tasks are Python callables, and the task runner forks a Python process. To support DAGs and tasks authored in other languages (starting with Java), we need an architecture that:
+
+- Allows entire DAGs to be written in a non-Python language (pure Java DAG).
+- Allows non-Python tasks to coexist with Python tasks in the same DAG (`@task.stub`).
+- Reuses the existing two-layer task-runner design (task-runner process + forked child process) so that Airflow extensions (XCom backends, connections, variables) stay in Python.
+- Is extensible to other languages (Go, Rust, etc.) without per-language changes to Airflow Core.
+
+The existing task runner already uses this two-layer design. When an executor wants to run a task, it starts a task-runner process that talks to Airflow Core through the Execution API. The task-runner process then forks a child process that runs the actual task code and talks back to the task runner over TCP. All Airflow extensions live in the task-runner process, so they stay in Python.
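The following is a minimal sketch of the two-layer pattern described above, using only the Python standard library. The parent process stands in for the task-runner process: it listens on a loopback TCP port, starts a child process, and reads one line that the child sends back. The one-line text message and the `run_task_in_child` helper are illustrative assumptions. This is not Airflow's actual supervisor code or wire protocol. For portability, the sketch starts the child with `subprocess` rather than calling `fork` directly.

```python
import socket
import subprocess
import sys

# Hypothetical child program: connects back to the parent over TCP and reports
# a result. In the real design, the child would run the actual task code.
CHILD_CODE = """
import socket, sys
host, port = sys.argv[1], int(sys.argv[2])
with socket.create_connection((host, port)) as conn:
    conn.sendall(b"task finished: ok\\n")
"""


def run_task_in_child() -> str:
    """Parent (task-runner) side: listen on TCP, start the child, read its message."""
    with socket.create_server(("127.0.0.1", 0)) as server:  # port 0: OS picks a free port
        host, port = server.getsockname()[:2]
        child = subprocess.Popen([sys.executable, "-c", CHILD_CODE, host, str(port)])
        server.settimeout(30)  # do not hang forever if the child fails to connect
        conn, _ = server.accept()
        with conn, conn.makefile("rb") as stream:
            message = stream.readline().decode().strip()
        child.wait(timeout=30)
    return message


if __name__ == "__main__":
    print(run_task_in_child())
```

Because all extension code stays in the parent, the child only needs a way to exchange messages with it. That is why a language-neutral transport such as TCP lets the child be written in any language.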
+
+The only missing piece is a way for the task-runner process to run tasks written in another language.
+
+## Decision
+
+### Writing a Non-Python Task
+
+There is one way to write a non-Python task: implement the language SDK's task interface. For Java, this is the `Task` interface with a single `execute(Client client)` method. The `Client` provides access to Airflow services (connections, variables, XCom).
+
+### Two Ways to Integrate Non-Python Tasks into a DAG
+
+We provide two approaches for integrating non-Python tasks into a DAG:
+
+**a) Pure Java DAG**: define the entire DAG in Java, with no Python file at all.
+The Java SDK provides the `DagBundle` and `Task` interfaces and the `Dag` class:
+
+```java
+import java.util.Date;
+import java.util.List;
+
+public class JavaExample implements DagBundle {
+
+  public static class Extract implements Task {
+    public void execute(Client client) throws Exception {
+      var connection = client.getConnection("test_http");
+      client.setXCom(new Date().getTime());
+    }
+  }
+
+  public static class Transform implements Task {
+    public void execute(Client client) {
+      var extractXcom = client.getXCom("extract");
+      client.setXCom(new Date().getTime());
+    }
+  }
+
+  @Override
+  public List<Dag> getDags() {
+    var dag = new Dag("java_example", null, "@daily");
+    dag.addTask("extract", Extract.class, List.of());
+    dag.addTask("transform", Transform.class, List.of("extract"));
+    return List.of(dag);
+  }
+}
+```
+
+**b) `@task.stub` in a Python DAG**: for mixed-language pipelines where Python and Java tasks coexist in the same DAG. The `@task.stub` syntax is already supported for the Go SDK; the same pattern applies to Java:
+
+```python
+from airflow.sdk import dag, task
+
+
+@task()
+def python_task_1(ti):
+    ti.xcom_push(value="from-python", key="return_value")
+
+
+@task.stub(queue="java")
+def extract(): ...
+
+
+@task.stub(queue="java")
+def transform(): ...
+
+
+@dag(dag_id="java_example")
+def simple_dag():
+    python_task_1() >> extract() >> transform()
+
+
+simple_dag()
+```
+
+Both approaches are supported in parallel. A pure Java DAG needs no Python at all for authoring. A `@task.stub` DAG requires a Python file but lets you mix Python operators and non-Python tasks in a single pipeline.
+
+> **Note:** The `DagBundle` interface currently used for pure Java DAGs is subject to review before the SDK reaches 1.0. Subclassing `Dag` directly may be a more natural fit and is under consideration for after the OSS integration.
+
+### The Coordinator Layer
+
+We introduce a **Coordinator** layer. When a DAG bundle is loaded, it tells Airflow not only how to find the DAGs (and the tasks in them) but also how to *run* each task. Python tasks keep the existing code path, which runs them in a forked process. A new **Java Coordinator** tells the task runner how to run tasks packaged in JAR files.
+
+The base interface (`BaseCoordinator`) lives in `airflow.sdk.execution_time`; concrete coordinators are discovered and selected automatically via `ProvidersManagerTaskRuntime`. The Java Coordinator lives in a provider under the `airflow.providers.sdk.java` namespace, and coordinators for new languages follow the same pattern.
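The following is a minimal sketch of how a DAG File Processor could route each file in a bundle, following the flow in the architecture overview. It asks each registered coordinator whether it handles the file and falls back to the regular Python path otherwise. `JavaCoordinatorSketch`, `route_dag_file`, the `.jar` suffix check, and the first-match-wins rule are all hypothetical illustrations, not the provider's actual implementation.

```python
from __future__ import annotations

import os
from pathlib import Path


class JavaCoordinatorSketch:
    """Hypothetical stand-in for the Java provider's coordinator."""

    sdk = "java"

    @classmethod
    def can_handle_dag_file(cls, bundle_name: str, path: str | os.PathLike) -> bool:
        # Assumption: any .jar file in the bundle is treated as a Java DAG file.
        return Path(path).suffix == ".jar"


def route_dag_file(
    bundle_name: str, path: str | os.PathLike, coordinators: list[type]
) -> str:
    """Return the sdk name of the first coordinator that claims *path*, else "python"."""
    for coordinator in coordinators:
        if coordinator.can_handle_dag_file(bundle_name, path):
            return coordinator.sdk  # delegate parsing to this language runtime
    return "python"  # no coordinator claimed the file: use the existing Python path


coordinators = [JavaCoordinatorSketch]
print(route_dag_file("example_bundle", "dags/example.jar", coordinators))  # java
print(route_dag_file("example_bundle", "dags/etl_pipeline.py", coordinators))  # python
```

Keeping the check as a class-level predicate means Airflow Core only iterates over discovered coordinators and never needs language-specific branches.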
+
+### Architecture Overview
+
+```
+            Airflow Backend                           Language Runtime Subprocess (Java in this example)
+            ───────────────                           ──────────────────────────────────────────────────
+
+    ┌──────────────────────────────┐
+    │  DAG File (Python or JAR)    │
+    │                              │
+    │  @task.stub(queue="java")    │
+    │  def my_java_task():         │
+    │      ...                     │
+    └──────────────┬───────────────┘
+                   │
+    ┌──────────────▼───────────────┐                    ┌──────────────────────────────┐
+    │  DAG File Processor          │                    │  Runtime Subprocess (Java)   │
+    │                              │  can_handle_dag    │                              │
+    │  For each file in bundle:    │  _file() == True   │  dag_parsing_cmd()           │
+    │  ┌ coordinator handles it? ──┼───────────────────►│                              │
+    │  │  Yes ──► delegate parse   │                    │  Java SDK parses JAR, builds │
+    │  │  No  ──► Python path      │  SDK Serialized    │  SDK-compatible Serialized   │
+    │  │                           │◄─── DAG JSON ──────┤  DAG JSON (sdk, tasks, etc.) │
+    │  └                           │                    │                              │
+    └──────────────┬───────────────┘                    └──────────────────────────────┘
+                   │
+    ┌──────────────▼───────────────┐
+    │  Metadata DB                 │
+    │                              │
+    │  serialized_dag: {           │  Stored as-is from the language runtime's
+    │    "relative_fileloc":       │  SDK Serialized DAG JSON
+    │       "path/to/example.jar"  │
+    │  }                           │
+    │  task_instance.queue         │
+    └──────────────┬───────────────┘
+                   │
+    ┌──────────────▼───────────────┐
+    │  Scheduler                   │
+    │                              │
+    │  Reads queue from TI         │
+    │  ──► ExecuteTask workload    │
+    │      (includes queue)        │
+    └──────────────┬───────────────┘
+                   │
+    ┌──────────────▼───────────────┐                    ┌──────────────────────────────┐
+    │  Execution API               │                    │  Runtime Subprocess (Java)   │
+    │                              │                    │                              │
+    │  TI.queue ──► Startup        │                    │  task_execution_cmd()        │
+    │                   Details    │                    │  Executes task in JVM        │
+    └──────────────┬───────────────┘                    │                              │
+                   │                                    └──────────────▲───────────────┘
+    ┌──────────────▼───────────────┐                                   │
+    │  Task Runner                 │                                   │
+    │                              │                                   │
+    │  QueueToCoordinatorMapper    │                                   │
+    │  maps queue via `[sdk]       │                                   │
+    │  queue_to_sdk` config ───────┼───────────────────────────────────┘
+    │  to matching coordinator     │
+    └──────────────────────────────┘
+```
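The last step in the diagram maps a task instance's queue to a coordinator. It could look roughly like the following sketch. The diagram names only the `[sdk] queue_to_sdk` option. The JSON-object value format, the `coordinator_for_queue` helper, and the rule that an unmapped queue falls back to the Python path are assumptions made for illustration.

```python
import configparser
import json

# Hypothetical airflow.cfg excerpt. The value format (a JSON object mapping
# queue name -> sdk name) is an assumption, not a documented format.
CONFIG_TEXT = """
[sdk]
queue_to_sdk = {"java": "java", "java-highmem": "java"}
"""


class JavaCoordinatorSketch:
    """Hypothetical coordinator; only the ``sdk`` attribute matters here."""

    sdk = "java"


def coordinator_for_queue(queue, config, coordinators):
    """Return the coordinator class for a TI's queue, or None for the Python path."""
    mapping = json.loads(config.get("sdk", "queue_to_sdk", fallback="{}"))
    sdk = mapping.get(queue)
    if sdk is None:
        return None  # queue not mapped: run as a regular Python task
    for coordinator in coordinators:
        if coordinator.sdk == sdk:
            return coordinator
    raise LookupError(f"queue {queue!r} maps to sdk {sdk!r}, but no coordinator provides it")


config = configparser.ConfigParser()
config.read_string(CONFIG_TEXT)
print(coordinator_for_queue("java-highmem", config, [JavaCoordinatorSketch]).__name__)
print(coordinator_for_queue("default", config, [JavaCoordinatorSketch]))
```

Several queues can point at the same SDK, so operators can route tasks to differently sized workers while the same coordinator runs them.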
+
+### The `BaseCoordinator` Interface
+
+This is the central abstraction that language providers implement. It lives in the Task SDK (`task-sdk/src/airflow/sdk/execution_time/coordinator.py`) and handles both DAG parsing and task execution for a specific language runtime.
+
+```python
+from __future__ import annotations
+
+import os
+
+
+class BaseCoordinator:
+    """
+    Base coordinator for runtime-specific DAG file processing and task execution.
+
+    Providers register subclasses in their ``provider.yaml`` under
+    ``coordinators``. Both ProvidersManager (airflow-core) and
+    ProvidersManagerTaskRuntime (task-sdk) discover coordinators through
+    this extension point.
+
+    Subclasses represent a specific language runtime (Java, Go, etc.) and
+    implement three methods. The base class owns the full bridge lifecycle:
+    TCP servers, subprocess management, selector-based I/O loop, and cleanup.
+    """
+
+    sdk: str  # e.g. "java", "go" — matches sdk field on operator/TI
+
+    # Discovery (called by DAG File Processor)
+
+    @classmethod
+    def can_handle_dag_file(cls, bundle_name: str, path: str | os.PathLike) -> bool:
+        """Return True if this coordinator should parse the file at *path*."""
+        ...
+
+    @classmethod
+    def get_code_from_file(cls, fileloc: str) -> str:
+        """Return the DAG source code (in this example, the contents of JavaExample.java)."""
+        ...
+
+    # DAG Parsing (called in forked DagFileProcessor child process)
+
+    @classmethod
+    def dag_parsing_cmd(
+        cls,
+        *,
+        dag_file_path: str,  # Absolute path to DAG file
+        bundle_name: str,  # Name of the DAG bundle
+        bundle_path: str,  # Root path of the bundle
+        comm_addr: str,  # host:port for msgpack comm channel
+        logs_addr: str,  # host:port for structured JSON log channel
+    ) -> list[str]:
+        """Return the subprocess command for DAG file parsing."""

Review Comment:
   Deferred, since we dropped DAG parsing from AIP-108 entirely.



-- 
This is an automated message from the Apache Git Service.