This is an automated email from the ASF dual-hosted git repository.
jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0cb0cb98a9f Add ExecutableCoordinator for native self-contained Dag bundles (#67161)
0cb0cb98a9f is described below
commit 0cb0cb98a9f7b692aa9348dbf9f0d3c411d61505
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Wed Jun 3 13:43:17 2026 +0800
Add ExecutableCoordinator for native self-contained Dag bundles (#67161)
* Add executable coordinator to task-sdk
Introduces the executable coordinator under
task-sdk/src/airflow/sdk/coordinators/executable along with its tests
and the bundle-spec docs page.
* Make ExecutableCoordinator align with JavaCoordinator
* Make ExecutableCoordinator inherit SocketCoordinator
* Rework ExecutableCoordinator
* Address review comments on ExecutableCoordinator
- Use an attrs converter (not validator) for _Bundle.schema_version so the
canonical version returned by resolve_version is preserved rather than
silently discarded.
- Surface rejection reasons in _Bundle.find: when a bundle matches the
dag_id but is rejected by schema-version resolution, include the path and
reason in the FileNotFoundError so operators see "found but unusable"
instead of "missing".
- Log at debug when bundle metadata fails to decode (UnicodeDecodeError /
yaml.YAMLError) or is not a mapping, matching the trailer-error path.
- Fix bundle-spec wording: schema_version validation is lazy at task-
execution time, not at coordinator start.
* Bring executable-bundle scanner doc in line with implementation
The Deployment Layout section described the scan as flat over regular
files, but the implementation walks subdirectories and requires the
executable bit. Spell out both so producers and operators know what
gets considered.
* Read each bundle through a single open in ExecutableCoordinator
The scanner used to open every candidate file three times — once to read
the trailer, again to hash the binary region on a cache miss, and a
third time to read the metadata region — and each open did its own
stat. That tripled the syscall count on the discovery hot path and
left a TOCTOU window where the trailer-parse stat and the hash stat
could refer to different files if the bundle was atomically replaced
in between.
Open the bundle once per discovery, stat the fd (not the path), and
share that handle across trailer parse, hash (on miss), and metadata
read so all three reference the same inode. The lru_cache wrapper is
replaced with an explicit OrderedDict LRU so the cached-digest lookup
can sit between the trailer read and the hash without forcing a
second open on miss.
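
A minimal sketch of the single-open read path described above, not the code from this commit. The names pack_bundle and read_bundle_once are hypothetical, the packer exists only to produce a test file, and the sketch hashes the binary region in one read instead of in chunks and leaves out the digest cache.

```python
import hashlib
import os
import struct
import tempfile

FOOTER_MAGIC = b"AFBNDL01"
FOOTER_SIZE = 64
# source_len, metadata_len, footer_ver, binary_sha256, reserved, magic (little-endian).
TRAILER = struct.Struct("<III32s12s8s")


def pack_bundle(path, binary, source, metadata):
    """Hypothetical packer: write the binary, then append the rest in one write."""
    trailer = TRAILER.pack(
        len(source), len(metadata), 1,
        hashlib.sha256(binary).digest(), b"\x00" * 12, FOOTER_MAGIC,
    )
    with open(path, "wb") as f:
        f.write(binary)
        f.write(source + metadata + trailer)


def read_bundle_once(path):
    """Return the metadata bytes, or None if the file is not a valid bundle."""
    with open(path, "rb") as f:
        st = os.fstat(f.fileno())  # stat the fd, not the path
        if st.st_size < FOOTER_SIZE:
            return None
        f.seek(st.st_size - FOOTER_SIZE)
        fields = TRAILER.unpack(f.read(FOOTER_SIZE))
        source_len, metadata_len, footer_ver, digest, reserved, magic = fields
        if magic != FOOTER_MAGIC or footer_ver != 1 or reserved != b"\x00" * 12:
            return None
        metadata_start = st.st_size - FOOTER_SIZE - metadata_len
        source_start = metadata_start - source_len
        if source_start <= 0:  # the binary region must be non-empty
            return None
        f.seek(0)  # same handle: hash the binary region
        if hashlib.sha256(f.read(source_start)).digest() != digest:
            return None
        f.seek(metadata_start)  # same handle: read the manifest
        return f.read(metadata_len)
```

Because every step goes through the same file object, the size, the trailer, the hash and the manifest all describe one inode even if the path is replaced while the function runs.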
* Encapsulate the bundle digest cache as a class
The bare OrderedDict plus a couple of free functions spread the cache
invariants (LRU bookkeeping, size bound) across multiple call sites.
Wrap them in a small _BinaryDigestCache class with a single
process-wide instance so the bound and the eviction policy live in
one place, and add unit tests covering the contracts the surrounding
code relies on (lookup miss, round-trip, update-in-place, eviction
order, LRU promotion on both get and put, clear).
No lock: the supervisor process model is one task per process and
_build_execute_task_command is only called from the main thread, so
the cache cannot be touched concurrently today. The class docstring
records the assumption so anyone introducing multi-threaded task
execution knows to add one.
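
The contracts listed above (eviction order, promotion on both get and put) can be illustrated with a small stand-in. DigestLRU is a hypothetical name used only for this sketch; like the class in the commit it takes no lock and assumes single-threaded use.

```python
from collections import OrderedDict


class DigestLRU:
    """Hypothetical stand-in for a bounded LRU digest cache."""

    def __init__(self, maxsize):
        self._maxsize = maxsize
        self._entries = OrderedDict()

    def get(self, key):
        digest = self._entries.get(key)
        if digest is not None:
            self._entries.move_to_end(key)  # promote on read
        return digest

    def put(self, key, digest):
        self._entries[key] = digest
        self._entries.move_to_end(key)  # promote on write
        while len(self._entries) > self._maxsize:
            self._entries.popitem(last=False)  # evict the least recently used


cache = DigestLRU(maxsize=2)
cache.put("a", b"1")
cache.put("b", b"2")
cache.get("a")        # "a" becomes the most recently used entry
cache.put("c", b"3")  # over the bound: "b" is evicted, not "a"
```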
* CI: Fix spelling
* Guard ExecutableCoordinator scan against symlink cycles
Track visited directories by (st_dev, st_ino) while walking
executables_root so a symlink loop or hardlinked directory cannot
recurse until the interpreter stack is exhausted. Also drop the
test-only _clear_digest_cache helper from the production module and
have the test call _digest_cache.clear() directly.
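
A minimal sketch of the cycle guard, assuming a POSIX filesystem. walk_executables is a hypothetical name and works on plain string paths rather than pathlib objects.

```python
import os
import stat
import tempfile


def walk_executables(root, seen=None):
    """Yield executable regular files under root, visiting each directory once."""
    seen = set() if seen is None else seen
    try:
        st = os.stat(root)  # follows symlinks, so a loop resolves to a known inode
    except OSError:
        return
    if stat.S_ISDIR(st.st_mode):
        key = (st.st_dev, st.st_ino)
        if key in seen:
            return  # already visited: symlink loop or repeated root
        seen.add(key)
        try:
            children = sorted(os.listdir(root))
        except OSError:
            return
        for name in children:
            yield from walk_executables(os.path.join(root, name), seen)
    elif stat.S_ISREG(st.st_mode) and os.access(root, os.X_OK):
        yield root
```

The set is keyed on the stat result rather than on the path string because a symlink loop produces an unbounded number of distinct paths that all resolve to the same directory.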
* fixup: avoid wordlist additions
Switch serialisation/Unrecognised/honour to serialization/Unrecognized/honor
in the bundle spec doc and drop the British spellings this PR had added to
docs/spelling_wordlist.txt, since the American forms need no wordlist entry.
* Fix SubprocessCoordinator reference
* CI: Fix coordinator unit tests
---
docs/spelling_wordlist.txt | 3 +
task-sdk/docs/airflow-metadata.schema.json | 70 +++
task-sdk/docs/executable-bundle-spec.rst | 304 ++++++++++++
task-sdk/docs/index.rst | 1 +
.../sdk/coordinators/executable/__init__.py | 25 +
.../sdk/coordinators/executable/coordinator.py | 395 ++++++++++++++++
.../task_sdk/coordinators/executable/__init__.py | 16 +
.../coordinators/executable/test_coordinator.py | 507 +++++++++++++++++++++
8 files changed, 1321 insertions(+)
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 5d5b82c27bb..0e9f7fe190a 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -502,6 +502,7 @@ doesn
doesnt
dom
Dont
+dotfiles
DownloadReportV
downscaling
downstreams
@@ -554,6 +555,7 @@ enableAutoScale
enablement
encodable
encryptor
+endian
enqueue
enqueued
Entra
@@ -1067,6 +1069,7 @@ msfabric
msg
msgraph
mssql
+mtime
mTLS
multi-cloud
multimodal
diff --git a/task-sdk/docs/airflow-metadata.schema.json b/task-sdk/docs/airflow-metadata.schema.json
new file mode 100644
index 00000000000..56e11109e26
--- /dev/null
+++ b/task-sdk/docs/airflow-metadata.schema.json
@@ -0,0 +1,70 @@
+{
+  "$schema": "https://json-schema.org/draft/2020-12/schema",
+  "$id": "https://airflow.apache.org/schemas/sdk-executable/airflow-metadata-1.0.schema.json",
+  "title": "Airflow Executable SDK Bundle Metadata",
+  "description": "Build-time manifest declaring DAG and task identifiers exposed by an Airflow native-executable SDK bundle. See the Executable Bundle Spec documentation in the Airflow Task SDK.",
+  "type": "object",
+  "required": ["airflow_bundle_metadata_version", "sdk", "source", "dags"],
+  "additionalProperties": true,
+  "properties": {
+    "airflow_bundle_metadata_version": {
+      "type": "string",
+      "description": "Bundle-spec version this manifest conforms to (currently '1.0').",
+      "pattern": "^[0-9]+\\.[0-9]+(\\.[0-9]+)?$"
+    },
+    "sdk": {
+      "type": "object",
+      "description": "Identifies the SDK that produced the bundle.",
+      "required": ["language", "version", "supervisor_schema_version"],
+      "additionalProperties": true,
+      "properties": {
+        "language": {
+          "type": "string",
+          "description": "Lower-case source-language identifier (e.g. 'go', 'rust', 'cpp', 'zig').",
+          "pattern": "^[a-z][a-z0-9_+.\\-]*$"
+        },
+        "version": {
+          "type": "string",
+          "description": "SDK version used at build time.",
+          "minLength": 1
+        },
+        "supervisor_schema_version": {
+          "type": "string",
+          "description": "Dated supervisor wire-schema version the bundle was compiled against (e.g. '2026-06-16'). The coordinator passes this value to the supervisor so it can downgrade outbound messages / upgrade inbound messages to a shape the bundle understands.",
+          "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
+        }
+      }
+    },
+    "source": {
+      "type": "string",
+      "description": "Original filename of the primary DAG source file (e.g. 'example.go'). The file's bytes are embedded in the bundle's source region; this field is a display name used by the Airflow UI.",
+      "minLength": 1
+    },
+    "dags": {
+      "type": "object",
+      "description": "Mapping of dag_id to DAG entry. Every dag_id the bundle exposes must appear here.",
+      "minProperties": 1,
+      "additionalProperties": {
+        "$ref": "#/$defs/dagEntry"
+      }
+    }
+  },
+  "$defs": {
+    "dagEntry": {
+      "type": "object",
+      "description": "Static description of a single DAG declared in the bundle.",
+      "required": ["tasks"],
+      "additionalProperties": true,
+      "properties": {
+        "tasks": {
+          "type": "array",
+          "description": "Static list of task_ids declared in the DAG.",
+          "items": {
+            "type": "string",
+            "minLength": 1
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/task-sdk/docs/executable-bundle-spec.rst b/task-sdk/docs/executable-bundle-spec.rst
new file mode 100644
index 00000000000..28f54069454
--- /dev/null
+++ b/task-sdk/docs/executable-bundle-spec.rst
@@ -0,0 +1,304 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Executable Bundle Spec
+======================
+
+This document specifies the on-disk format of a build artifact produced by an
+Airflow native-executable SDK (Go, Rust, C++, Zig, ...) and consumed by
+:class:`~airflow.sdk.coordinators.executable.ExecutableCoordinator`
+at deployment time.
+
+The goal is a single, language-agnostic *bundle* shape so that scheduler,
+worker, and UI behave identically regardless of which compiled SDK produced
+the DAG.
+
+Bundle-spec version: ``1.0``.
+
+Container
+---------
+
+A bundle is **the compiled executable itself, with a fixed-format footer
+appended after the binary's normal end-of-file**. The executable remains
+directly runnable; the footer is data that follows the last byte the OS
+loader cares about and is invisible to ``exec()``. There is no enclosing
+archive.
+
+A bundle file therefore has three regions, in order from offset 0:
+
+1. The native executable (ELF / Mach-O / PE), including any code-signing
+ structures the platform appends.
+2. The primary DAG source file, embedded verbatim (UTF-8). MAY have length 0.
+3. The build-time manifest (``airflow-metadata.yaml`` content, UTF-8).
+
+The file ends with a fixed 64-byte trailer that locates regions (2) and (3),
+carries an integrity hash of the binary region, and identifies the file as a
+bundle. See :ref:`bundle-trailer-layout`.
+
+Filenames follow OS conventions for executables: no extension on Linux/macOS,
+``.exe`` on Windows. The scanner identifies bundles by the trailer's magic,
+not by the filename.
+
+.. _bundle-trailer-layout:
+
+Trailer Layout
+--------------
+
+The last 64 bytes of a conforming bundle are the trailer. All multi-byte
+integers are little-endian.
+
+::
+
+    bytes  0..3    source_len      uint32     length of the source region in bytes
+    bytes  4..7    metadata_len    uint32     length of the metadata region in bytes
+    bytes  8..11   footer_ver      uint32     currently 1
+    bytes 12..43   binary_sha256   32 bytes   SHA-256 of the binary region
+    bytes 44..55   reserved        12 bytes   MUST be zero
+    bytes 56..63   magic           8 bytes    ASCII "AFBNDL01"
+
+The magic is the byte sequence ``0x41 0x46 0x42 0x4E 0x44 0x4C 0x30 0x31``
+(``"AFBNDL01"``). The trailing ``01`` is the footer-format version repeated
+in ASCII so a human can identify a bundle at a glance
+(``tail -c 8 ./mybundle | xxd``); the binary ``footer_ver`` field is the
+authoritative source of truth for parsing.
+
+``binary_sha256`` is the SHA-256 digest computed over the **binary region
+only** — bytes ``[0, source_start)``. The hash field sits inside the trailer
+and therefore cannot cover the bytes it occupies; it provides *integrity*
+(the binary region has not been truncated, corrupted, or naively edited
+between packing and exec) rather than *authenticity*
+(see :ref:`bundle-code-signing` for how authenticity layers on top).
+
+Reader algorithm:
+
+1. Open the file. Seek to ``EOF - 64``. Read 64 bytes.
+2. Compare bytes ``56..63`` against ``"AFBNDL01"``. If different, the file
+ is not a bundle; the scanner MUST ignore it.
+3. Parse ``footer_ver``. If unknown, fail with a versioning error.
+4. Compute ``metadata_start = filesize - 64 - metadata_len`` and
+ ``source_start = metadata_start - source_len``.
+5. Validate ``source_start >= 0`` and that the implied binary region
+ (``[0, source_start)``) is non-empty.
+6. Compute SHA-256 over the binary region ``[0, source_start)`` and compare
+   to ``binary_sha256``. Mismatch is a hard failure: the file is skipped like
+   a non-bundle, but the scanner logs the mismatch. The result
+   MAY be cached by ``(path, inode, mtime, size)`` so the runtime does not
+   re-hash on every exec; a cache miss (file replaced, mtime bumped)
+   triggers re-verification.
+7. Read ``metadata_len`` bytes from ``metadata_start`` for the manifest.
+8. Read ``source_len`` bytes from ``source_start`` for the source view.
+ If ``source_len == 0``, no source is embedded; the UI displays
+ "(source not available)".
+
+Source comes *before* metadata so a future ``footer_ver`` MAY introduce
+additional trailing blobs (e.g. signed checksums, compressed deps) by
+extending the trailer rather than inserting between existing blobs.
+
+.. _bundle-metadata-schema:
+
+``airflow-metadata.yaml`` schema
+--------------------------------
+
+The metadata region carries a YAML manifest (``airflow-metadata.yaml``)
+produced at build time from a static scan of the DAG source. A
+machine-readable JSON Schema is published at
+:download:`airflow-metadata.schema.json` for use by build tooling, validators,
+and editors.
+
+.. code-block:: yaml
+
+    airflow_bundle_metadata_version: "1.0"
+    sdk:
+      language: go
+      version: "0.1.0"
+      supervisor_schema_version: "2026-06-16"
+    source: example.go
+    dags:
+      example_dag:
+        tasks:
+          - extract
+          - transform
+          - load
+      another_dag:
+        tasks:
+          - run
+
+Top-level keys:
+
+``airflow_bundle_metadata_version`` (string, required)
+    The bundle-spec version this manifest conforms to. Currently ``"1.0"``.
+
+``sdk`` (mapping, required)
+    Identifies the SDK that produced the bundle.
+
+    - ``language`` (string, required): lower-case source-language identifier
+      (e.g. ``go``, ``rust``, ``cpp``, ``zig``).
+    - ``version`` (string, required): SDK version used at build time.
+    - ``supervisor_schema_version`` (string, required): dated AIP-72
+      supervisor wire-schema version the bundle was compiled against, in
+      ``YYYY-MM-DD`` format (e.g. ``"2026-06-16"``). The coordinator passes
+      this value to the supervisor so it can downgrade outbound messages /
+      upgrade inbound messages to a shape the bundle understands. The value
+      MUST resolve against the supervisor's schema bundle; the coordinator
+      validates it lazily when matching a bundle to a task at
+      task-execution time, and an unknown version causes that bundle to be
+      skipped.
+
+``source`` (string, required)
+    Original filename of the primary DAG source file (e.g. ``example.go``).
+    The file's bytes live in the source region of the bundle, not at this
+    path; this field is a display name the Airflow UI uses to label the
+    source-view panel and pick a syntax-highlighting mode from the
+    extension.
+
+``dags`` (mapping, required)
+    Mapping of ``dag_id`` to a *DAG entry*. Every ``dag_id`` the bundle
+    exposes MUST appear here. The scanner uses these keys to match a DAG
+    parsing or task-execution request to the bundle that owns it.
+
+DAG entry fields:
+
+``tasks`` (list of strings, required)
+    Static list of ``task_id``\ s declared in the DAG. Empty lists are
+    permitted but discouraged.
+
+Unrecognized top-level or DAG-entry keys MUST be ignored by the consumer so
+that future SDK versions can extend the manifest without breaking older
+runtimes.
+
+Examples
+--------
+
+Go bundle::
+
+    example
+    ├── ELF/Mach-O/PE executable
+    ├── source region: contents of example.go
+    ├── metadata region: airflow-metadata.yaml (source: example.go)
+    └── trailer (64 B): lengths + binary_sha256 + AFBNDL01 magic
+
+Rust bundle::
+
+    pipeline
+    ├── ELF/Mach-O/PE executable
+    ├── source region: contents of main.rs
+    ├── metadata region: airflow-metadata.yaml (source: main.rs)
+    └── trailer (64 B): lengths + binary_sha256 + AFBNDL01 magic
+
+The bundle is one file. ``./example`` runs the binary; the appended data
+is invisible to ``exec()``.
+
+Build Pipeline Ordering
+-----------------------
+
+The footer is appended after the executable is otherwise complete. Producers
+that perform additional post-build steps MUST observe the following order:
+
+- **Strip** debug symbols *before* appending the footer. Strip
+  implementations operate on the binary's defined end and either leave
+  trailing data intact or truncate it; do not rely on either behavior.
+- **Compute binary_sha256** over the on-disk bytes *as they stand
+ immediately before the append*. At that moment the whole file is the
+ binary region; nothing has been written past its OS-defined end yet, so
+ the digest matches what the reader will recompute over
+ ``[0, source_start)`` after the append.
+- **Append** ``<source><metadata><trailer>`` in a single write so a
+ partially written file fails the magic or hash check rather than
+ appearing as a half-valid bundle.
+
+.. _bundle-code-signing:
+
+Code Signing
+~~~~~~~~~~~~
+
+The bundle format itself does not require OS-level code signing.
+``binary_sha256`` provides integrity against truncation, in-flight
+corruption, and naive tampering, and Airflow's threat model treats
+``executables_root`` as Deployment-Manager-controlled — *authenticity*
+(signed by a trusted identity) is a deployment-time concern rather than a
+bundle-format one.
+
+**Compressors** such as UPX are NOT supported. They rewrite the file
+end-to-end, destroying both the trailer and the hash invariant.
+
+Determinism: the trailer is byte-identical for byte-identical inputs, so a
+deterministic build plus a canonical (sorted-key) manifest serialization
+yields a byte-identical bundle file (and therefore a stable
+``binary_sha256``).
+
+Deployment Layout
+-----------------
+
+Bundle files are placed **as-is** in any of the directories configured as the
+``executables_root`` kwarg on the
+:class:`~airflow.sdk.coordinators.executable.ExecutableCoordinator` entry
+under ``[sdk] coordinators``. The scanner walks each directory **recursively**
+and considers only regular files whose **executable bit is set** for the
+invoking user; files without the executable bit are skipped without reading
+their trailer. For each candidate it reads the last 64 bytes and treats files
+whose magic matches ``"AFBNDL01"`` as bundles. Matched files are then
+SHA-256-verified per the reader algorithm; a mismatch demotes the file back
+to "ignored, with an error log." Files without the magic are silently
+ignored, so non-bundle files (READMEs, dotfiles) MAY share the directory
+without interfering with the scan.
+
+::
+
+    /opt/airflow/executable-bundles/
+    ├── example
+    ├── team-a/
+    │   └── pipeline
+    └── analytics
+
+At task-execution time the runtime execs the bundle file directly with the
+coordinator arguments (``--comm=<addr>`` / ``--logs=<addr>``). No extraction,
+no transient cache directory, no chmod-after-extract step is required: the
+file is already a runnable executable with the appropriate permission bits
+preserved by the build pipeline. The integrity check runs at scan/discovery
+time and is cached by ``(path, inode, mtime, size)``, so the exec hot path
+does not re-hash.
+
+The compiled executable MUST honor the SDK coordinator protocol —
+``--comm=<host:port>`` / ``--logs=<host:port>`` socket-based IPC.
+
+See :class:`~airflow.sdk.coordinators.executable.ExecutableCoordinator`
+for the consumer-side coordinator.
+
+Inspection
+----------
+
+Because the bundle is a single executable rather than an archive,
+inspecting the embedded source and manifest requires a small CLI rather
+than an off-the-shelf ``unzip``. The Go SDK's ``airflow-go-pack`` tool
+provides an ``inspect`` subcommand that dumps both regions; equivalent
+helpers are expected from each language's packer.
+
+Compatibility and Versioning
+----------------------------
+
+- The current bundle-spec format version is ``1.0``
+ (``airflow_bundle_metadata_version``); the current trailer format version is
+ ``1`` (``footer_ver = 1``).
+- Backward-incompatible bundle-spec changes increment the major component
+  of ``airflow_bundle_metadata_version`` and are gated behind an explicit opt-in
+  on the consumer side.
+- New optional manifest fields MAY be added in minor versions and MUST be
+ ignored by older consumers.
+- New trailer-format versions append fields after ``binary_sha256``
+ (consuming the reserved region) or extend the trailer with additional
+ trailing blobs ahead of the magic. Older readers MUST reject unknown
+ ``footer_ver`` rather than guessing.
diff --git a/task-sdk/docs/index.rst b/task-sdk/docs/index.rst
index d1b26544c88..7e3467c14ff 100644
--- a/task-sdk/docs/index.rst
+++ b/task-sdk/docs/index.rst
@@ -175,3 +175,4 @@ For the full public API reference, see the :doc:`api` page.
deferred-vs-async-operators
api
concepts
+ executable-bundle-spec
diff --git a/task-sdk/src/airflow/sdk/coordinators/executable/__init__.py b/task-sdk/src/airflow/sdk/coordinators/executable/__init__.py
new file mode 100644
index 00000000000..f3bdcc9bfd2
--- /dev/null
+++ b/task-sdk/src/airflow/sdk/coordinators/executable/__init__.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Native executable runtime coordinator for the Apache Airflow Task SDK."""
+
+from __future__ import annotations
+
+from airflow.sdk.coordinators.executable.coordinator import ExecutableCoordinator
+
+__all__ = ["ExecutableCoordinator", "__version__"]
+
+__version__ = "0.1.0"
diff --git a/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py b/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py
new file mode 100644
index 00000000000..7b5290be021
--- /dev/null
+++ b/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py
@@ -0,0 +1,395 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Native executable coordinator that launches a binary subprocess for task execution."""
+
+from __future__ import annotations
+
+import hashlib
+import os
+import pathlib
+import stat
+import struct
+from collections import OrderedDict
+from typing import TYPE_CHECKING, Any, BinaryIO
+
+import attrs
+import structlog
+import yaml
+
+from airflow.sdk.coordinators._subprocess import SubprocessCoordinator
+from airflow.sdk.execution_time.schema import get_schema_version_migrator
+
+if TYPE_CHECKING:
+    from collections.abc import Iterable, Iterator, Sequence
+
+    from structlog.typing import FilteringBoundLogger
+    from typing_extensions import Self
+
+    from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO
+
+log: FilteringBoundLogger = structlog.get_logger(logger_name="coordinators.executable")
+
+
+FOOTER_MAGIC = b"AFBNDL01"
+FOOTER_SIZE = 64
+FOOTER_VERSION = 1
+_HASH_READ_CHUNK = 1 << 20
+# Upper bound on the verification cache.
+_VERIFY_CACHE_MAXSIZE = 256
+
+
[email protected]
+class _Footer:
+    """
+    Parsed bundle trailer plus the byte offsets it implies.
+
+    All region offsets (``source_start``, ``metadata_start``) and the file
+    size at parse time are computed once in :meth:`read` so downstream
+    consumers do not re-derive them.
+    """
+
+    path: pathlib.Path
+    file_size: int
+    source_len: int
+    metadata_len: int
+    footer_ver: int
+    binary_sha256: bytes
+    source_start: int
+    metadata_start: int
+
+    @classmethod
+    def read(cls, f: BinaryIO, path: pathlib.Path, file_size: int) -> Self | None:
+        """
+        Parse the trailer from an already-open binary handle on *path*.
+
+        *file_size* MUST come from a stat of the same fd so the trailer
+        offsets refer to the file currently held open (not whatever the
+        path resolves to at some later moment).
+
+        Returns ``None`` only when the file is provably not a bundle
+        (smaller than the trailer, or the magic does not match).
+        """
+        if file_size < FOOTER_SIZE:
+            return None
+
+        f.seek(file_size - FOOTER_SIZE)
+        trailer = f.read(FOOTER_SIZE)
+
+        if len(trailer) != FOOTER_SIZE or trailer[56:64] != FOOTER_MAGIC:
+            return None
+
+        source_len, metadata_len, footer_ver = struct.unpack_from("<III", trailer, 0)
+        if footer_ver != FOOTER_VERSION:
+            raise ValueError(
+                f"Unsupported bundle footer_ver={footer_ver} in {path}; "
+                f"this runtime supports footer_ver={FOOTER_VERSION}."
+            )
+
+        binary_sha256 = bytes(trailer[12:44])
+        reserved = trailer[44:56]
+        if reserved != b"\x00" * 12:
+            raise ValueError(f"Bundle trailer in {path} has non-zero reserved bytes.")
+
+        metadata_start = file_size - FOOTER_SIZE - metadata_len
+        source_start = metadata_start - source_len
+        if source_start < 0:
+            raise ValueError(f"Bundle trailer in {path} declares regions that extend past the start of file.")
+        # Per the spec, the binary region [0, source_start) MUST be non-empty.
+        if source_start == 0:
+            raise ValueError(f"Bundle trailer in {path} leaves no room for the executable region.")
+
+        return cls(
+            path=path,
+            file_size=file_size,
+            source_len=source_len,
+            metadata_len=metadata_len,
+            footer_ver=footer_ver,
+            binary_sha256=binary_sha256,
+            source_start=source_start,
+            metadata_start=metadata_start,
+        )
+
+
+def _hash_open_file(f: BinaryIO, length: int, path: pathlib.Path) -> bytes:
+    """Compute SHA-256 over the first *length* bytes of *f* (seeks to 0 first)."""
+    f.seek(0)
+    digest = hashlib.sha256()
+    remaining = length
+    while remaining > 0:
+        chunk = f.read(min(_HASH_READ_CHUNK, remaining))
+        if not chunk:
+            raise ValueError(
+                f"Bundle {path} truncated while hashing binary region "
+                f"(expected {length} bytes, got {length - remaining})."
+            )
+        digest.update(chunk)
+        remaining -= len(chunk)
+    return digest.digest()
+
+
+_DigestKey = tuple[str, int, int, int, int]
+
+
+class _BinaryDigestCache:
+    """
+    Bounded LRU cache of bundle binary-region digests.
+
+    Entries are keyed by ``(path, source_start, st_ino, st_mtime_ns,
+    st_size)``; a hit means the file at *path* still has the same
+    identity as when we last hashed it. The bound prevents a
+    long-running supervisor that sees many bundle redeploys from
+    retaining every historical ``(ino, mtime_ns)`` tuple forever.
+    """
+
+    def __init__(self, maxsize: int) -> None:
+        self._maxsize = maxsize
+        self._entries: OrderedDict[_DigestKey, bytes] = OrderedDict()
+
+    def get(self, key: _DigestKey) -> bytes | None:
+        digest = self._entries.get(key)
+        if digest is not None:
+            self._entries.move_to_end(key)
+        return digest
+
+    def put(self, key: _DigestKey, digest: bytes) -> None:
+        self._entries[key] = digest
+        self._entries.move_to_end(key)
+        while len(self._entries) > self._maxsize:
+            self._entries.popitem(last=False)
+
+    def clear(self) -> None:
+        self._entries.clear()
+
+
+# Single process-wide instance. A cache miss (file replaced, mtime
+# bumped, inode swapped under us) yields a different key and forces
+# re-verification.
+_digest_cache = _BinaryDigestCache(maxsize=_VERIFY_CACHE_MAXSIZE)
+
+
+def _read_bundle_metadata(path: pathlib.Path) -> dict[str, Any] | None:
+    # One open per bundle: trailer-parse, hash (on cache miss), and
+    # metadata-read all share the same fd, and the stat that keys the
+    # digest cache comes from that fd too. This both cuts the syscall
+    # cost of the hot path and removes the trailer-vs-hash TOCTOU window
+    # where a path could be swapped between separate opens.
+    try:
+        f = open(path, "rb")
+    except OSError as exc:
+        log.debug("Cannot open bundle file; skipping", path=str(path), error=str(exc))
+        return None
+
+    with f:
+        try:
+            st = os.fstat(f.fileno())
+        except OSError as exc:
+            log.debug("Cannot stat bundle file; skipping", path=str(path), error=str(exc))
+            return None
+
+        try:
+            footer = _Footer.read(f, path, st.st_size)
+        except (OSError, ValueError) as exc:
+            log.debug("Invalid bundle trailer; skipping", path=str(path), error=str(exc))
+            return None
+        if footer is None:
+            return None
+
+        cache_key: _DigestKey = (str(path), footer.source_start, st.st_ino, st.st_mtime_ns, st.st_size)
+        actual_digest = _digest_cache.get(cache_key)
+        if actual_digest is None:
+            try:
+                actual_digest = _hash_open_file(f, footer.source_start, path)
+            except (OSError, ValueError) as exc:
+                log.debug("Failed to hash bundle binary region", path=str(path), error=str(exc))
+                return None
+            _digest_cache.put(cache_key, actual_digest)
+
+        if actual_digest != footer.binary_sha256:
+            log.debug(
+                "Bundle binary_sha256 mismatch; skipping",
+                path=str(path),
+                expected=footer.binary_sha256.hex(),
+                actual=actual_digest.hex(),
+            )
+            return None
+
+        try:
+            f.seek(footer.metadata_start)
+            metadata_bytes = f.read(footer.metadata_len)
+        except OSError as exc:
+            log.debug("Cannot read bundle metadata; skipping", path=str(path), error=str(exc))
+            return None
+
+    try:
+        data = yaml.safe_load(metadata_bytes.decode("utf-8"))
+    except (UnicodeDecodeError, yaml.YAMLError) as exc:
+        log.debug("Cannot decode bundle metadata; skipping", path=str(path), error=str(exc))
+        return None
+
+    if not isinstance(data, dict):
+        log.debug(
+            "Bundle metadata is not a mapping; skipping",
+            path=str(path),
+            type=type(data).__name__,
+        )
+        return None
+
+    return data
+
+
+def _dag_ids(metadata: dict[str, Any]) -> set[str]:
+    dags = metadata.get("dags")
+    if not isinstance(dags, dict):
+        return set()
+
+    return set(dags.keys())
+
+
+def _supervisor_schema_version(metadata: dict[str, Any]) -> str | None:
+    sdk = metadata.get("sdk")
+    if not isinstance(sdk, dict):
+        return None
+
+    value = sdk.get("supervisor_schema_version")
+    if not isinstance(value, str) or not value:
+        return None
+
+    return value
+
+
+def _find_executables(items: Iterable[pathlib.Path]) -> Iterator[pathlib.Path]:
+    """
+    Yield executable regular files under *items*, descending into directories.
+
+    A symlink loop or a directory that hardlinks into one of its ancestors
+    would otherwise recurse until the interpreter stack is exhausted, so
+    directories are deduplicated by ``(st_dev, st_ino)`` for the duration
+    of a single scan.
+    """
+    seen_dirs: set[tuple[int, int]] = set()
+    yield from _walk_executables(items, seen_dirs)
+
+
+def _walk_executables(
+    items: Iterable[pathlib.Path], seen_dirs: set[tuple[int, int]]
+) -> Iterator[pathlib.Path]:
+    for item in items:
+        try:
+            st = item.stat()
+        except OSError:
+            continue
+        if stat.S_ISDIR(st.st_mode):
+            key = (st.st_dev, st.st_ino)
+            if key in seen_dirs:
+                log.debug("Skipping already-visited directory", path=str(item))
+                continue
+            seen_dirs.add(key)
+            try:
+                children = list(item.iterdir())
+            except OSError:
+                continue
+            yield from _walk_executables(children, seen_dirs)
+        elif stat.S_ISREG(st.st_mode) and os.access(item, os.X_OK):
+            yield item
+
+
+def _convert_schema_version(value: Any) -> str:
+ return get_schema_version_migrator().resolve_version(str(value))
+
+
[email protected]
+class _Bundle:
+ path: pathlib.Path
+ schema_version: str = attrs.field(converter=_convert_schema_version)
+
+ @classmethod
+ def find(cls, executables_root: Sequence[pathlib.Path], dag_id: str) -> Self:
+ log.debug("Finding executable bundles recursively", roots=executables_root)
+ rejected: list[tuple[pathlib.Path, str]] = []
+ for p in _find_executables(executables_root):
+ if (metadata := _read_bundle_metadata(p)) is None:
+ continue
+ if dag_id not in _dag_ids(metadata):
+ continue
+
+ try:
+ if (schema_version := _supervisor_schema_version(metadata)) is None:
+ reason = "missing or invalid sdk.supervisor_schema_version"
+ log.debug("Bundle metadata rejected; skipping", path=str(p), error=reason)
+ rejected.append((p.resolve(), reason))
+ continue
+ return cls(path=p.resolve(), schema_version=schema_version)
+ except (TypeError, ValueError) as exc:
+ log.debug("Bundle metadata rejected; skipping", path=str(p), error=str(exc))
+ rejected.append((p.resolve(), str(exc)))
+ continue
+
+ resolved_paths = os.pathsep.join(str(r.resolve()) for r in executables_root)
+ if rejected:
+ details = "; ".join(f"{path}: {reason}" for path, reason in rejected)
+ tp = (
+ "cannot find executable bundle with usable supervisor_schema_version "
+ "for dag_id={0!r} in {1}: matching bundles were rejected ({2})"
+ )
+ else:
+ tp = "cannot find executable bundle containing dag_id={0!r} in {1}"
+ details = ""
+ raise FileNotFoundError(tp.format(dag_id, resolved_paths, details))
+
+
+def _convert_executables_root(
+ value: None | os.PathLike[str] | pathlib.Path | list[os.PathLike[str] | pathlib.Path],
+) -> list[pathlib.Path]:
+ if value is None:
+ return []
+ if isinstance(value, (str, os.PathLike, pathlib.Path)):
+ return [pathlib.Path(value).expanduser()]
+ return [pathlib.Path(v).expanduser() for v in value]
+
+
[email protected](kw_only=True)
+class ExecutableCoordinator(SubprocessCoordinator):
+ """
+ Coordinator that launches a native executable subprocess for task execution.
+
+ Configuration is taken from the ``[sdk] coordinators`` entry that constructs
+ this instance::
+
+ {
+ "name": "go",
+ "classpath": "airflow.sdk.coordinators.executable.ExecutableCoordinator",
+ "kwargs": {
+ "executables_root": ["~/airflow/executable-bundles"],
+ },
+ }
+
+ :param executables_root: A list of directories scanned for executable
+ bundles when a Python stub DAG delegates task execution to a native
+ runtime.
+ :param task_startup_timeout: Maximum time the coordinator waits for a task
+ process to start, in seconds. The default is 10 seconds.
+ """
+
+ executables_root: list[pathlib.Path] = attrs.field(
+ converter=_convert_executables_root,
+ validator=attrs.validators.min_len(1),
+ )
+
+ def _build_execute_task_command(self, *, what: TaskInstanceDTO) -> tuple[list[str], str | None]:
+ bundle = _Bundle.find(self.executables_root, what.dag_id)
+ return [str(bundle.path)], bundle.schema_version
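
The loop-safe scan described in the `_find_executables` docstring can also be written without recursion. The following is a minimal sketch, not the committed implementation: `find_executables` is a hypothetical standalone name, and it assumes a filesystem where `(st_dev, st_ino)` uniquely identifies a directory.

```python
import os
import pathlib
import stat
from collections.abc import Iterable, Iterator


def find_executables(roots: Iterable[pathlib.Path]) -> Iterator[pathlib.Path]:
    """Yield executable regular files under roots, visiting each directory once."""
    # Hypothetical helper for illustration; mirrors the dedup-by-inode idea only.
    seen_dirs: set[tuple[int, int]] = set()
    pending = list(roots)
    while pending:
        item = pending.pop()
        try:
            # stat() follows symlinks, so a link to a directory reports
            # that directory's device and inode numbers.
            st = item.stat()
        except OSError:
            continue
        if stat.S_ISDIR(st.st_mode):
            key = (st.st_dev, st.st_ino)
            if key in seen_dirs:
                continue  # already walked, e.g. reached again through a symlink loop
            seen_dirs.add(key)
            try:
                pending.extend(item.iterdir())
            except OSError:
                continue
        elif stat.S_ISREG(st.st_mode) and os.access(item, os.X_OK):
            yield item
```

An explicit stack avoids deep recursion on very nested trees, at the cost of a less predictable visiting order than the recursive version.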
diff --git a/task-sdk/tests/task_sdk/coordinators/executable/__init__.py b/task-sdk/tests/task_sdk/coordinators/executable/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/task-sdk/tests/task_sdk/coordinators/executable/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
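
The test helper `_build_bundle` in the test file that follows assembles a bundle as binary, source, metadata, then a fixed-size trailer packed as `<III` plus a 32-byte SHA-256, 12 reserved bytes and a magic marker. The sketch below shows how a reader might locate the metadata from such a trailer. It is an illustration under assumptions: `MAGIC` is a made-up 8-byte value (the real `FOOTER_MAGIC` and `FOOTER_SIZE` are not shown in this section), and `build` and `read_metadata` are hypothetical names.

```python
from __future__ import annotations

import hashlib
import struct

MAGIC = b"EXAMPLE!"  # assumption: placeholder marker, not the real FOOTER_MAGIC
HEADER = struct.Struct("<III")  # source_len, metadata_len, footer_version
TRAILER_SIZE = HEADER.size + 32 + 12 + len(MAGIC)


def build(binary: bytes, source: bytes, metadata: bytes, version: int = 1) -> bytes:
    """Append source, metadata and a trailer to the binary payload."""
    digest = hashlib.sha256(binary).digest()
    trailer = HEADER.pack(len(source), len(metadata), version) + digest + b"\x00" * 12 + MAGIC
    return binary + source + metadata + trailer


def read_metadata(blob: bytes) -> bytes | None:
    """Return the metadata bytes, or None if the trailer or digest does not check out."""
    if len(blob) < TRAILER_SIZE or not blob.endswith(MAGIC):
        return None
    trailer = blob[-TRAILER_SIZE:]
    source_len, metadata_len, _version = HEADER.unpack_from(trailer)
    digest = trailer[HEADER.size : HEADER.size + 32]
    metadata_start = len(blob) - TRAILER_SIZE - metadata_len
    binary_end = metadata_start - source_len
    if binary_end < 0:
        return None  # lengths in the trailer do not fit in the file
    if hashlib.sha256(blob[:binary_end]).digest() != digest:
        return None  # binary region was modified after the bundle was built
    return blob[metadata_start : metadata_start + metadata_len]
```

Working backwards from the end of the file means the reader never has to parse the native binary itself.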
diff --git a/task-sdk/tests/task_sdk/coordinators/executable/test_coordinator.py b/task-sdk/tests/task_sdk/coordinators/executable/test_coordinator.py
new file mode 100644
index 00000000000..23b45e4f144
--- /dev/null
+++ b/task-sdk/tests/task_sdk/coordinators/executable/test_coordinator.py
@@ -0,0 +1,507 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import hashlib
+import pathlib
+import socket
+import stat
+import struct
+import subprocess
+from pathlib import Path
+from unittest import mock
+from unittest.mock import MagicMock, patch
+
+import pytest
+import yaml
+from uuid6 import uuid7
+
+from airflow.sdk.coordinators.executable.coordinator import (
+ FOOTER_MAGIC,
+ FOOTER_SIZE,
+ ExecutableCoordinator,
+ _BinaryDigestCache,
+ _Bundle,
+ _digest_cache,
+)
+from airflow.sdk.execution_time.coordinator import BaseCoordinator
+from airflow.sdk.execution_time.supervisor import ActivitySubprocess
+from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO
+
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_3_PLUS
+
+if not AIRFLOW_V_3_3_PLUS:
+ pytest.skip("Coordinator is only compatible with Airflow >= 3.3.0", allow_module_level=True)
+
+_DEFAULT_BINARY_PAYLOAD = b"\x7fELF" + b"binary-stub-payload"
+
+
+def _make_metadata(dag_ids, source_filename: str = "example.go") -> dict:
+ return {
+ "airflow_bundle_metadata_version": "1.0",
+ "sdk": {
+ "language": "go",
+ "version": "0.1.0",
+ "supervisor_schema_version": "2026-06-16",
+ },
+ "source": source_filename,
+ "dags": {dag_id: {"tasks": ["task1"]} for dag_id in dag_ids},
+ }
+
+
+def _build_bundle(
+ path: Path,
+ *,
+ dag_ids=("tutorial_dag",),
+ source: str | bytes = "package main\n\nfunc main() {}\n",
+ source_filename: str = "example.go",
+ metadata: dict | bytes | None = None,
+ binary_bytes: bytes = _DEFAULT_BINARY_PAYLOAD,
+ footer_ver: int = 1,
+ magic: bytes = FOOTER_MAGIC,
+ reserved: bytes = b"\x00" * 12,
+ binary_sha256: bytes | None = None,
+) -> Path:
+ if isinstance(source, str):
+ source_bytes = source.encode("utf-8")
+ else:
+ source_bytes = source
+
+ if metadata is None:
+ metadata_dict = _make_metadata(dag_ids, source_filename=source_filename)
+ metadata_bytes = yaml.safe_dump(metadata_dict, sort_keys=True).encode("utf-8")
+ elif isinstance(metadata, (bytes, bytearray)):
+ metadata_bytes = bytes(metadata)
+ else:
+ metadata_bytes = yaml.safe_dump(metadata, sort_keys=True).encode("utf-8")
+
+ if len(reserved) != 12:
+ raise ValueError("reserved must be exactly 12 bytes")
+ digest = binary_sha256 if binary_sha256 is not None else hashlib.sha256(binary_bytes).digest()
+ if len(digest) != 32:
+ raise ValueError("binary_sha256 must be exactly 32 bytes")
+ trailer = (
+ struct.pack("<III", len(source_bytes), len(metadata_bytes), footer_ver) + digest + reserved + magic
+ )
+ assert len(trailer) == FOOTER_SIZE
+
+ path.write_bytes(binary_bytes + source_bytes + metadata_bytes + trailer)
+ path.chmod(path.stat().st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
+ return path
+
+
+def _make_executable(path: Path) -> Path:
+ path.write_bytes(b"#!/bin/sh\nexit 0\n")
+ path.chmod(path.stat().st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
+ return path
+
+
+def _make_ti(dag_id: str = "tutorial_dag", queue: str = "executable") -> TaskInstanceDTO:
+ return TaskInstanceDTO(
+ id=uuid7(),
+ dag_version_id=uuid7(),
+ task_id="task_1",
+ dag_id=dag_id,
+ run_id="run_1",
+ try_number=1,
+ map_index=-1,
+ pool_slots=1,
+ queue=queue,
+ priority_weight=1,
+ )
+
+
+class TestBinaryDigestCache:
+ def test_get_returns_none_for_missing_key(self):
+ cache = _BinaryDigestCache(maxsize=4)
+ assert cache.get(("/p", 0, 0, 0, 0)) is None
+
+ def test_put_then_get_returns_stored_digest(self):
+ cache = _BinaryDigestCache(maxsize=4)
+ key = ("/p", 100, 1, 2, 3)
+ cache.put(key, b"\xaa" * 32)
+ assert cache.get(key) == b"\xaa" * 32
+
+ def test_put_updates_existing_key(self):
+ cache = _BinaryDigestCache(maxsize=4)
+ key = ("/p", 100, 1, 2, 3)
+ cache.put(key, b"\xaa" * 32)
+ cache.put(key, b"\xbb" * 32)
+ assert cache.get(key) == b"\xbb" * 32
+
+ def test_eviction_drops_oldest_when_over_maxsize(self):
+ cache = _BinaryDigestCache(maxsize=2)
+ keys = [(f"/p{i}", i, 0, 0, 0) for i in range(3)]
+ for i, k in enumerate(keys):
+ cache.put(k, bytes([i]) * 32)
+
+ # First inserted entry should have been evicted.
+ assert cache.get(keys[0]) is None
+ assert cache.get(keys[1]) == bytes([1]) * 32
+ assert cache.get(keys[2]) == bytes([2]) * 32
+
+ def test_get_promotes_entry_so_it_is_not_evicted_next(self):
+ cache = _BinaryDigestCache(maxsize=2)
+ key_a = ("/a", 0, 0, 0, 0)
+ key_b = ("/b", 0, 0, 0, 0)
+ key_c = ("/c", 0, 0, 0, 0)
+ cache.put(key_a, b"\x01" * 32)
+ cache.put(key_b, b"\x02" * 32)
+
+ # Touch A so B becomes the LRU victim.
+ assert cache.get(key_a) == b"\x01" * 32
+ cache.put(key_c, b"\x03" * 32)
+
+ assert cache.get(key_a) == b"\x01" * 32
+ assert cache.get(key_b) is None
+ assert cache.get(key_c) == b"\x03" * 32
+
+ def test_put_promotes_existing_entry(self):
+ cache = _BinaryDigestCache(maxsize=2)
+ key_a = ("/a", 0, 0, 0, 0)
+ key_b = ("/b", 0, 0, 0, 0)
+ key_c = ("/c", 0, 0, 0, 0)
+ cache.put(key_a, b"\x01" * 32)
+ cache.put(key_b, b"\x02" * 32)
+
+ # Re-putting A should refresh it so B is the next victim.
+ cache.put(key_a, b"\x01" * 32)
+ cache.put(key_c, b"\x03" * 32)
+
+ assert cache.get(key_a) == b"\x01" * 32
+ assert cache.get(key_b) is None
+ assert cache.get(key_c) == b"\x03" * 32
+
+ def test_clear_drops_all_entries(self):
+ cache = _BinaryDigestCache(maxsize=4)
+ key = ("/p", 0, 0, 0, 0)
+ cache.put(key, b"\xaa" * 32)
+ cache.clear()
+ assert cache.get(key) is None
+
+
+class TestBundleFind:
+ def test_finds_matching_dag_id(self, tmp_path):
+ binary = _build_bundle(tmp_path / "my_bundle", dag_ids=["tutorial_dag", "other_dag"])
+
+ bundle = _Bundle.find([tmp_path], "tutorial_dag")
+ assert bundle.path == binary.resolve()
+
+ def test_picks_matching_bundle_among_many(self, tmp_path):
+ _build_bundle(tmp_path / "alpha", dag_ids=["alpha_dag"])
+ beta = _build_bundle(tmp_path / "beta", dag_ids=["beta_dag"])
+ _build_bundle(tmp_path / "gamma", dag_ids=["gamma_dag"])
+
+ bundle = _Bundle.find([tmp_path], "beta_dag")
+ assert bundle.path == beta.resolve()
+
+ def test_searches_nested_subdirectories(self, tmp_path):
+ nested = tmp_path / "team-a" / "release-2026.05"
+ nested.mkdir(parents=True)
+ target = _build_bundle(nested / "pipeline", dag_ids=["nested_dag"])
+
+ bundle = _Bundle.find([tmp_path], "nested_dag")
+ assert bundle.path == target.resolve()
+
+ def test_searches_multiple_roots(self, tmp_path):
+ root_a = tmp_path / "a"
+ root_b = tmp_path / "b"
+ root_a.mkdir()
+ root_b.mkdir()
+ _build_bundle(root_a / "alpha", dag_ids=["alpha_dag"])
+ target = _build_bundle(root_b / "beta", dag_ids=["beta_dag"])
+
+ bundle = _Bundle.find([root_a, root_b], "beta_dag")
+ assert bundle.path == target.resolve()
+
+ def test_skips_non_bundle_files(self, tmp_path):
+ (tmp_path / "README.md").write_text("not a bundle")
+ _make_executable(tmp_path / "stray_executable")
+ binary = _build_bundle(tmp_path / "real_bundle", dag_ids=["tutorial_dag"])
+
+ bundle = _Bundle.find([tmp_path], "tutorial_dag")
+ assert bundle.path == binary.resolve()
+
+ def test_skips_non_executable_files(self, tmp_path):
+ non_exec = _build_bundle(tmp_path / "non_exec", dag_ids=["tutorial_dag"])
+ non_exec.chmod(non_exec.stat().st_mode & ~(stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH))
+
+ with pytest.raises(FileNotFoundError, match="cannot find executable bundle"):
+ _Bundle.find([tmp_path], "tutorial_dag")
+
+ def test_raises_when_not_found(self, tmp_path):
+ with pytest.raises(FileNotFoundError, match="cannot find executable bundle"):
+ _Bundle.find([tmp_path], "nonexistent_dag")
+
+ def test_raises_when_directory_missing(self, tmp_path):
+ with pytest.raises(FileNotFoundError, match="cannot find executable bundle"):
+ _Bundle.find([tmp_path / "does_not_exist"], "tutorial_dag")
+
+ def test_symlink_cycle_does_not_infinite_recurse(self, tmp_path):
+ nested = tmp_path / "inner"
+ nested.mkdir()
+ target = _build_bundle(nested / "pipeline", dag_ids=["loop_dag"])
+ loop = nested / "loop"
+ try:
+ loop.symlink_to(tmp_path)
+ except (OSError, NotImplementedError):
+ pytest.skip("symlinks not supported on this platform")
+
+ bundle = _Bundle.find([tmp_path], "loop_dag")
+ assert bundle.path == target.resolve()
+
+ def test_skips_bundle_with_corrupted_binary_region(self, tmp_path):
+ bundle_path = _build_bundle(tmp_path / "tampered", dag_ids=["tutorial_dag"])
+ # Flip a byte in the binary region; the embedded SHA-256 no longer matches.
+ data = bytearray(bundle_path.read_bytes())
+ data[0] ^= 0xFF
+ bundle_path.write_bytes(bytes(data))
+ bundle_path.chmod(bundle_path.stat().st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
+ _digest_cache.clear()
+
+ with patch("airflow.sdk.coordinators.executable.coordinator.log") as mock_log:
+ with pytest.raises(FileNotFoundError, match="cannot find executable bundle"):
+ _Bundle.find([tmp_path], "tutorial_dag")
+
+ mock_log.debug.assert_any_call(
+ "Bundle binary_sha256 mismatch; skipping",
+ path=str(bundle_path),
+ expected=mock.ANY,
+ actual=mock.ANY,
+ )
+
+ def test_captures_schema_version_from_metadata(self, tmp_path):
+ _build_bundle(tmp_path / "with_schema", dag_ids=["tutorial_dag"])
+
+ bundle = _Bundle.find([tmp_path], "tutorial_dag")
+ assert bundle.schema_version == "2026-06-16"
+
+ def test_skips_bundle_when_schema_version_missing(self, tmp_path):
+ metadata = _make_metadata(["tutorial_dag"])
+ del metadata["sdk"]["supervisor_schema_version"]
+ bundle_path = _build_bundle(tmp_path / "no_schema", dag_ids=["tutorial_dag"], metadata=metadata)
+
+ # A missing supervisor_schema_version is rejected before the converter runs,
+ # so find() treats the bundle as unusable.
+ with patch("airflow.sdk.coordinators.executable.coordinator.log") as mock_log:
+ with pytest.raises(FileNotFoundError, match="matching bundles were rejected"):
+ _Bundle.find([tmp_path], "tutorial_dag")
+
+ mock_log.debug.assert_any_call(
+ "Bundle metadata rejected; skipping",
+ path=str(bundle_path),
+ error=mock.ANY,
+ )
+
+ def test_skips_bundle_with_unknown_schema_version(self, tmp_path):
+ metadata = _make_metadata(["tutorial_dag"])
+ metadata["sdk"]["supervisor_schema_version"] = "1999-01-01"
+ bundle_path = _build_bundle(tmp_path / "bogus_schema", dag_ids=["tutorial_dag"], metadata=metadata)
+
+ with patch("airflow.sdk.coordinators.executable.coordinator.log") as mock_log:
+ with pytest.raises(FileNotFoundError, match="matching bundles were rejected") as exc_info:
+ _Bundle.find([tmp_path], "tutorial_dag")
+
+ mock_log.debug.assert_any_call(
+ "Bundle metadata rejected; skipping",
+ path=str(bundle_path),
+ error=mock.ANY,
+ )
+ # The raised error surfaces the rejection so a found-but-unusable bundle
+ # is not reported as missing.
+ msg = str(exc_info.value)
+ assert str(bundle_path.resolve()) in msg
+ assert "1999-01-01" in msg
+
+ def test_logs_when_metadata_yaml_is_malformed(self, tmp_path):
+ bundle_path = _build_bundle(
+ tmp_path / "bad_yaml",
+ dag_ids=["tutorial_dag"],
+ metadata=b"key: : not: valid: yaml: [",
+ )
+
+ with patch("airflow.sdk.coordinators.executable.coordinator.log") as mock_log:
+ with pytest.raises(FileNotFoundError, match="cannot find executable bundle"):
+ _Bundle.find([tmp_path], "tutorial_dag")
+
+ mock_log.debug.assert_any_call(
+ "Cannot decode bundle metadata; skipping",
+ path=str(bundle_path),
+ error=mock.ANY,
+ )
+
+ def test_logs_when_metadata_is_not_a_mapping(self, tmp_path):
+ bundle_path = _build_bundle(
+ tmp_path / "scalar_meta",
+ dag_ids=["tutorial_dag"],
+ metadata=b"just-a-scalar\n",
+ )
+
+ with patch("airflow.sdk.coordinators.executable.coordinator.log") as mock_log:
+ with pytest.raises(FileNotFoundError, match="cannot find executable bundle"):
+ _Bundle.find([tmp_path], "tutorial_dag")
+
+ mock_log.debug.assert_any_call(
+ "Bundle metadata is not a mapping; skipping",
+ path=str(bundle_path),
+ type=mock.ANY,
+ )
+
+
+class TestExecutableCoordinatorAttributes:
+ def test_executables_root_accepts_single_path(self, tmp_path):
+ coordinator = ExecutableCoordinator(executables_root=str(tmp_path))
+ assert coordinator.executables_root == [tmp_path]
+
+ def test_executables_root_accepts_list(self, tmp_path):
+ other = tmp_path / "other"
+ coordinator = ExecutableCoordinator(executables_root=[str(tmp_path), other])
+ assert coordinator.executables_root == [tmp_path, other]
+
+ def test_executables_root_required(self):
+ with pytest.raises(TypeError, match="executables_root"):
+ ExecutableCoordinator()
+
+ def test_executables_root_must_be_non_empty(self):
+ with pytest.raises(ValueError, match="executables_root"):
+ ExecutableCoordinator(executables_root=None)
+
+
+class TestBuildExecuteTaskCommand:
+ def test_returns_resolved_executable_and_schema_version(self, tmp_path):
+ binary = _build_bundle(tmp_path / "my_bundle", dag_ids=["tutorial_dag"])
+ ti = _make_ti(dag_id="tutorial_dag")
+
+ coordinator = ExecutableCoordinator(executables_root=[tmp_path])
+ command, schema_version = coordinator._build_execute_task_command(what=ti)
+ assert command == [str(binary.resolve())]
+ assert schema_version == "2026-06-16"
+
+ def test_raises_when_bundle_omits_schema_version(self, tmp_path):
+ metadata = _make_metadata(["tutorial_dag"])
+ del metadata["sdk"]["supervisor_schema_version"]
+ _build_bundle(tmp_path / "my_bundle", dag_ids=["tutorial_dag"], metadata=metadata)
+ ti = _make_ti(dag_id="tutorial_dag")
+
+ coordinator = ExecutableCoordinator(executables_root=[tmp_path])
+ with pytest.raises(FileNotFoundError, match="matching bundles were rejected"):
+ coordinator._build_execute_task_command(what=ti)
+
+ def test_raises_when_dag_id_not_found(self, tmp_path):
+ _build_bundle(tmp_path / "my_bundle", dag_ids=["other_dag"])
+ ti = _make_ti(dag_id="tutorial_dag")
+
+ coordinator = ExecutableCoordinator(executables_root=[tmp_path])
+ with pytest.raises(FileNotFoundError, match="cannot find executable bundle"):
+ coordinator._build_execute_task_command(what=ti)
+
+
[email protected]
+def bundles_dir(tmp_path):
+ _build_bundle(tmp_path / "my_bundle", dag_ids=["tutorial_dag"])
+ return tmp_path
+
+
[email protected]
+def mock_client(make_ti_context):
+ client = MagicMock()
+ client.task_instances.start.return_value = make_ti_context()
+ return client
+
+
+class TestExecutableCoordinatorExecuteTask:
+ def _captured_popen_cmd(self, bundles_dir: pathlib.Path, mock_client) -> list[str]:
+ """Run execute_task with mocked subprocess and return the command list."""
+ ti = _make_ti(dag_id="tutorial_dag")
+ coordinator = ExecutableCoordinator(executables_root=[bundles_dir])
+
+ mock_proc = MagicMock(spec=subprocess.Popen)
+ mock_proc.pid = 12345
+ comm_sock = MagicMock(spec=socket.socket)
+ logs_sock = MagicMock(spec=socket.socket)
+ popen_calls: list = []
+
+ def capture_popen(cmd, **kwargs):
+ popen_calls.append(cmd)
+ return mock_proc
+
+ with (
+ patch(
+ "airflow.sdk.coordinators._subprocess.subprocess.Popen",
+ side_effect=capture_popen,
+ ),
+ patch(
+ "airflow.sdk.coordinators._subprocess._accept_connections",
+ side_effect=lambda servers, drains, proc, **kw: (
+ {servers["comm"]: comm_sock, servers["logs"]: logs_sock},
+ {soc: b"" for soc in drains.values()},
+ ),
+ ),
+ patch.object(ActivitySubprocess, "_register_pipe_readers"),
+ patch.object(ActivitySubprocess, "_on_child_started"),
+ patch.object(ActivitySubprocess, "wait", return_value=0),
+ patch("psutil.Process"),
+ ):
+ coordinator.execute_task(
+ what=ti,
+ dag_rel_path="my_bundle",
+ bundle_info=MagicMock(),
+ client=mock_client,
+ subprocess_logs_to_stdout=False,
+ )
+
+ assert popen_calls, "subprocess.Popen was not called"
+ return popen_calls[0]
+
+ def test_executable_path_is_first_arg(self, bundles_dir, mock_client):
+ cmd = self._captured_popen_cmd(bundles_dir, mock_client)
+ expected = str((bundles_dir / "my_bundle").resolve())
+ assert cmd[0] == expected
+
+ def test_returns_execution_result(self, bundles_dir, mock_client):
+ ti = _make_ti(dag_id="tutorial_dag")
+ coordinator = ExecutableCoordinator(executables_root=[bundles_dir])
+
+ mock_proc = MagicMock(spec=subprocess.Popen)
+ mock_proc.pid = 99999
+ comm_sock = MagicMock(spec=socket.socket)
+ logs_sock = MagicMock(spec=socket.socket)
+
+ with (
+ patch("subprocess.Popen", return_value=mock_proc),
+ patch(
+ "airflow.sdk.coordinators._subprocess._accept_connections",
+ side_effect=lambda servers, drains, proc, **kw: (
+ {servers["comm"]: comm_sock, servers["logs"]: logs_sock},
+ {soc: b"" for soc in drains.values()},
+ ),
+ ),
+ patch.object(ActivitySubprocess, "_register_pipe_readers"),
+ patch.object(ActivitySubprocess, "_on_child_started"),
+ patch.object(ActivitySubprocess, "wait", return_value=0),
+ patch("psutil.Process"),
+ ):
+ result = coordinator.execute_task(
+ what=ti,
+ dag_rel_path="my_bundle",
+ bundle_info=MagicMock(),
+ client=mock_client,
+ subprocess_logs_to_stdout=False,
+ )
+
+ assert isinstance(result, BaseCoordinator.ExecutionResult)
+ assert result.exit_code == 0
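
The behaviour that `TestBinaryDigestCache` exercises (promote on `get`, promote on re-`put`, evict the least recently used entry once `maxsize` is exceeded) matches a conventional LRU cache. The implementation of `_BinaryDigestCache` is not shown in this section, so the following is only a sketch of one way to satisfy those tests; `DigestCache` is a hypothetical name, and the five-field key shape is copied from the tests without assuming what each field means.

```python
from __future__ import annotations

from collections import OrderedDict

# Key shape taken from the tests: one string followed by four integers.
Key = tuple[str, int, int, int, int]


class DigestCache:
    """Small LRU mapping from a file-identity key to a 32-byte digest."""

    def __init__(self, maxsize: int) -> None:
        self._maxsize = maxsize
        self._entries: OrderedDict[Key, bytes] = OrderedDict()

    def get(self, key: Key) -> bytes | None:
        digest = self._entries.get(key)
        if digest is not None:
            # A hit makes the entry the most recently used.
            self._entries.move_to_end(key)
        return digest

    def put(self, key: Key, digest: bytes) -> None:
        self._entries[key] = digest
        # Inserting or overwriting also refreshes the entry's position.
        self._entries.move_to_end(key)
        while len(self._entries) > self._maxsize:
            # Drop the least recently used entry (front of the ordering).
            self._entries.popitem(last=False)

    def clear(self) -> None:
        self._entries.clear()
```

This sketch is not thread-safe; a coordinator that scans from several threads would need a lock around `get` and `put`.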