kaxil commented on code in PR #65958: URL: https://github.com/apache/airflow/pull/65958#discussion_r3307753465
########## scripts/in_container/java_sdk_setup.sh: ########## @@ -0,0 +1,71 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + + +# 1. Check Java +check_java() { + local java_bin="/files/openjdk/bin/java" + local version_output + + # First check if the locally installed OpenJDK exists and works. + if [ -x "$java_bin" ] && version_output=$("$java_bin" -version 2>&1); then + echo "Found existing OpenJDK at $java_bin. OK." + return + fi + + # On macOS, /usr/bin/java exists as a shim even without a JDK installed, + # so we must test with `java -version` directly. + if ! version_output=$(java -version 2>&1); then + echo "Java is not installed." + install_java + return + fi + + local java_version + java_version=$(echo "$version_output" | head -n1 | sed -E 's/.*"([0-9]+)(\.[0-9]+)*.*/\1/') + + if ! [[ "$java_version" =~ ^[0-9]+$ ]]; then + echo "Could not determine Java version." + install_java + return + fi + + if [ "$java_version" -ge 11 ]; then + echo "Java $java_version detected. OK." + else + echo "Java version $java_version found, but >= 11 is required." + install_java + fi +} + + +install_java() { + echo "Installing OpenJDK 11 in Breeze..." 
+ + curl -L -o /files/openjdk-11-aarch64.tar.gz \ + https://github.com/adoptium/temurin11-binaries/releases/download/jdk-11.0.30+7/OpenJDK11U-jdk_aarch64_linux_hotspot_11.0.30_7.tar.gz Review Comment: The URL hardcodes `aarch64`, so on x86_64 (the default Breeze arch) this downloads an ARM binary and `tar` succeeds while `/files/openjdk/bin/java -version` fails with `Exec format error`. Detect arch with `uname -m` and template the URL. While here, please add `set -euo pipefail` at the top of the script so a failed curl / wrong-arch java doesn't slip past silently. ########## task-sdk/src/airflow/sdk/execution_time/schema/schema.json: ########## @@ -5393,8 +5393,8 @@ "title": "TIRunContext", "type": "object" }, - "TaskInstance": { - "description": "Schema for TaskInstance model with minimal required fields needed for Runtime.", + "TaskInstanceDTO": { Review Comment: This renames the wire shape (`TaskInstance` -> `TaskInstanceDTO`), drops `hostname`, and promotes `pool_slots` / `queue` / `priority_weight` to required, but `versions/__init__.py` still pins the bundle at `Version("2026-06-16")` (the head version that #67235 just shipped). Any external-runtime JAR built against `Airflow-Supervisor-Schema-Version: 2026-06-16` would now fail validation. Either add a new dated `Version(...)` with `VersionChange` instructions that re-add `hostname`, rename the model back, and make the three new fields optional, or call out in the PR body that 2026-06-16 has no shipped consumers yet and is safe to rewrite in place. ########## scripts/in_container/java_sdk_setup.sh: ########## @@ -0,0 +1,71 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + + +# 1. Check Java +check_java() { + local java_bin="/files/openjdk/bin/java" + local version_output + + # First check if the locally installed OpenJDK exists and works. + if [ -x "$java_bin" ] && version_output=$("$java_bin" -version 2>&1); then + echo "Found existing OpenJDK at $java_bin. OK." + return + fi + + # On macOS, /usr/bin/java exists as a shim even without a JDK installed, + # so we must test with `java -version` directly. + if ! version_output=$(java -version 2>&1); then + echo "Java is not installed." + install_java + return + fi + + local java_version + java_version=$(echo "$version_output" | head -n1 | sed -E 's/.*"([0-9]+)(\.[0-9]+)*.*/\1/') + + if ! [[ "$java_version" =~ ^[0-9]+$ ]]; then + echo "Could not determine Java version." + install_java + return + fi + + if [ "$java_version" -ge 11 ]; then + echo "Java $java_version detected. OK." + else + echo "Java version $java_version found, but >= 11 is required." + install_java + fi +} + + +install_java() { + echo "Installing OpenJDK 11 in Breeze..." 
+ + curl -L -o /files/openjdk-11-aarch64.tar.gz \ + https://github.com/adoptium/temurin11-binaries/releases/download/jdk-11.0.30+7/OpenJDK11U-jdk_aarch64_linux_hotspot_11.0.30_7.tar.gz + + rm -rf /files/openjdk && mkdir -p /files/openjdk && \ + tar -xzf /files/openjdk-11-aarch64.tar.gz --strip-components=1 -C /files/openjdk Review Comment: No checksum verification on the JDK tarball. Adoptium ships a `.sha256.txt` alongside each artifact; please fetch it and run `sha256sum -c` before extracting, so a corrupted download or a compromised mirror doesn't silently land an unverified JDK under `/files/openjdk`. ########## scripts/in_container/java_sdk_setup.sh: ########## @@ -0,0 +1,71 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + + +# 1. Check Java +check_java() { + local java_bin="/files/openjdk/bin/java" + local version_output + + # First check if the locally installed OpenJDK exists and works. + if [ -x "$java_bin" ] && version_output=$("$java_bin" -version 2>&1); then + echo "Found existing OpenJDK at $java_bin. OK." + return + fi + + # On macOS, /usr/bin/java exists as a shim even without a JDK installed, + # so we must test with `java -version` directly. + if !
version_output=$(java -version 2>&1); then + echo "Java is not installed." + install_java + return + fi + + local java_version + java_version=$(echo "$version_output" | head -n1 | sed -E 's/.*"([0-9]+)(\.[0-9]+)*.*/\1/') + + if ! [[ "$java_version" =~ ^[0-9]+$ ]]; then + echo "Could not determine Java version." + install_java + return + fi + + if [ "$java_version" -ge 11 ]; then + echo "Java $java_version detected. OK." + else + echo "Java version $java_version found, but >= 11 is required." + install_java + fi +} + + +install_java() { + echo "Installing OpenJDK 11 in Breeze..." + + curl -L -o /files/openjdk-11-aarch64.tar.gz \ + https://github.com/adoptium/temurin11-binaries/releases/download/jdk-11.0.30+7/OpenJDK11U-jdk_aarch64_linux_hotspot_11.0.30_7.tar.gz + + rm -rf /files/openjdk && mkdir -p /files/openjdk && \ + tar -xzf /files/openjdk-11-aarch64.tar.gz --strip-components=1 -C /files/openjdk + + /files/openjdk/bin/java -version + echo "" +} Review Comment: Missing `set -euo pipefail`. If curl fails, the script still runs `rm -rf /files/openjdk` before `tar` errors out, leaving the user with no JDK at all and an exit code of 0. Please add `set -euo pipefail` and download to a tempfile first, only swap `/files/openjdk` after curl (and ideally a checksum check) succeed. ########## task-sdk/src/airflow/sdk/coordinators/java/coordinator.py: ########## @@ -0,0 +1,369 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Java runtime coordinator that launches a JVM subprocess for Dag file processing and task execution.""" + +from __future__ import annotations + +import email +import itertools +import os +import pathlib +import selectors +import socket +import subprocess +import time +import zipfile +from typing import TYPE_CHECKING, TypeVar, cast + +import attrs +import structlog + +from airflow.sdk.execution_time.coordinator import BaseCoordinator +from airflow.sdk.execution_time.schema import get_schema_version_migrator +from airflow.sdk.execution_time.supervisor import ActivitySubprocess + +if TYPE_CHECKING: + from collections.abc import Sequence + + from structlog.typing import FilteringBoundLogger + from typing_extensions import Self + + from airflow.sdk.api.client import Client + from airflow.sdk.api.datamodels._generated import BundleInfo + from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO + + Tracked = TypeVar("Tracked", socket.socket, subprocess.Popen) + +log: FilteringBoundLogger = structlog.get_logger(logger_name="coordinators.java") + + +def _start_server() -> socket.socket: + server = socket.socket() + server.bind(("127.0.0.1", 0)) + server.setblocking(True) + server.listen(1) # Just need to listen to the child process. 
+ return server + + +def _calculate_classpath(jars_root: Sequence[pathlib.Path]) -> str: + jars = (p.as_posix() for root in jars_root for p in root.iterdir() if p.suffix == ".jar") + return os.pathsep.join(jars) + + [email protected] +class _JarMetadata: + main_class: str + schema_version: str + + @classmethod + def from_jar(cls, path: pathlib.Path) -> Self | None: + try: + with zipfile.ZipFile(path) as zf: + try: + manifest_info = zf.getinfo("META-INF/MANIFEST.MF") + except KeyError: + log.debug("JAR does not contain META-INF/MANIFEST.MF; ignored", path=path) + return None + with zf.open(manifest_info) as f: + manifest = email.message_from_binary_file(f) + return cls(manifest["Main-Class"], manifest["Airflow-Supervisor-Schema-Version"]) + except zipfile.BadZipFile: + log.exception("Cannot read JAR; ignored", path=path) + return None + + +def _validate_schema_version(instance, _, value) -> str: + return get_schema_version_migrator().resolve_version(str(value)) + + [email protected] +class _JarInfo: + main_class: str + schema_version: str = attrs.field(validator=_validate_schema_version) + + @attrs.define + class _Progress: + main_class: str | None = attrs.field(init=False, default=None) + schema_version: str | None = attrs.field(init=False, default=None) + + def collect(self) -> _JarInfo | None: + if self.main_class is None or self.schema_version is None: + return None + return _JarInfo(self.main_class, self.schema_version) + + @classmethod + def find(cls, roots: Sequence[pathlib.Path], main_class: str) -> _JarInfo: + progress = cls._Progress() + for root in roots: + log.debug("Finding required JAR metadata in directory", dir=root) + for p in root.iterdir(): + if p.suffix != ".jar": + continue + if (metadata := _JarMetadata.from_jar(p)) is None: + continue + if metadata.main_class and ((main_class == metadata.main_class) or not main_class): + log.debug("JAR located with Main-Class metadata", path=p, main_class=metadata.main_class) + progress.main_class = 
metadata.main_class + if metadata.schema_version: + log.debug( + "JAR located with Airflow-Supervisor-Schema-Version metadata", + path=p, + schema_version=metadata.schema_version, + ) + progress.schema_version = metadata.schema_version + if (result := progress.collect()) is not None: + return result + if progress.main_class is not None: + tp = "cannot find a JAR with Airflow-Supervisor-Schema-Version metadata in {1}" + elif main_class: + tp = "cannot find a JAR with Main-Class matching {0!r} in {1}" + else: + tp = "cannot find a JAR with Main-Class metadata in {1}" + raise FileNotFoundError(tp.format(main_class, os.pathsep.join(os.fspath(p.resolve()) for p in roots))) + + +def _accept_connections( + servers: dict[str, socket.socket], + drains: dict[str, socket.socket], + proc: subprocess.Popen, + *, + max_wait: float = 10.0, + drain_size: int = 4096, +) -> tuple[dict[socket.socket, socket.socket], dict[socket.socket, bytes]]: + """Block until the Java process connects to servers.""" + accepted: dict[socket.socket, socket.socket] = {} + drained: dict[socket.socket, bytes] = {s: b"" for s in drains.values()} + with selectors.DefaultSelector() as sel: + for key, soc in itertools.chain(servers.items(), drains.items()): + sel.register(soc, selectors.EVENT_READ, data=key) + deadline = time.monotonic() + max_wait + while len(accepted) < len(servers): + remaining = deadline - time.monotonic() + if remaining <= 0: + for s in accepted.values(): + s.close() + raise TimeoutError("process did not connect within timeout") + if proc.poll() is not None: + for s in accepted.values(): + s.close() + raise RuntimeError(f"process exited with {proc.returncode} before connecting") + for event, _ in sel.select(timeout=min(remaining, 1.0)): + soc = cast("socket.socket", event.fileobj) + if soc in drained: + log.debug("Draining child process stream", key=event.data) + drained[soc] += soc.recv(drain_size) + else: + log.debug("Accepting child process connection", key=event.data) + conn, _ = 
soc.accept() + sel.unregister(soc) + accepted[soc] = conn + return accepted, drained + + [email protected] +class _ResourceTracker: + tracked: dict[int, socket.socket | subprocess.Popen] = attrs.field(init=False, factory=dict) + + def __enter__(self): + return self + + def __exit__(self, *exc_info): + for o in self.tracked.values(): + match o: + case socket.socket(): + o.close() + case subprocess.Popen(): + o.terminate() Review Comment: `__exit__` terminates the Popen but never waits, and the dict iteration order may close `comm_server` / `logs_server` before SIGTERM reaches the JVM. Any failure between Popen launch (231) and `tracker.untrack(... proc)` (282) leaves a zombie JVM child and likely a SIGPIPE'd shutdown. Please terminate first, `proc.wait(timeout=...)` with a `kill()` fallback, and only then close the sockets. (Related to the Popen-handle discussion in https://github.com/apache/airflow/pull/65958#discussion_r3296624213 -- this is about exit ordering, not the handle itself.) ########## task-sdk/src/airflow/sdk/execution_time/schema/schema.json: ########## @@ -5418,34 +5418,53 @@ "title": "Dag Version Id", "type": "string" }, - "hostname": { + "executor_config": { "anyOf": [ { - "type": "string" + "additionalProperties": true, + "type": "object" }, { Review Comment: `executor_config` is `Field(default=None, exclude=True)` on the model, so it never leaves the Python side at serialize time, but it's listed in `properties` here. The JSON schema and the wire shape now disagree, which matters because Java/TS coordinators will treat `schema.json` as the source of truth. Either regenerate with serialization-mode schema (so the field is dropped) or remove `exclude=True` and ship it intentionally. ########## task-sdk/src/airflow/sdk/execution_time/coordinator.py: ########## @@ -0,0 +1,244 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Runtime coordinator for non-Python DAG file processing and task execution. + +Provides :class:`BaseCoordinator`, the base class for +SDK-specific coordinators that bridge subprocess I/O between the +Airflow supervisor and an external-SDK runtime (Java, Go, Rust, etc.), +and :class:`CoordinatorManager`, the registry that loads coordinator +instances from the ``[sdk] coordinators`` configuration. + +The coordinator's :meth:`~BaseCoordinator.run_task_execution` handles the full +lifecycle: + +1. Creates TCP servers for comm and logs channels, and a socketpair for stderr. +2. Calls :meth:`~BaseCoordinator.task_execution_cmd` (provided by the subclass) + to obtain the subprocess command. +3. Spawns the subprocess and accepts TCP connections from it. +4. Runs a selector-based bridge that transparently forwards bytes + between fd 0 (supervisor) and the subprocess comm socket, and + re-emits the subprocess's log and stderr output through structlog. 
+""" + +from __future__ import annotations + +import contextlib +import functools +from typing import TYPE_CHECKING, Any + +import attrs +import pydantic +import structlog + +from airflow.sdk._shared.module_loading import import_string +from airflow.sdk.configuration import conf + +if TYPE_CHECKING: + from collections.abc import Mapping + from os import PathLike + + from structlog.typing import FilteringBoundLogger + from typing_extensions import Self + + from airflow.sdk.api.client import Client + from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO + +__all__ = [ + "BaseCoordinator", + "CoordinatorManager", + "get_coordinator_manager", + "reset_coordinator_manager", +] + +log = structlog.get_logger(__name__) + + +class BaseCoordinator: + """ + Base coordinator for runtime-specific DAG file processing and task execution. + + Coordinators are instantiated from the ``[sdk] coordinators`` configuration + (see :class:`CoordinatorManager`) — each entry's ``classpath`` is resolved + via :func:`~airflow.sdk._shared.module_loading.import_string` and + constructed with the entry's ``kwargs``. + """ + + @attrs.define(slots=True) + class ExecutionResult: + """Return value for :meth:`BaseCoordinator.execute_task`.""" + + exit_code: Any + final_state: str + + def execute_task( + self, + *, + what: TaskInstanceDTO, + dag_rel_path: str | PathLike[str], + bundle_info, + client: Client, + logger: FilteringBoundLogger | None = None, + sentry_integration: str = "", + subprocess_logs_to_stdout: bool, + **kwargs, + ) -> ExecutionResult: + """ + Start task execution. + + This should execute the task and return a result. + """ + raise NotImplementedError + + +class _CoordinatorSpec(pydantic.BaseModel): + classpath: str + kwargs: dict[str, Any] = pydantic.Field(default_factory=dict) + + +class _PythonCoordinator(BaseCoordinator): + """ + Coordinator implementation to execute Python tasks. 
+ + This is not supposed to be specified by users directly, but the fallback + used by default when nothing is specified. + """ + + def execute_task( + self, + *, + what: TaskInstanceDTO, + dag_rel_path: str | PathLike[str], + bundle_info, + client: Client, + logger: FilteringBoundLogger | None = None, + sentry_integration: str = "", + subprocess_logs_to_stdout: bool, + **kwargs, + ) -> BaseCoordinator.ExecutionResult: + # TODO: Move this to somewhere that makes more sense. + from airflow.sdk.execution_time.supervisor import ActivitySubprocess + + process = ActivitySubprocess.start( + dag_rel_path=dag_rel_path, + what=what, + client=client, + logger=logger, + bundle_info=bundle_info, + subprocess_logs_to_stdout=subprocess_logs_to_stdout, + sentry_integration=sentry_integration, + ) + exit_code = process.wait() + return self.ExecutionResult(exit_code, process.final_state) + + [email protected] +def _build_python_coordinator() -> _PythonCoordinator: + return _PythonCoordinator() + + +class InvalidCoordinatorError(ValueError): + """Raised for an invalid coordinator configuration.""" + + [email protected](kw_only=True) +class CoordinatorManager: + """ + Registry of coordinator instances loaded from ``[sdk]`` configurations. + + The ``[sdk] coordinators`` value is a JSON object keyed by coordinator name:: + + { + "jdk-11": { + "classpath": "airflow.sdk.coordinators.java.JavaCoordinator", + "kwargs": {"java_executable": "/usr/lib/jvm/jdk-11/bin/java", ...}, + } + } + + The ``classpath`` is resolved via + :func:`~airflow.sdk._shared.module_loading.import_string` (no + :class:`ProvidersManager` involvement) and constructed with ``kwargs`` on + first use. A coordinator entry that is never looked up incurs no startup + cost. At most one coordinator object can be created from each entry. 
+ + The ``[sdk] queue_to_coordinator`` config maps queue names to a key in the + object, which lets users reuse existing queue assignments to route tasks to + a specific coordinator instance (for example, a ``"legacy-java"`` queue + routed to a JDK 11 coordinator, and a ``"modern-java"`` queue routed to a + JDK 17 coordinator). + + :meta private: + """ + + _coordinator_specs: Mapping[str, _CoordinatorSpec] + _queue_to_coordinator: Mapping[str, str] + + _created_coordinators: dict[str, BaseCoordinator] = attrs.field(init=False, factory=dict) + + @classmethod + def from_config(cls) -> Self: + """Load coordinator specs from configuration without initialization.""" + coordinator_specs = { + k: _CoordinatorSpec.model_validate(v) + for k, v in conf.getjson("sdk", "coordinators", fallback={}).items() + } + queue_to_coordinator = conf.getjson("sdk", "queue_to_coordinator", fallback={}) + for key in queue_to_coordinator.values(): + if key not in coordinator_specs: + raise ValueError(f"[sdk] queue_to_coordinator references invalid coordinator key: {key!r}") + return cls(coordinator_specs=coordinator_specs, queue_to_coordinator=queue_to_coordinator) + + def _find_queue(self, key: str) -> BaseCoordinator: + with contextlib.suppress(KeyError): + return self._created_coordinators[key] + spec = self._coordinator_specs[key] + coordinator = self._created_coordinators[key] = import_string(spec.classpath)(**spec.kwargs) + return coordinator + + def for_queue(self, queue: str) -> BaseCoordinator: + """ + Find the coordinator for *queue*. + + If an entry is not registered, a Python coordinator is returned. 
+ """ + try: + key = self._queue_to_coordinator[queue] + except KeyError: + log.debug("Queue not configured to a coordinator; defaulting to Python", queue=queue) + return _build_python_coordinator() + try: + coordinator = self._find_queue(key) + except KeyError: + raise InvalidCoordinatorError(f"Queue {queue!r} configured to nonexistent coordinator") + except ImportError: + raise InvalidCoordinatorError(f"Cannot import coordinator {key!r}") + except TypeError: + raise InvalidCoordinatorError(f"Cannot instantiate coordinator {key!r}") + log.debug("Coordinator found for queue", coordinator=coordinator, queue=queue) + return coordinator Review Comment: Two follow-ups remain on this block (given the duck-typing argument in https://github.com/apache/airflow/pull/65958#discussion_r3296833580, I'm not re-raising the isinstance piece): 1. `raise InvalidCoordinatorError(...)` drops `from err`, so the underlying `ImportError`/`TypeError` traceback is gone. Use `raise InvalidCoordinatorError(...) from err` so users see the real cause (missing module, validator failure inside `__init__`, etc.). 2. The message has the key but not `spec.classpath`, which is the field the operator actually mistyped. `f"Cannot import coordinator {key!r} (classpath={spec.classpath!r})"` would close the loop. ########## task-sdk/src/airflow/sdk/execution_time/coordinator.py: ########## @@ -0,0 +1,244 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Runtime coordinator for non-Python DAG file processing and task execution. + +Provides :class:`BaseCoordinator`, the base class for +SDK-specific coordinators that bridge subprocess I/O between the +Airflow supervisor and an external-SDK runtime (Java, Go, Rust, etc.), +and :class:`CoordinatorManager`, the registry that loads coordinator +instances from the ``[sdk] coordinators`` configuration. + +The coordinator's :meth:`~BaseCoordinator.run_task_execution` handles the full +lifecycle: + +1. Creates TCP servers for comm and logs channels, and a socketpair for stderr. +2. Calls :meth:`~BaseCoordinator.task_execution_cmd` (provided by the subclass) + to obtain the subprocess command. +3. Spawns the subprocess and accepts TCP connections from it. +4. Runs a selector-based bridge that transparently forwards bytes + between fd 0 (supervisor) and the subprocess comm socket, and + re-emits the subprocess's log and stderr output through structlog. 
+""" + +from __future__ import annotations + +import contextlib +import functools +from typing import TYPE_CHECKING, Any + +import attrs +import pydantic +import structlog + +from airflow.sdk._shared.module_loading import import_string +from airflow.sdk.configuration import conf + +if TYPE_CHECKING: + from collections.abc import Mapping + from os import PathLike + + from structlog.typing import FilteringBoundLogger + from typing_extensions import Self + + from airflow.sdk.api.client import Client + from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO + +__all__ = [ + "BaseCoordinator", + "CoordinatorManager", + "get_coordinator_manager", + "reset_coordinator_manager", +] + +log = structlog.get_logger(__name__) + + +class BaseCoordinator: + """ + Base coordinator for runtime-specific DAG file processing and task execution. + + Coordinators are instantiated from the ``[sdk] coordinators`` configuration + (see :class:`CoordinatorManager`) — each entry's ``classpath`` is resolved + via :func:`~airflow.sdk._shared.module_loading.import_string` and + constructed with the entry's ``kwargs``. + """ + + @attrs.define(slots=True) + class ExecutionResult: + """Return value for :meth:`BaseCoordinator.execute_task`.""" + + exit_code: Any + final_state: str + + def execute_task( + self, + *, + what: TaskInstanceDTO, + dag_rel_path: str | PathLike[str], + bundle_info, + client: Client, + logger: FilteringBoundLogger | None = None, + sentry_integration: str = "", + subprocess_logs_to_stdout: bool, + **kwargs, + ) -> ExecutionResult: + """ + Start task execution. + + This should execute the task and return a result. + """ + raise NotImplementedError + + +class _CoordinatorSpec(pydantic.BaseModel): + classpath: str + kwargs: dict[str, Any] = pydantic.Field(default_factory=dict) + + +class _PythonCoordinator(BaseCoordinator): + """ + Coordinator implementation to execute Python tasks. 
+ + This is not supposed to be specified by users directly, but the fallback + used by default when nothing is specified. + """ + + def execute_task( + self, + *, + what: TaskInstanceDTO, + dag_rel_path: str | PathLike[str], + bundle_info, + client: Client, + logger: FilteringBoundLogger | None = None, + sentry_integration: str = "", + subprocess_logs_to_stdout: bool, + **kwargs, + ) -> BaseCoordinator.ExecutionResult: + # TODO: Move this to somewhere that makes more sense. + from airflow.sdk.execution_time.supervisor import ActivitySubprocess + + process = ActivitySubprocess.start( + dag_rel_path=dag_rel_path, + what=what, + client=client, + logger=logger, + bundle_info=bundle_info, + subprocess_logs_to_stdout=subprocess_logs_to_stdout, + sentry_integration=sentry_integration, + ) + exit_code = process.wait() + return self.ExecutionResult(exit_code, process.final_state) + + [email protected] +def _build_python_coordinator() -> _PythonCoordinator: + return _PythonCoordinator() + + +class InvalidCoordinatorError(ValueError): + """Raised for an invalid coordinator configuration.""" + + [email protected](kw_only=True) +class CoordinatorManager: + """ + Registry of coordinator instances loaded from ``[sdk]`` configurations. + + The ``[sdk] coordinators`` value is a JSON object keyed by coordinator name:: + + { + "jdk-11": { + "classpath": "airflow.sdk.coordinators.java.JavaCoordinator", + "kwargs": {"java_executable": "/usr/lib/jvm/jdk-11/bin/java", ...}, + } + } + + The ``classpath`` is resolved via + :func:`~airflow.sdk._shared.module_loading.import_string` (no + :class:`ProvidersManager` involvement) and constructed with ``kwargs`` on + first use. A coordinator entry that is never looked up incurs no startup + cost. At most one coordinator object can be created from each entry. 
+ + The ``[sdk] queue_to_coordinator`` config maps queue names to a key in the + object, which lets users reuse existing queue assignments to route tasks to + a specific coordinator instance (for example, a ``"legacy-java"`` queue + routed to a JDK 11 coordinator, and a ``"modern-java"`` queue routed to a + JDK 17 coordinator). + + :meta private: + """ + + _coordinator_specs: Mapping[str, _CoordinatorSpec] + _queue_to_coordinator: Mapping[str, str] + + _created_coordinators: dict[str, BaseCoordinator] = attrs.field(init=False, factory=dict) + + @classmethod + def from_config(cls) -> Self: + """Load coordinator specs from configuration without initialization.""" + coordinator_specs = { + k: _CoordinatorSpec.model_validate(v) + for k, v in conf.getjson("sdk", "coordinators", fallback={}).items() + } + queue_to_coordinator = conf.getjson("sdk", "queue_to_coordinator", fallback={}) + for key in queue_to_coordinator.values(): + if key not in coordinator_specs: + raise ValueError(f"[sdk] queue_to_coordinator references invalid coordinator key: {key!r}") + return cls(coordinator_specs=coordinator_specs, queue_to_coordinator=queue_to_coordinator) + + def _find_queue(self, key: str) -> BaseCoordinator: + with contextlib.suppress(KeyError): + return self._created_coordinators[key] + spec = self._coordinator_specs[key] + coordinator = self._created_coordinators[key] = import_string(spec.classpath)(**spec.kwargs) + return coordinator Review Comment: `_find_queue` reads then writes `_created_coordinators[key]` outside any lock. The class docstring says "at most one coordinator object can be created from each entry", but two concurrent first-use calls will both construct and one instance gets orphaned -- with no `stop()` method on `BaseCoordinator`, that instance's sockets/subprocesses leak. Either eager-construct in `from_config` (the lazy goal is marginal, it's one import at startup) or add a `threading.Lock` around the construct-and-cache. 
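A minimal sketch of the `threading.Lock` option for `_find_queue`. It uses a hypothetical `LazyCoordinatorCache` class whose zero-argument factories stand in for `import_string(spec.classpath)(**spec.kwargs)`. Holding one lock across the check, the construction, and the store means two concurrent first-use calls cannot both build an instance for the same key:

```python
from __future__ import annotations

import threading
from collections.abc import Callable, Mapping


class LazyCoordinatorCache:
    """Hypothetical stand-in for CoordinatorManager's construct-once cache."""

    def __init__(self, factories: Mapping[str, Callable[[], object]]) -> None:
        # In the PR, each factory would be ``import_string(spec.classpath)(**spec.kwargs)``.
        self._factories = factories
        self._created: dict[str, object] = {}
        self._lock = threading.Lock()

    def get(self, key: str) -> object:
        # Check, construct and store under one lock, so concurrent first-use
        # calls cannot each build (and then orphan) an instance for the same key.
        with self._lock:
            if key in self._created:
                return self._created[key]
            # An unknown key raises KeyError here, as _find_queue does today.
            instance = self._factories[key]()
            self._created[key] = instance
            return instance
```

A single lock also serializes construction of *different* keys. With a handful of `[sdk] coordinators` entries that is unlikely to matter. The review's other option, eager construction in `from_config`, removes the need for the lock entirely.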
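The two `for_queue` follow-ups can be sketched under stated assumptions. `CoordinatorSpec` and `import_string` below are stdlib stand-ins for the PR's `_CoordinatorSpec` and `airflow.sdk._shared.module_loading.import_string`, and `build_coordinator` is a hypothetical helper.

The suggestion implies two details:
- Each `except` must bind the exception (`except ImportError as err:`).
- `spec` is currently local to `_find_queue`. Either `for_queue` looks it up from `self._coordinator_specs`, or the wrapping moves next to the lookup, as done here.

Python already keeps the original error as the implicit `__context__`. `from err` promotes it to the explicit `__cause__`, which gives the clearer "direct cause" traceback.

```python
from __future__ import annotations

from collections.abc import Mapping
from dataclasses import dataclass, field
from importlib import import_module
from typing import Any


class InvalidCoordinatorError(ValueError):
    """Raised for an invalid coordinator configuration."""


@dataclass
class CoordinatorSpec:
    # Stand-in for the PR's pydantic ``_CoordinatorSpec``.
    classpath: str
    kwargs: dict[str, Any] = field(default_factory=dict)


def import_string(dotted_path: str) -> Any:
    # Minimal stand-in for airflow.sdk._shared.module_loading.import_string.
    module_path, _, attr = dotted_path.rpartition(".")
    module = import_module(module_path)
    try:
        return getattr(module, attr)
    except AttributeError as err:
        raise ImportError(f"{module_path!r} has no attribute {attr!r}") from err


def build_coordinator(queue: str, key: str, specs: Mapping[str, CoordinatorSpec]) -> Any:
    """Hypothetical helper: one try block per step, each re-raised ``from err``."""
    try:
        spec = specs[key]
    except KeyError as err:
        raise InvalidCoordinatorError(f"Queue {queue!r} configured to nonexistent coordinator {key!r}") from err
    try:
        factory = import_string(spec.classpath)
    except ImportError as err:
        # The classpath is what the operator typed, so surface it in the message.
        raise InvalidCoordinatorError(
            f"Cannot import coordinator {key!r} (classpath={spec.classpath!r})"
        ) from err
    try:
        return factory(**spec.kwargs)
    except TypeError as err:
        raise InvalidCoordinatorError(
            f"Cannot instantiate coordinator {key!r} (classpath={spec.classpath!r})"
        ) from err
```

Splitting lookup, import and construction into separate `try` blocks also keeps each `except` scoped to the step it describes.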
########## task-sdk/src/airflow/sdk/execution_time/coordinator.py: ########## @@ -0,0 +1,244 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Runtime coordinator for non-Python DAG file processing and task execution. + +Provides :class:`BaseCoordinator`, the base class for +SDK-specific coordinators that bridge subprocess I/O between the +Airflow supervisor and an external-SDK runtime (Java, Go, Rust, etc.), +and :class:`CoordinatorManager`, the registry that loads coordinator +instances from the ``[sdk] coordinators`` configuration. + +The coordinator's :meth:`~BaseCoordinator.run_task_execution` handles the full +lifecycle: + +1. Creates TCP servers for comm and logs channels, and a socketpair for stderr. +2. Calls :meth:`~BaseCoordinator.task_execution_cmd` (provided by the subclass) + to obtain the subprocess command. +3. Spawns the subprocess and accepts TCP connections from it. +4. Runs a selector-based bridge that transparently forwards bytes + between fd 0 (supervisor) and the subprocess comm socket, and + re-emits the subprocess's log and stderr output through structlog. 
+""" + +from __future__ import annotations + +import contextlib +import functools +from typing import TYPE_CHECKING, Any + +import attrs +import pydantic +import structlog + +from airflow.sdk._shared.module_loading import import_string +from airflow.sdk.configuration import conf + +if TYPE_CHECKING: + from collections.abc import Mapping + from os import PathLike + + from structlog.typing import FilteringBoundLogger + from typing_extensions import Self + + from airflow.sdk.api.client import Client + from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO + +__all__ = [ + "BaseCoordinator", + "CoordinatorManager", + "get_coordinator_manager", + "reset_coordinator_manager", +] + +log = structlog.get_logger(__name__) + + +class BaseCoordinator: + """ + Base coordinator for runtime-specific DAG file processing and task execution. + + Coordinators are instantiated from the ``[sdk] coordinators`` configuration + (see :class:`CoordinatorManager`) — each entry's ``classpath`` is resolved + via :func:`~airflow.sdk._shared.module_loading.import_string` and + constructed with the entry's ``kwargs``. + """ + + @attrs.define(slots=True) + class ExecutionResult: + """Return value for :meth:`BaseCoordinator.execute_task`.""" + + exit_code: Any + final_state: str + + def execute_task( + self, + *, + what: TaskInstanceDTO, + dag_rel_path: str | PathLike[str], + bundle_info, + client: Client, + logger: FilteringBoundLogger | None = None, + sentry_integration: str = "", + subprocess_logs_to_stdout: bool, + **kwargs, + ) -> ExecutionResult: + """ + Start task execution. + + This should execute the task and return a result. + """ + raise NotImplementedError Review Comment: `BaseCoordinator` is missing a `close` / `stop` / `__exit__` method -- the Java coordinator in this PR opens listening sockets and spawns a JVM but the base class doesn't require teardown. Without a contract here, every future coordinator (TS, Go) re-invents (and risks forgetting) atexit handlers. 
In the spirit of the planned base-class refactoring you mentioned in https://github.com/apache/airflow/pull/65958#discussion_r3294145816, I'd suggest adding a no-op `def close(self) -> None: ...` to the base class and either teaching `CoordinatorManager` to call it on exit or shipping a `closing(...)` helper.
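Roughly what I have in mind, as a trimmed, hypothetical sketch rather than the PR's classes: `SocketCoordinator` stands in for the Java coordinator, with a socket pair in place of its comm/log channels, and `ManagerSketch` stands in for `CoordinatorManager`. The `close_all()` name and the `atexit` registration are assumptions about how the wiring could look.

```python
from __future__ import annotations

import atexit
import contextlib
import logging
import socket

log = logging.getLogger(__name__)


class BaseCoordinator:
    def close(self) -> None:
        """Release sockets, subprocesses, etc. No-op by default; subclasses override."""


class SocketCoordinator(BaseCoordinator):
    """Hypothetical subclass holding OS resources (a socket pair stands in for comm/log channels)."""

    def __init__(self) -> None:
        self.parent_sock, self.child_sock = socket.socketpair()

    def close(self) -> None:
        self.parent_sock.close()
        self.child_sock.close()


class ManagerSketch:
    """Hypothetical stand-in for CoordinatorManager that owns coordinator teardown."""

    def __init__(self) -> None:
        self._created: dict[str, BaseCoordinator] = {}
        atexit.register(self.close_all)

    def add(self, key: str, coordinator: BaseCoordinator) -> None:
        self._created[key] = coordinator

    def close_all(self) -> None:
        # Swap the dict out first so an explicit call followed by the atexit call is harmless.
        created, self._created = self._created, {}
        for key, coordinator in created.items():
            try:
                coordinator.close()
            except Exception:
                # One failing close() should not stop the rest from being torn down.
                log.exception("Failed to close coordinator %r", key)


# With close() on the base class, stdlib contextlib.closing already acts as the closing(...) helper.
with contextlib.closing(SocketCoordinator()) as coordinator:
    pass  # run the task(s) here
```

A nice side effect: once `close()` exists on the base class, `contextlib.closing` covers the helper path with nothing new to ship, and the manager hook only matters for coordinators it constructed itself.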
