kaxil commented on code in PR #65958:
URL: https://github.com/apache/airflow/pull/65958#discussion_r3307753961


##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -2370,27 +2425,26 @@ def supervise_task(
         reset_secrets_masker()
 
         try:
-            process = ActivitySubprocess.start(
-                dag_rel_path=dag_rel_path,
+            coordinator = get_coordinator_manager().for_queue(ti.queue)
+            result = coordinator.execute_task(
                 what=ti,
+                dag_rel_path=dag_rel_path,
+                bundle_info=bundle_info,
                 client=client,
                 logger=logger,
-                bundle_info=bundle_info,
-                subprocess_logs_to_stdout=subprocess_logs_to_stdout,
                 sentry_integration=sentry_integration,
+                subprocess_logs_to_stdout=subprocess_logs_to_stdout,
             )
-
-            exit_code = process.wait()
             end = time.monotonic()
             log.info(
                 "Workload finished",
                 workload_type="ExecuteTask",
                 workload_id=str(ti.id),
-                exit_code=exit_code,
+                exit_code=result.exit_code,
                 duration=end - start,
-                final_state=process.final_state,
+                final_state=result.final_state,
             )
-            return exit_code
+            return result.exit_code

Review Comment:
   `for_queue(ti.queue)` can raise `InvalidCoordinatorError` (bad classpath, 
missing coordinator key, kwargs mismatch) and it escapes `supervise_task` 
uncaught. TI was already transitioned to `RUNNING` by the supervisor preamble, 
so a misconfigured queue strands the task in `RUNNING` until zombie cleanup. 
Catch it here, log with `dag_id` / `run_id` / `task_id` / `queue` bound, and 
report a clean failure to the API.



##########
task-sdk/src/airflow/sdk/coordinators/java/coordinator.py:
##########
@@ -0,0 +1,369 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Java runtime coordinator that launches a JVM subprocess for Dag file 
processing and task execution."""
+
+from __future__ import annotations
+
+import email
+import itertools
+import os
+import pathlib
+import selectors
+import socket
+import subprocess
+import time
+import zipfile
+from typing import TYPE_CHECKING, TypeVar, cast
+
+import attrs
+import structlog
+
+from airflow.sdk.execution_time.coordinator import BaseCoordinator
+from airflow.sdk.execution_time.schema import get_schema_version_migrator
+from airflow.sdk.execution_time.supervisor import ActivitySubprocess
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+
+    from structlog.typing import FilteringBoundLogger
+    from typing_extensions import Self
+
+    from airflow.sdk.api.client import Client
+    from airflow.sdk.api.datamodels._generated import BundleInfo
+    from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO
+
+    Tracked = TypeVar("Tracked", socket.socket, subprocess.Popen)
+
+log: FilteringBoundLogger = 
structlog.get_logger(logger_name="coordinators.java")
+
+
+def _start_server() -> socket.socket:
+    server = socket.socket()
+    server.bind(("127.0.0.1", 0))
+    server.setblocking(True)
+    server.listen(1)  # Just need to listen to the child process.
+    return server
+
+
+def _calculate_classpath(jars_root: Sequence[pathlib.Path]) -> str:
+    jars = (p.as_posix() for root in jars_root for p in root.iterdir() if 
p.suffix == ".jar")
+    return os.pathsep.join(jars)
+
+
+@attrs.define
+class _JarMetadata:
+    main_class: str
+    schema_version: str
+
+    @classmethod
+    def from_jar(cls, path: pathlib.Path) -> Self | None:
+        try:
+            with zipfile.ZipFile(path) as zf:
+                try:
+                    manifest_info = zf.getinfo("META-INF/MANIFEST.MF")
+                except KeyError:
+                    log.debug("JAR does not contain META-INF/MANIFEST.MF; 
ignored", path=path)
+                    return None
+                with zf.open(manifest_info) as f:
+                    manifest = email.message_from_binary_file(f)
+            return cls(manifest["Main-Class"], 
manifest["Airflow-Supervisor-Schema-Version"])
+        except zipfile.BadZipFile:
+            log.exception("Cannot read JAR; ignored", path=path)
+            return None
+
+
+def _validate_schema_version(instance, _, value) -> str:
+    return get_schema_version_migrator().resolve_version(str(value))
+
+
+@attrs.define
+class _JarInfo:
+    main_class: str
+    schema_version: str = attrs.field(validator=_validate_schema_version)
+
+    @attrs.define
+    class _Progress:
+        main_class: str | None = attrs.field(init=False, default=None)
+        schema_version: str | None = attrs.field(init=False, default=None)
+
+        def collect(self) -> _JarInfo | None:
+            if self.main_class is None or self.schema_version is None:
+                return None
+            return _JarInfo(self.main_class, self.schema_version)
+
+    @classmethod
+    def find(cls, roots: Sequence[pathlib.Path], main_class: str) -> _JarInfo:
+        progress = cls._Progress()
+        for root in roots:
+            log.debug("Finding required JAR metadata in directory", dir=root)
+            for p in root.iterdir():
+                if p.suffix != ".jar":
+                    continue
+                if (metadata := _JarMetadata.from_jar(p)) is None:
+                    continue
+                if metadata.main_class and ((main_class == 
metadata.main_class) or not main_class):
+                    log.debug("JAR located with Main-Class metadata", path=p, 
main_class=metadata.main_class)
+                    progress.main_class = metadata.main_class
+                if metadata.schema_version:
+                    log.debug(
+                        "JAR located with Airflow-Supervisor-Schema-Version 
metadata",
+                        path=p,
+                        schema_version=metadata.schema_version,
+                    )
+                    progress.schema_version = metadata.schema_version
+                if (result := progress.collect()) is not None:
+                    return result
+        if progress.main_class is not None:
+            tp = "cannot find a JAR with Airflow-Supervisor-Schema-Version 
metadata in {1}"
+        elif main_class:
+            tp = "cannot find a JAR with Main-Class matching {0!r} in {1}"
+        else:
+            tp = "cannot find a JAR with Main-Class metadata in {1}"
+        raise FileNotFoundError(tp.format(main_class, 
os.pathsep.join(os.fspath(p.resolve()) for p in roots)))
+
+
+def _accept_connections(
+    servers: dict[str, socket.socket],
+    drains: dict[str, socket.socket],
+    proc: subprocess.Popen,
+    *,
+    max_wait: float = 10.0,
+    drain_size: int = 4096,
+) -> tuple[dict[socket.socket, socket.socket], dict[socket.socket, bytes]]:
+    """Block until the Java process connects to servers."""
+    accepted: dict[socket.socket, socket.socket] = {}
+    drained: dict[socket.socket, bytes] = {s: b"" for s in drains.values()}
+    with selectors.DefaultSelector() as sel:
+        for key, soc in itertools.chain(servers.items(), drains.items()):
+            sel.register(soc, selectors.EVENT_READ, data=key)
+        deadline = time.monotonic() + max_wait
+        while len(accepted) < len(servers):
+            remaining = deadline - time.monotonic()
+            if remaining <= 0:
+                for s in accepted.values():
+                    s.close()
+                raise TimeoutError("process did not connect within timeout")
+            if proc.poll() is not None:
+                for s in accepted.values():
+                    s.close()
+                raise RuntimeError(f"process exited with {proc.returncode} 
before connecting")
+            for event, _ in sel.select(timeout=min(remaining, 1.0)):
+                soc = cast("socket.socket", event.fileobj)
+                if soc in drained:
+                    log.debug("Draining child process stream", key=event.data)
+                    drained[soc] += soc.recv(drain_size)
+                else:
+                    log.debug("Accepting child process connection", 
key=event.data)
+                    conn, _ = soc.accept()
+                    sel.unregister(soc)
+                    accepted[soc] = conn

Review Comment:
   When `recv` returns `b""` (peer closed), please `sel.unregister(soc)` -- 
otherwise a dead JVM that closes its stdout before connecting causes selectors 
to fire repeatedly on the half-closed socketpair, burning CPU for up to 
`max_wait` seconds until `proc.poll()` catches up.



##########
task-sdk/src/airflow/sdk/coordinators/java/coordinator.py:
##########
@@ -0,0 +1,369 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Java runtime coordinator that launches a JVM subprocess for Dag file 
processing and task execution."""
+
+from __future__ import annotations
+
+import email
+import itertools
+import os
+import pathlib
+import selectors
+import socket
+import subprocess
+import time
+import zipfile
+from typing import TYPE_CHECKING, TypeVar, cast
+
+import attrs
+import structlog
+
+from airflow.sdk.execution_time.coordinator import BaseCoordinator
+from airflow.sdk.execution_time.schema import get_schema_version_migrator
+from airflow.sdk.execution_time.supervisor import ActivitySubprocess
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+
+    from structlog.typing import FilteringBoundLogger
+    from typing_extensions import Self
+
+    from airflow.sdk.api.client import Client
+    from airflow.sdk.api.datamodels._generated import BundleInfo
+    from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO
+
+    Tracked = TypeVar("Tracked", socket.socket, subprocess.Popen)
+
+log: FilteringBoundLogger = 
structlog.get_logger(logger_name="coordinators.java")
+
+
+def _start_server() -> socket.socket:
+    server = socket.socket()
+    server.bind(("127.0.0.1", 0))
+    server.setblocking(True)
+    server.listen(1)  # Just need to listen to the child process.
+    return server
+
+
+def _calculate_classpath(jars_root: Sequence[pathlib.Path]) -> str:
+    jars = (p.as_posix() for root in jars_root for p in root.iterdir() if 
p.suffix == ".jar")
+    return os.pathsep.join(jars)

Review Comment:
   Two small things: please `sorted(root.iterdir())` so the classpath order is 
deterministic across workers (otherwise duplicate classes in two JARs resolve 
nondeterministically), and validate `root.is_dir()` up front -- right now a 
misconfigured `jars_root` surfaces as a bare `pathlib` `FileNotFoundError` with 
no indication which configured root was bad.



##########
task-sdk/src/airflow/sdk/coordinators/java/coordinator.py:
##########
@@ -0,0 +1,369 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Java runtime coordinator that launches a JVM subprocess for Dag file 
processing and task execution."""
+
+from __future__ import annotations
+
+import email
+import itertools
+import os
+import pathlib
+import selectors
+import socket
+import subprocess
+import time
+import zipfile
+from typing import TYPE_CHECKING, TypeVar, cast
+
+import attrs
+import structlog
+
+from airflow.sdk.execution_time.coordinator import BaseCoordinator
+from airflow.sdk.execution_time.schema import get_schema_version_migrator
+from airflow.sdk.execution_time.supervisor import ActivitySubprocess
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+
+    from structlog.typing import FilteringBoundLogger
+    from typing_extensions import Self
+
+    from airflow.sdk.api.client import Client
+    from airflow.sdk.api.datamodels._generated import BundleInfo
+    from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO
+
+    Tracked = TypeVar("Tracked", socket.socket, subprocess.Popen)
+
+log: FilteringBoundLogger = 
structlog.get_logger(logger_name="coordinators.java")
+
+
+def _start_server() -> socket.socket:
+    server = socket.socket()
+    server.bind(("127.0.0.1", 0))
+    server.setblocking(True)
+    server.listen(1)  # Just need to listen to the child process.
+    return server
+
+
+def _calculate_classpath(jars_root: Sequence[pathlib.Path]) -> str:
+    jars = (p.as_posix() for root in jars_root for p in root.iterdir() if 
p.suffix == ".jar")
+    return os.pathsep.join(jars)
+
+
+@attrs.define
+class _JarMetadata:
+    main_class: str
+    schema_version: str
+
+    @classmethod
+    def from_jar(cls, path: pathlib.Path) -> Self | None:
+        try:
+            with zipfile.ZipFile(path) as zf:
+                try:
+                    manifest_info = zf.getinfo("META-INF/MANIFEST.MF")
+                except KeyError:
+                    log.debug("JAR does not contain META-INF/MANIFEST.MF; 
ignored", path=path)
+                    return None
+                with zf.open(manifest_info) as f:
+                    manifest = email.message_from_binary_file(f)
+            return cls(manifest["Main-Class"], 
manifest["Airflow-Supervisor-Schema-Version"])
+        except zipfile.BadZipFile:
+            log.exception("Cannot read JAR; ignored", path=path)
+            return None
+
+
+def _validate_schema_version(instance, _, value) -> str:
+    return get_schema_version_migrator().resolve_version(str(value))
+
+
+@attrs.define
+class _JarInfo:
+    main_class: str
+    schema_version: str = attrs.field(validator=_validate_schema_version)
+
+    @attrs.define
+    class _Progress:
+        main_class: str | None = attrs.field(init=False, default=None)
+        schema_version: str | None = attrs.field(init=False, default=None)
+
+        def collect(self) -> _JarInfo | None:
+            if self.main_class is None or self.schema_version is None:
+                return None
+            return _JarInfo(self.main_class, self.schema_version)
+
+    @classmethod
+    def find(cls, roots: Sequence[pathlib.Path], main_class: str) -> _JarInfo:
+        progress = cls._Progress()
+        for root in roots:
+            log.debug("Finding required JAR metadata in directory", dir=root)
+            for p in root.iterdir():
+                if p.suffix != ".jar":
+                    continue
+                if (metadata := _JarMetadata.from_jar(p)) is None:
+                    continue
+                if metadata.main_class and ((main_class == 
metadata.main_class) or not main_class):
+                    log.debug("JAR located with Main-Class metadata", path=p, 
main_class=metadata.main_class)
+                    progress.main_class = metadata.main_class
+                if metadata.schema_version:
+                    log.debug(
+                        "JAR located with Airflow-Supervisor-Schema-Version 
metadata",
+                        path=p,
+                        schema_version=metadata.schema_version,
+                    )
+                    progress.schema_version = metadata.schema_version
+                if (result := progress.collect()) is not None:
+                    return result
+        if progress.main_class is not None:
+            tp = "cannot find a JAR with Airflow-Supervisor-Schema-Version 
metadata in {1}"
+        elif main_class:
+            tp = "cannot find a JAR with Main-Class matching {0!r} in {1}"
+        else:
+            tp = "cannot find a JAR with Main-Class metadata in {1}"
+        raise FileNotFoundError(tp.format(main_class, 
os.pathsep.join(os.fspath(p.resolve()) for p in roots)))

Review Comment:
   When this `FileNotFoundError` fires the user gets a path list joined by 
`os.pathsep` (looks like a classpath, not a list of dirs) and no record of 
which JARs were scanned or what their headers contained -- those facts are only 
at debug level. Please aggregate the per-JAR (`path`, `main_class`, 
`schema_version`) tuples and include a compact summary in the error message, 
and join the dir list with `, `. Adding `from None` would also keep the noise 
down.



##########
task-sdk/src/airflow/sdk/coordinators/java/coordinator.py:
##########
@@ -0,0 +1,369 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Java runtime coordinator that launches a JVM subprocess for Dag file 
processing and task execution."""
+
+from __future__ import annotations
+
+import email
+import itertools
+import os
+import pathlib
+import selectors
+import socket
+import subprocess
+import time
+import zipfile
+from typing import TYPE_CHECKING, TypeVar, cast
+
+import attrs
+import structlog
+
+from airflow.sdk.execution_time.coordinator import BaseCoordinator
+from airflow.sdk.execution_time.schema import get_schema_version_migrator
+from airflow.sdk.execution_time.supervisor import ActivitySubprocess
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+
+    from structlog.typing import FilteringBoundLogger
+    from typing_extensions import Self
+
+    from airflow.sdk.api.client import Client
+    from airflow.sdk.api.datamodels._generated import BundleInfo
+    from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO
+
+    Tracked = TypeVar("Tracked", socket.socket, subprocess.Popen)
+
+log: FilteringBoundLogger = 
structlog.get_logger(logger_name="coordinators.java")
+
+
+def _start_server() -> socket.socket:
+    server = socket.socket()
+    server.bind(("127.0.0.1", 0))
+    server.setblocking(True)
+    server.listen(1)  # Just need to listen to the child process.
+    return server
+
+
+def _calculate_classpath(jars_root: Sequence[pathlib.Path]) -> str:
+    jars = (p.as_posix() for root in jars_root for p in root.iterdir() if 
p.suffix == ".jar")
+    return os.pathsep.join(jars)
+
+
+@attrs.define
+class _JarMetadata:
+    main_class: str
+    schema_version: str
+
+    @classmethod
+    def from_jar(cls, path: pathlib.Path) -> Self | None:
+        try:
+            with zipfile.ZipFile(path) as zf:
+                try:
+                    manifest_info = zf.getinfo("META-INF/MANIFEST.MF")
+                except KeyError:
+                    log.debug("JAR does not contain META-INF/MANIFEST.MF; 
ignored", path=path)
+                    return None
+                with zf.open(manifest_info) as f:
+                    manifest = email.message_from_binary_file(f)
+            return cls(manifest["Main-Class"], 
manifest["Airflow-Supervisor-Schema-Version"])
+        except zipfile.BadZipFile:
+            log.exception("Cannot read JAR; ignored", path=path)
+            return None
+
+
+def _validate_schema_version(instance, _, value) -> str:
+    return get_schema_version_migrator().resolve_version(str(value))
+
+
+@attrs.define
+class _JarInfo:
+    main_class: str
+    schema_version: str = attrs.field(validator=_validate_schema_version)
+
+    @attrs.define
+    class _Progress:
+        main_class: str | None = attrs.field(init=False, default=None)
+        schema_version: str | None = attrs.field(init=False, default=None)
+
+        def collect(self) -> _JarInfo | None:
+            if self.main_class is None or self.schema_version is None:
+                return None
+            return _JarInfo(self.main_class, self.schema_version)
+
+    @classmethod
+    def find(cls, roots: Sequence[pathlib.Path], main_class: str) -> _JarInfo:
+        progress = cls._Progress()
+        for root in roots:
+            log.debug("Finding required JAR metadata in directory", dir=root)
+            for p in root.iterdir():
+                if p.suffix != ".jar":
+                    continue
+                if (metadata := _JarMetadata.from_jar(p)) is None:
+                    continue
+                if metadata.main_class and ((main_class == 
metadata.main_class) or not main_class):
+                    log.debug("JAR located with Main-Class metadata", path=p, 
main_class=metadata.main_class)
+                    progress.main_class = metadata.main_class
+                if metadata.schema_version:
+                    log.debug(
+                        "JAR located with Airflow-Supervisor-Schema-Version 
metadata",
+                        path=p,
+                        schema_version=metadata.schema_version,
+                    )
+                    progress.schema_version = metadata.schema_version
+                if (result := progress.collect()) is not None:
+                    return result
+        if progress.main_class is not None:
+            tp = "cannot find a JAR with Airflow-Supervisor-Schema-Version 
metadata in {1}"
+        elif main_class:
+            tp = "cannot find a JAR with Main-Class matching {0!r} in {1}"
+        else:
+            tp = "cannot find a JAR with Main-Class metadata in {1}"
+        raise FileNotFoundError(tp.format(main_class, 
os.pathsep.join(os.fspath(p.resolve()) for p in roots)))
+
+
+def _accept_connections(
+    servers: dict[str, socket.socket],
+    drains: dict[str, socket.socket],
+    proc: subprocess.Popen,
+    *,
+    max_wait: float = 10.0,

Review Comment:
   The 10s JVM-connect timeout is hardcoded and not exposed on 
`JavaCoordinator`. Slow cold-start JVMs and fat classpaths on 
resource-constrained workers will hit `TimeoutError` with no way to bump it 
short of monkey-patching. Please surface it as a coordinator field (e.g. 
`connect_timeout: float = 10.0`) and pass it through.



##########
task-sdk/src/airflow/sdk/coordinators/java/coordinator.py:
##########
@@ -0,0 +1,369 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Java runtime coordinator that launches a JVM subprocess for Dag file 
processing and task execution."""
+
+from __future__ import annotations
+
+import email
+import itertools
+import os
+import pathlib
+import selectors
+import socket
+import subprocess
+import time
+import zipfile
+from typing import TYPE_CHECKING, TypeVar, cast
+
+import attrs
+import structlog
+
+from airflow.sdk.execution_time.coordinator import BaseCoordinator
+from airflow.sdk.execution_time.schema import get_schema_version_migrator
+from airflow.sdk.execution_time.supervisor import ActivitySubprocess
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+
+    from structlog.typing import FilteringBoundLogger
+    from typing_extensions import Self
+
+    from airflow.sdk.api.client import Client
+    from airflow.sdk.api.datamodels._generated import BundleInfo
+    from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO
+
+    Tracked = TypeVar("Tracked", socket.socket, subprocess.Popen)
+
+log: FilteringBoundLogger = 
structlog.get_logger(logger_name="coordinators.java")
+
+
+def _start_server() -> socket.socket:
+    server = socket.socket()
+    server.bind(("127.0.0.1", 0))
+    server.setblocking(True)
+    server.listen(1)  # Just need to listen to the child process.
+    return server
+
+
+def _calculate_classpath(jars_root: Sequence[pathlib.Path]) -> str:
+    jars = (p.as_posix() for root in jars_root for p in root.iterdir() if 
p.suffix == ".jar")
+    return os.pathsep.join(jars)
+
+
+@attrs.define
+class _JarMetadata:
+    main_class: str
+    schema_version: str
+
+    @classmethod
+    def from_jar(cls, path: pathlib.Path) -> Self | None:
+        try:
+            with zipfile.ZipFile(path) as zf:
+                try:
+                    manifest_info = zf.getinfo("META-INF/MANIFEST.MF")
+                except KeyError:
+                    log.debug("JAR does not contain META-INF/MANIFEST.MF; 
ignored", path=path)
+                    return None
+                with zf.open(manifest_info) as f:
+                    manifest = email.message_from_binary_file(f)
+            return cls(manifest["Main-Class"], 
manifest["Airflow-Supervisor-Schema-Version"])
+        except zipfile.BadZipFile:
+            log.exception("Cannot read JAR; ignored", path=path)
+            return None
+
+
+def _validate_schema_version(instance, _, value) -> str:
+    return get_schema_version_migrator().resolve_version(str(value))
+
+
+@attrs.define
+class _JarInfo:
+    main_class: str
+    schema_version: str = attrs.field(validator=_validate_schema_version)
+
+    @attrs.define
+    class _Progress:
+        main_class: str | None = attrs.field(init=False, default=None)
+        schema_version: str | None = attrs.field(init=False, default=None)
+
+        def collect(self) -> _JarInfo | None:
+            if self.main_class is None or self.schema_version is None:
+                return None
+            return _JarInfo(self.main_class, self.schema_version)
+
+    @classmethod
+    def find(cls, roots: Sequence[pathlib.Path], main_class: str) -> _JarInfo:
+        progress = cls._Progress()
+        for root in roots:
+            log.debug("Finding required JAR metadata in directory", dir=root)
+            for p in root.iterdir():
+                if p.suffix != ".jar":
+                    continue
+                if (metadata := _JarMetadata.from_jar(p)) is None:
+                    continue
+                if metadata.main_class and ((main_class == 
metadata.main_class) or not main_class):
+                    log.debug("JAR located with Main-Class metadata", path=p, 
main_class=metadata.main_class)
+                    progress.main_class = metadata.main_class
+                if metadata.schema_version:
+                    log.debug(
+                        "JAR located with Airflow-Supervisor-Schema-Version 
metadata",
+                        path=p,
+                        schema_version=metadata.schema_version,
+                    )
+                    progress.schema_version = metadata.schema_version
+                if (result := progress.collect()) is not None:
+                    return result
+        if progress.main_class is not None:
+            tp = "cannot find a JAR with Airflow-Supervisor-Schema-Version 
metadata in {1}"
+        elif main_class:
+            tp = "cannot find a JAR with Main-Class matching {0!r} in {1}"
+        else:
+            tp = "cannot find a JAR with Main-Class metadata in {1}"
+        raise FileNotFoundError(tp.format(main_class, 
os.pathsep.join(os.fspath(p.resolve()) for p in roots)))
+
+
+def _accept_connections(
+    servers: dict[str, socket.socket],
+    drains: dict[str, socket.socket],
+    proc: subprocess.Popen,
+    *,
+    max_wait: float = 10.0,
+    drain_size: int = 4096,
+) -> tuple[dict[socket.socket, socket.socket], dict[socket.socket, bytes]]:
+    """Block until the Java process connects to servers."""
+    accepted: dict[socket.socket, socket.socket] = {}
+    drained: dict[socket.socket, bytes] = {s: b"" for s in drains.values()}
+    with selectors.DefaultSelector() as sel:
+        for key, soc in itertools.chain(servers.items(), drains.items()):
+            sel.register(soc, selectors.EVENT_READ, data=key)
+        deadline = time.monotonic() + max_wait
+        while len(accepted) < len(servers):
+            remaining = deadline - time.monotonic()
+            if remaining <= 0:
+                for s in accepted.values():
+                    s.close()
+                raise TimeoutError("process did not connect within timeout")
+            if proc.poll() is not None:
+                for s in accepted.values():
+                    s.close()
+                raise RuntimeError(f"process exited with {proc.returncode} 
before connecting")
+            for event, _ in sel.select(timeout=min(remaining, 1.0)):
+                soc = cast("socket.socket", event.fileobj)
+                if soc in drained:
+                    log.debug("Draining child process stream", key=event.data)
+                    drained[soc] += soc.recv(drain_size)
+                else:
+                    log.debug("Accepting child process connection", 
key=event.data)
+                    conn, _ = soc.accept()
+                    sel.unregister(soc)
+                    accepted[soc] = conn
+    return accepted, drained
+
+
+@attrs.define
+class _ResourceTracker:
+    tracked: dict[int, socket.socket | subprocess.Popen] = 
attrs.field(init=False, factory=dict)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc_info):
+        for o in self.tracked.values():
+            match o:
+                case socket.socket():
+                    o.close()
+                case subprocess.Popen():
+                    o.terminate()
+
+    def track(self, *objects: Tracked) -> tuple[Tracked, ...]:
+        self.tracked.update((hash(o), o) for o in objects)
+        return objects
+
+    def untrack(self, *objects: Tracked) -> tuple[Tracked, ...]:
+        for o in objects:
+            self.tracked.pop(hash(o), None)
+        return objects

Review Comment:
   `hash(o)` works today only because `socket.socket` and `subprocess.Popen` 
fall back to id-based hashing. Use `id(o)` directly -- it's what you actually 
want here and immune to any future `__eq__` / `__hash__` overrides on either 
class.



##########
task-sdk/src/airflow/sdk/coordinators/java/coordinator.py:
##########
@@ -0,0 +1,369 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Java runtime coordinator that launches a JVM subprocess for Dag file 
processing and task execution."""
+
+from __future__ import annotations
+
+import email
+import itertools
+import os
+import pathlib
+import selectors
+import socket
+import subprocess
+import time
+import zipfile
+from typing import TYPE_CHECKING, TypeVar, cast
+
+import attrs
+import structlog
+
+from airflow.sdk.execution_time.coordinator import BaseCoordinator
+from airflow.sdk.execution_time.schema import get_schema_version_migrator
+from airflow.sdk.execution_time.supervisor import ActivitySubprocess
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+
+    from structlog.typing import FilteringBoundLogger
+    from typing_extensions import Self
+
+    from airflow.sdk.api.client import Client
+    from airflow.sdk.api.datamodels._generated import BundleInfo
+    from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO
+
+    Tracked = TypeVar("Tracked", socket.socket, subprocess.Popen)
+
+log: FilteringBoundLogger = 
structlog.get_logger(logger_name="coordinators.java")
+
+
+def _start_server() -> socket.socket:
+    server = socket.socket()
+    server.bind(("127.0.0.1", 0))
+    server.setblocking(True)
+    server.listen(1)  # Just need to listen to the child process.
+    return server
+
+
+def _calculate_classpath(jars_root: Sequence[pathlib.Path]) -> str:
+    jars = (p.as_posix() for root in jars_root for p in root.iterdir() if 
p.suffix == ".jar")
+    return os.pathsep.join(jars)
+
+
[email protected]
+class _JarMetadata:
+    main_class: str
+    schema_version: str
+
+    @classmethod
+    def from_jar(cls, path: pathlib.Path) -> Self | None:
+        try:
+            with zipfile.ZipFile(path) as zf:
+                try:
+                    manifest_info = zf.getinfo("META-INF/MANIFEST.MF")
+                except KeyError:
+                    log.debug("JAR does not contain META-INF/MANIFEST.MF; 
ignored", path=path)
+                    return None
+                with zf.open(manifest_info) as f:
+                    manifest = email.message_from_binary_file(f)
+            return cls(manifest["Main-Class"], 
manifest["Airflow-Supervisor-Schema-Version"])
+        except zipfile.BadZipFile:
+            log.exception("Cannot read JAR; ignored", path=path)
+            return None
+
+
+def _validate_schema_version(instance, _, value) -> str:
+    return get_schema_version_migrator().resolve_version(str(value))
+
+
[email protected]
+class _JarInfo:
+    main_class: str
+    schema_version: str = attrs.field(validator=_validate_schema_version)
+
+    @attrs.define
+    class _Progress:
+        main_class: str | None = attrs.field(init=False, default=None)
+        schema_version: str | None = attrs.field(init=False, default=None)
+
+        def collect(self) -> _JarInfo | None:
+            if self.main_class is None or self.schema_version is None:
+                return None
+            return _JarInfo(self.main_class, self.schema_version)
+
+    @classmethod
+    def find(cls, roots: Sequence[pathlib.Path], main_class: str) -> _JarInfo:
+        progress = cls._Progress()
+        for root in roots:
+            log.debug("Finding required JAR metadata in directory", dir=root)
+            for p in root.iterdir():
+                if p.suffix != ".jar":
+                    continue
+                if (metadata := _JarMetadata.from_jar(p)) is None:
+                    continue
+                if metadata.main_class and ((main_class == 
metadata.main_class) or not main_class):
+                    log.debug("JAR located with Main-Class metadata", path=p, 
main_class=metadata.main_class)
+                    progress.main_class = metadata.main_class
+                if metadata.schema_version:
+                    log.debug(
+                        "JAR located with Airflow-Supervisor-Schema-Version 
metadata",
+                        path=p,
+                        schema_version=metadata.schema_version,
+                    )
+                    progress.schema_version = metadata.schema_version
+                if (result := progress.collect()) is not None:
+                    return result
+        if progress.main_class is not None:
+            tp = "cannot find a JAR with Airflow-Supervisor-Schema-Version 
metadata in {1}"
+        elif main_class:
+            tp = "cannot find a JAR with Main-Class matching {0!r} in {1}"
+        else:
+            tp = "cannot find a JAR with Main-Class metadata in {1}"
+        raise FileNotFoundError(tp.format(main_class, 
os.pathsep.join(os.fspath(p.resolve()) for p in roots)))
+
+
+def _accept_connections(
+    servers: dict[str, socket.socket],
+    drains: dict[str, socket.socket],
+    proc: subprocess.Popen,
+    *,
+    max_wait: float = 10.0,
+    drain_size: int = 4096,
+) -> tuple[dict[socket.socket, socket.socket], dict[socket.socket, bytes]]:
+    """Block until the Java process connects to servers."""
+    accepted: dict[socket.socket, socket.socket] = {}
+    drained: dict[socket.socket, bytes] = {s: b"" for s in drains.values()}
+    with selectors.DefaultSelector() as sel:
+        for key, soc in itertools.chain(servers.items(), drains.items()):
+            sel.register(soc, selectors.EVENT_READ, data=key)
+        deadline = time.monotonic() + max_wait
+        while len(accepted) < len(servers):
+            remaining = deadline - time.monotonic()
+            if remaining <= 0:
+                for s in accepted.values():
+                    s.close()
+                raise TimeoutError("process did not connect within timeout")
+            if proc.poll() is not None:
+                for s in accepted.values():
+                    s.close()
+                raise RuntimeError(f"process exited with {proc.returncode} 
before connecting")
+            for event, _ in sel.select(timeout=min(remaining, 1.0)):
+                soc = cast("socket.socket", event.fileobj)
+                if soc in drained:
+                    log.debug("Draining child process stream", key=event.data)
+                    drained[soc] += soc.recv(drain_size)
+                else:
+                    log.debug("Accepting child process connection", 
key=event.data)
+                    conn, _ = soc.accept()
+                    sel.unregister(soc)
+                    accepted[soc] = conn
+    return accepted, drained
+
+
[email protected]
+class _ResourceTracker:
+    tracked: dict[int, socket.socket | subprocess.Popen] = 
attrs.field(init=False, factory=dict)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc_info):
+        for o in self.tracked.values():
+            match o:
+                case socket.socket():
+                    o.close()
+                case subprocess.Popen():
+                    o.terminate()
+
+    def track(self, *objects: Tracked) -> tuple[Tracked, ...]:
+        self.tracked.update((hash(o), o) for o in objects)
+        return objects
+
+    def untrack(self, *objects: Tracked) -> tuple[Tracked, ...]:
+        for o in objects:
+            self.tracked.pop(hash(o), None)
+        return objects
+
+
[email protected](kw_only=True)
+class _JavaActivitySubprocess(ActivitySubprocess):
+    """Java task runner process."""
+
+    _comm_server: socket.socket
+    _logs_server: socket.socket
+
+    @classmethod
+    def start(  # type: ignore[override]
+        cls,
+        *,
+        what: TaskInstanceDTO,
+        dag_rel_path: str | os.PathLike[str],
+        bundle_info,
+        logger: FilteringBoundLogger | None = None,
+        sentry_integration: str = "",
+        java_executable: str,
+        jvm_args: list[str],
+        jars_root: Sequence[pathlib.Path],
+        main_class: str,
+        **kwargs,
+    ) -> Self:
+        jar = _JarInfo.find(jars_root, main_class)
+        with _ResourceTracker() as tracker:
+            stdout_r, stdout_w = tracker.track(*socket.socketpair())
+            stderr_r, stderr_w = tracker.track(*socket.socketpair())
+            comm_server, logs_server = tracker.track(_start_server(), 
_start_server())
+
+            proc = subprocess.Popen(
+                [
+                    java_executable,
+                    "-classpath",
+                    _calculate_classpath(jars_root),
+                    *jvm_args,
+                    jar.main_class,
+                    # Arguments to MainClass...
+                    "--comm={0[0]}:{0[1]}".format(comm_server.getsockname()),
+                    "--logs={0[0]}:{0[1]}".format(logs_server.getsockname()),
+                ],
+                stdout=stdout_w.fileno(),
+                stderr=stderr_w.fileno(),
+            )
+            tracker.track(proc)
+            for soc in tracker.untrack(stdout_w, stderr_w):
+                soc.close()
+            log.info("Starting subprocess", pid=proc.pid)
+
+            socks, drained = _accept_connections(
+                {"comm": comm_server, "logs": logs_server},
+                {"stdout": stdout_r, "stderr": stderr_r},
+                proc,
+            )
+            tracker.track(*socks.values())
+
+            self = cls(
+                id=what.id,
+                pid=proc.pid,
+                process=proc,
+                process_log=logger or 
structlog.get_logger(logger_name="task").bind(),
+                start_time=time.monotonic(),
+                stdin=socks[comm_server],
+                subprocess_schema_version=jar.schema_version,
+                comm_server=comm_server,
+                logs_server=logs_server,
+                **kwargs,
+            )
+            self._register_pipe_readers(
+                *tracker.untrack(stdout_r, stderr_r, socks[comm_server], 
socks[logs_server]),
+                data=drained,
+            )
+            self._on_child_started(
+                ti=what,
+                dag_rel_path=dag_rel_path,
+                bundle_info=bundle_info,
+                sentry_integration=sentry_integration,
+            )
+
+            # Untrack everything left. 'self' keeps track of these and close 
the
+            # servers when the subprocess exits in 'wait'.
+            tracker.untrack(comm_server, logs_server, proc)
+
+        return self
+
+    def wait(self) -> int:
+        code = super().wait()
+        self._close_unused_sockets(self._comm_server, self._logs_server)
+        return code
+
+
+def _convert_jars_root(
+    value: None | os.PathLike[str] | pathlib.Path | list[os.PathLike[str] | 
pathlib.Path],
+) -> list[pathlib.Path]:
+    if value is None:
+        return []
+    if isinstance(value, (str, os.PathLike, pathlib.Path)):
+        return [pathlib.Path(value).expanduser()]
+    return [pathlib.Path(v).expanduser() for v in value]
+
+
[email protected](kw_only=True)
+class JavaCoordinator(BaseCoordinator):
+    """
+    Coordinator that launches a JVM subprocess for DAG parsing and task 
execution.
+
+    Configuration is taken from the ``[sdk] coordinators`` entry that 
constructs
+    this instance::
+
+        {
+            "name": "jdk-17",
+            "classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
+            "kwargs": {
+                "java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java",
+                "jvm_args": ["-Xmx1024m"],
+                "jars_root": ["~/airflow/jars"],
+            },
+        }
+
+    :param java_executable: Path to the ``java`` command (defaults to
+        ``"java"``, which relies on ``$PATH``).
+    :param jvm_args: Extra arguments passed to the JVM (e.g. ``["-Xmx512m"]``).
+    :param jars_root: A list of directories scanned for JAR bundles.
+    :param main_class: Explicit entry point to execute with *java_executable*.
+
+    If *main_class* is not explicitly set, JavaCoordinator scans *jars_root* to
+    find an executable JAR (one with Main-Class set in its metadata). If more
+    than one executable JAR is found, it may be nondeterministic which one ends
+    up being executed.
+
+    A JAR containing metadata *Airflow-Supervisor-Schema-Version* should also 
be
+    available to specify the wire schema version. The JAR containing the Java
+    SDK automatically sets this, so you don't generally need to do anything if
+    dependency JARs are deployed as-is. If you repackage the dependencies,
+    however, you must also reproduce the metadata entry in one of the JARs.
+    """
+
+    java_executable: str = "java"
+    jvm_args: list[str] = attrs.field(factory=list)
+    jars_root: list[pathlib.Path] = attrs.field(converter=_convert_jars_root, 
factory=list)
+    main_class: str = ""

Review Comment:
   `jars_root=None` (or omitted) results in `[]` and the misconfig only 
surfaces when the first task launches, as a generic "cannot find a JAR with 
Main-Class metadata in " error. Please add an attrs validator that requires 
`jars_root` to be a non-empty list of existing directories so the failure 
happens at coordinator construction with a clear message.



##########
task-sdk/src/airflow/sdk/execution_time/coordinator.py:
##########
@@ -0,0 +1,244 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Runtime coordinator for non-Python DAG file processing and task execution.
+
+Provides :class:`BaseCoordinator`, the base class for
+SDK-specific coordinators that bridge subprocess I/O between the
+Airflow supervisor and an external-SDK runtime (Java, Go, Rust, etc.),
+and :class:`CoordinatorManager`, the registry that loads coordinator
+instances from the ``[sdk] coordinators`` configuration.
+
+The coordinator's :meth:`~BaseCoordinator.run_task_execution` handles the full
+lifecycle:
+
+1. Creates TCP servers for comm and logs channels, and a socketpair for stderr.
+2. Calls :meth:`~BaseCoordinator.task_execution_cmd` (provided by the subclass)
+   to obtain the subprocess command.
+3. Spawns the subprocess and accepts TCP connections from it.
+4. Runs a selector-based bridge that transparently forwards bytes
+   between fd 0 (supervisor) and the subprocess comm socket, and
+   re-emits the subprocess's log and stderr output through structlog.
+"""
+
+from __future__ import annotations
+
+import contextlib
+import functools
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pydantic
+import structlog
+
+from airflow.sdk._shared.module_loading import import_string
+from airflow.sdk.configuration import conf
+
+if TYPE_CHECKING:
+    from collections.abc import Mapping
+    from os import PathLike
+
+    from structlog.typing import FilteringBoundLogger
+    from typing_extensions import Self
+
+    from airflow.sdk.api.client import Client
+    from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO
+
+__all__ = [
+    "BaseCoordinator",
+    "CoordinatorManager",
+    "get_coordinator_manager",
+    "reset_coordinator_manager",
+]
+
+log = structlog.get_logger(__name__)
+
+
+class BaseCoordinator:
+    """
+    Base coordinator for runtime-specific DAG file processing and task 
execution.
+
+    Coordinators are instantiated from the ``[sdk] coordinators`` configuration
+    (see :class:`CoordinatorManager`) — each entry's ``classpath`` is resolved
+    via :func:`~airflow.sdk._shared.module_loading.import_string` and
+    constructed with the entry's ``kwargs``.
+    """
+
+    @attrs.define(slots=True)
+    class ExecutionResult:
+        """Return value for :meth:`BaseCoordinator.execute_task`."""
+
+        exit_code: Any
+        final_state: str
+
+    def execute_task(
+        self,
+        *,
+        what: TaskInstanceDTO,
+        dag_rel_path: str | PathLike[str],
+        bundle_info,
+        client: Client,
+        logger: FilteringBoundLogger | None = None,
+        sentry_integration: str = "",
+        subprocess_logs_to_stdout: bool,
+        **kwargs,
+    ) -> ExecutionResult:
+        """
+        Start task execution.
+
+        This should execute the task and return a result.
+        """
+        raise NotImplementedError

Review Comment:
   Two small things on this signature: `bundle_info` is missing its type 
annotation (looks like a `BundleInfo` import got dropped), and the trailing 
`**kwargs` silently absorbs typos from subclass authors. Recommend annotating 
`bundle_info` and removing `**kwargs` until there's a concrete forward-compat 
need -- adding new keyword params later is non-breaking.



##########
task-sdk/src/airflow/sdk/execution_time/coordinator.py:
##########
@@ -0,0 +1,244 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Runtime coordinator for non-Python DAG file processing and task execution.
+
+Provides :class:`BaseCoordinator`, the base class for
+SDK-specific coordinators that bridge subprocess I/O between the
+Airflow supervisor and an external-SDK runtime (Java, Go, Rust, etc.),
+and :class:`CoordinatorManager`, the registry that loads coordinator
+instances from the ``[sdk] coordinators`` configuration.
+
+The coordinator's :meth:`~BaseCoordinator.run_task_execution` handles the full
+lifecycle:
+
+1. Creates TCP servers for comm and logs channels, and a socketpair for stderr.
+2. Calls :meth:`~BaseCoordinator.task_execution_cmd` (provided by the subclass)
+   to obtain the subprocess command.
+3. Spawns the subprocess and accepts TCP connections from it.
+4. Runs a selector-based bridge that transparently forwards bytes
+   between fd 0 (supervisor) and the subprocess comm socket, and
+   re-emits the subprocess's log and stderr output through structlog.
+"""
+
+from __future__ import annotations
+
+import contextlib
+import functools
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pydantic
+import structlog
+
+from airflow.sdk._shared.module_loading import import_string
+from airflow.sdk.configuration import conf
+
+if TYPE_CHECKING:
+    from collections.abc import Mapping
+    from os import PathLike
+
+    from structlog.typing import FilteringBoundLogger
+    from typing_extensions import Self
+
+    from airflow.sdk.api.client import Client
+    from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO
+
+__all__ = [
+    "BaseCoordinator",
+    "CoordinatorManager",
+    "get_coordinator_manager",
+    "reset_coordinator_manager",
+]
+
+log = structlog.get_logger(__name__)
+
+
+class BaseCoordinator:
+    """
+    Base coordinator for runtime-specific DAG file processing and task 
execution.
+
+    Coordinators are instantiated from the ``[sdk] coordinators`` configuration
+    (see :class:`CoordinatorManager`) — each entry's ``classpath`` is resolved
+    via :func:`~airflow.sdk._shared.module_loading.import_string` and
+    constructed with the entry's ``kwargs``.
+    """
+
+    @attrs.define(slots=True)
+    class ExecutionResult:
+        """Return value for :meth:`BaseCoordinator.execute_task`."""
+
+        exit_code: Any
+        final_state: str
+
+    def execute_task(
+        self,
+        *,
+        what: TaskInstanceDTO,
+        dag_rel_path: str | PathLike[str],
+        bundle_info,
+        client: Client,
+        logger: FilteringBoundLogger | None = None,
+        sentry_integration: str = "",
+        subprocess_logs_to_stdout: bool,
+        **kwargs,
+    ) -> ExecutionResult:
+        """
+        Start task execution.
+
+        This should execute the task and return a result.
+        """
+        raise NotImplementedError
+
+
+class _CoordinatorSpec(pydantic.BaseModel):
+    classpath: str
+    kwargs: dict[str, Any] = pydantic.Field(default_factory=dict)
+
+
+class _PythonCoordinator(BaseCoordinator):
+    """
+    Coordinator implementation to execute Python tasks.
+
+    This is not supposed to be specified by users directly, but the fallback
+    used by default when nothing is specified.
+    """
+
+    def execute_task(
+        self,
+        *,
+        what: TaskInstanceDTO,
+        dag_rel_path: str | PathLike[str],
+        bundle_info,
+        client: Client,
+        logger: FilteringBoundLogger | None = None,
+        sentry_integration: str = "",
+        subprocess_logs_to_stdout: bool,
+        **kwargs,
+    ) -> BaseCoordinator.ExecutionResult:
+        # TODO: Move this to somewhere that makes more sense.
+        from airflow.sdk.execution_time.supervisor import ActivitySubprocess

Review Comment:
   Replace the `TODO` with a concrete reason: this is inline because 
`supervisor.py` imports `get_coordinator_manager` from this module at L133, so 
importing `ActivitySubprocess` at the top of this file would cycle. Documenting 
the *why* keeps a future cleanup pass from innocently hoisting the import to 
module level.



##########
task-sdk/src/airflow/sdk/execution_time/coordinator.py:
##########
@@ -0,0 +1,244 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Runtime coordinator for non-Python DAG file processing and task execution.
+
+Provides :class:`BaseCoordinator`, the base class for
+SDK-specific coordinators that bridge subprocess I/O between the
+Airflow supervisor and an external-SDK runtime (Java, Go, Rust, etc.),
+and :class:`CoordinatorManager`, the registry that loads coordinator
+instances from the ``[sdk] coordinators`` configuration.
+
+The coordinator's :meth:`~BaseCoordinator.run_task_execution` handles the full
+lifecycle:
+
+1. Creates TCP servers for comm and logs channels, and a socketpair for stderr.
+2. Calls :meth:`~BaseCoordinator.task_execution_cmd` (provided by the subclass)
+   to obtain the subprocess command.
+3. Spawns the subprocess and accepts TCP connections from it.
+4. Runs a selector-based bridge that transparently forwards bytes
+   between fd 0 (supervisor) and the subprocess comm socket, and
+   re-emits the subprocess's log and stderr output through structlog.
+"""
+
+from __future__ import annotations
+
+import contextlib
+import functools
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pydantic
+import structlog
+
+from airflow.sdk._shared.module_loading import import_string
+from airflow.sdk.configuration import conf
+
+if TYPE_CHECKING:
+    from collections.abc import Mapping
+    from os import PathLike
+
+    from structlog.typing import FilteringBoundLogger
+    from typing_extensions import Self
+
+    from airflow.sdk.api.client import Client
+    from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO
+
+__all__ = [
+    "BaseCoordinator",
+    "CoordinatorManager",
+    "get_coordinator_manager",
+    "reset_coordinator_manager",
+]
+
+log = structlog.get_logger(__name__)
+
+
+class BaseCoordinator:
+    """
+    Base coordinator for runtime-specific DAG file processing and task 
execution.
+
+    Coordinators are instantiated from the ``[sdk] coordinators`` configuration
+    (see :class:`CoordinatorManager`) — each entry's ``classpath`` is resolved
+    via :func:`~airflow.sdk._shared.module_loading.import_string` and
+    constructed with the entry's ``kwargs``.
+    """
+
+    @attrs.define(slots=True)
+    class ExecutionResult:
+        """Return value for :meth:`BaseCoordinator.execute_task`."""
+
+        exit_code: Any
+        final_state: str
+
+    def execute_task(
+        self,
+        *,
+        what: TaskInstanceDTO,
+        dag_rel_path: str | PathLike[str],
+        bundle_info,
+        client: Client,
+        logger: FilteringBoundLogger | None = None,
+        sentry_integration: str = "",
+        subprocess_logs_to_stdout: bool,
+        **kwargs,
+    ) -> ExecutionResult:
+        """
+        Start task execution.
+
+        This should execute the task and return a result.
+        """
+        raise NotImplementedError
+
+
+class _CoordinatorSpec(pydantic.BaseModel):
+    classpath: str
+    kwargs: dict[str, Any] = pydantic.Field(default_factory=dict)
+
+
+class _PythonCoordinator(BaseCoordinator):
+    """
+    Coordinator implementation to execute Python tasks.
+
+    This is not supposed to be specified by users directly, but the fallback
+    used by default when nothing is specified.
+    """
+
+    def execute_task(
+        self,
+        *,
+        what: TaskInstanceDTO,
+        dag_rel_path: str | PathLike[str],
+        bundle_info,
+        client: Client,
+        logger: FilteringBoundLogger | None = None,
+        sentry_integration: str = "",
+        subprocess_logs_to_stdout: bool,
+        **kwargs,
+    ) -> BaseCoordinator.ExecutionResult:
+        # TODO: Move this to somewhere that makes more sense.
+        from airflow.sdk.execution_time.supervisor import ActivitySubprocess
+
+        process = ActivitySubprocess.start(
+            dag_rel_path=dag_rel_path,
+            what=what,
+            client=client,
+            logger=logger,
+            bundle_info=bundle_info,
+            subprocess_logs_to_stdout=subprocess_logs_to_stdout,
+            sentry_integration=sentry_integration,
+        )
+        exit_code = process.wait()
+        return self.ExecutionResult(exit_code, process.final_state)
+
+
[email protected]
+def _build_python_coordinator() -> _PythonCoordinator:
+    return _PythonCoordinator()
+
+
+class InvalidCoordinatorError(ValueError):
+    """Raised for an invalid coordinator configuration."""
+
+
[email protected](kw_only=True)
+class CoordinatorManager:
+    """
+    Registry of coordinator instances loaded from ``[sdk]`` configurations.
+
+    The ``[sdk] coordinators`` value is a JSON object keyed by coordinator 
name::
+
+        {
+            "jdk-11": {
+                "classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
+                "kwargs": {"java_executable": "/usr/lib/jvm/jdk-11/bin/java", 
...},
+            }
+        }
+
+    The ``classpath`` is resolved via
+    :func:`~airflow.sdk._shared.module_loading.import_string` (no
+    :class:`ProvidersManager` involvement) and constructed with ``kwargs`` on
+    first use. A coordinator entry that is never looked up incurs no startup
+    cost. At most one coordinator object can be created from each entry.
+
+    The ``[sdk] queue_to_coordinator`` config maps queue names to a key in the
+    object, which lets users reuse existing queue assignments to route tasks to
+    a specific coordinator instance (for example, a ``"legacy-java"`` queue
+    routed to a JDK 11 coordinator, and a ``"modern-java"`` queue routed to a
+    JDK 17 coordinator).
+
+    :meta private:
+    """
+
+    _coordinator_specs: Mapping[str, _CoordinatorSpec]
+    _queue_to_coordinator: Mapping[str, str]
+
+    _created_coordinators: dict[str, BaseCoordinator] = 
attrs.field(init=False, factory=dict)
+
+    @classmethod
+    def from_config(cls) -> Self:
+        """Load coordinator specs from configuration without initialization."""
+        coordinator_specs = {
+            k: _CoordinatorSpec.model_validate(v)
+            for k, v in conf.getjson("sdk", "coordinators", 
fallback={}).items()
+        }
+        queue_to_coordinator = conf.getjson("sdk", "queue_to_coordinator", 
fallback={})
+        for key in queue_to_coordinator.values():
+            if key not in coordinator_specs:
+                raise ValueError(f"[sdk] queue_to_coordinator references 
invalid coordinator key: {key!r}")
+        return cls(coordinator_specs=coordinator_specs, 
queue_to_coordinator=queue_to_coordinator)
+
+    def _find_queue(self, key: str) -> BaseCoordinator:
+        with contextlib.suppress(KeyError):
+            return self._created_coordinators[key]
+        spec = self._coordinator_specs[key]
+        coordinator = self._created_coordinators[key] = 
import_string(spec.classpath)(**spec.kwargs)
+        return coordinator
+
+    def for_queue(self, queue: str) -> BaseCoordinator:
+        """
+        Find the coordinator for *queue*.
+
+        If an entry is not registered, a Python coordinator is returned.
+        """
+        try:
+            key = self._queue_to_coordinator[queue]
+        except KeyError:
+            log.debug("Queue not configured to a coordinator; defaulting to 
Python", queue=queue)
+            return _build_python_coordinator()
+        try:
+            coordinator = self._find_queue(key)
+        except KeyError:
+            raise InvalidCoordinatorError(f"Queue {queue!r} configured to 
nonexistent coordinator")
+        except ImportError:
+            raise InvalidCoordinatorError(f"Cannot import coordinator {key!r}")
+        except TypeError:
+            raise InvalidCoordinatorError(f"Cannot instantiate coordinator 
{key!r}")
+        log.debug("Coordinator found for queue", coordinator=coordinator, 
queue=queue)
+        return coordinator
+
+
[email protected]
+def get_coordinator_manager() -> CoordinatorManager:
+    """Return the process-wide :class:`CoordinatorManager`, loaded from config 
on first use."""
+    return CoordinatorManager.from_config()
+
+
+def reset_coordinator_manager() -> None:
+    """Clear the cached :class:`CoordinatorManager` (test helper)."""
+    get_coordinator_manager.cache_clear()

Review Comment:
   `for_queue` raises `InvalidCoordinatorError` on the first call when the 
classpath is bad, but `_find_queue` doesn't cache the failure (the 
`_created_coordinators[key]` assignment is never reached), so every subsequent 
task on that queue re-runs `import_string` and re-raises. Cache a sentinel (or 
the exception) in `_created_coordinators[key]` so misconfiguration is paid for 
once, not per-task.



##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -2004,6 +2004,43 @@ workers:
       type: integer
       example: ~
       default: "60"
+sdk:
+  description: Settings for non-Python SDK runtime coordination
+  options:
+    coordinators:
+      description: |
+        JSON object mapping of coordinator keys to coordinator definitions.
+
+        Each value is an object with ``classpath`` and optional ``kwargs``.
+        ``classpath`` is resolved via ``import_string`` and constructed with
+        ``kwargs`` on first use.  Entries are
+        independent instances, so the same ``classpath`` can be configured
+        multiple times under different names with different ``kwargs`` (for
+        example, two ``JavaCoordinator`` instances pinned to different JDK
+        versions).
+      version_added: 3.3.0
+      type: string
+      example: |
+        {
+          "jdk-17": {
+            "classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
+            "kwargs": {"java_executable": 
"/usr/lib/jvm/java-17-openjdk/bin/java", "jvm_args": ["-Xmx1024m"]}
+          }
+        }
+      default: ~
+    queue_to_coordinator:
+      description: |
+        JSON mapping of queue names to a coordinator key from
+        ``[sdk] coordinators``.
+
+        When a task's ``language`` field is not set, this mapping is checked
+        to route the task to a configured coordinator instance based on its
+        queue.  This is useful when queues are used as environment or
+        isolation identifiers (e.g. ``legacy-java``, ``modern-java``).
+      version_added: 3.3.0
+      type: string
+      example: '{"legacy-java": "jdk-11", "modern-java": "jdk-17"}'
+      default: ~

Review Comment:
   The description for `queue_to_coordinator` says "When a task's `language` 
field is not set, this mapping is checked", but there's no `language` field on 
`TaskInstanceDTO` at HEAD. Either land the `language`-routing path in this PR, 
or drop the qualifier -- otherwise the doc references a feature the user can't 
actually use yet.



##########
task-sdk/src/airflow/sdk/execution_time/comms.py:
##########
@@ -98,6 +97,9 @@
     XComSequenceSliceResponse,
 )
 from airflow.sdk.exceptions import ErrorType
+from airflow.sdk.execution_time.workloads.task import (
+    TaskInstanceDTO,  # noqa: TC001 -- Pydantic needs this at runtime
+)

Review Comment:
   Tiny doc nit: the `noqa: TC001` here works, but it's worth a one-line "kept 
at runtime because we don't `model_rebuild()` `StartupDetails`" so a future 
TC001 cleanup pass doesn't innocently move it back under `TYPE_CHECKING`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to