This is an automated email from the ASF dual-hosted git repository.
jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4aafb9523be Correct the example config for the coordinators (#68929)
4aafb9523be is described below
commit 4aafb9523bee458f68ab6b19470b43bb8eb4730e
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Wed Jun 24 19:24:24 2026 +0900
Correct the example config for the coordinators (#68929)
* Correct the example and docstring for the coordinators
* Guard the config.yml coordinators example against drift
---
.../src/airflow/config_templates/config.yml | 7 +++++
.../sdk/coordinators/executable/coordinator.py | 7 ++---
.../airflow/sdk/coordinators/java/coordinator.py | 9 +++---
.../task_sdk/execution_time/test_coordinator.py | 32 +++++++++++++++++++++-
4 files changed, 45 insertions(+), 10 deletions(-)
diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index 29763f422fe..39423755a68 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -2091,6 +2091,7 @@ sdk:
"jdk-17": {
"classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
"kwargs": {
+ "jars_root": ["/opt/airflow/java-bundles"],
"java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java",
"jvm_args": ["-Xmx1024m"]
},
@@ -2099,6 +2100,12 @@ sdk:
"worker_container_repository": "apache/airflow",
"worker_container_tag": "3.3.0"
}
+ },
+ "go-sdk": {
+ "classpath": "airflow.sdk.coordinators.executable.ExecutableCoordinator",
+ "kwargs": {
+ "executables_root": ["/opt/airflow/executable-bundles"]
+ }
}
}
default: ~
diff --git a/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py b/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py
index ed11f97165d..2275628d0b1 100644
--- a/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py
+++ b/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py
@@ -370,12 +370,11 @@ class ExecutableCoordinator(SubprocessCoordinator):
Configuration is taken from the ``[sdk] coordinators`` entry that constructs
this instance::
- {
- "name": "go",
+ "go": {
"classpath": "airflow.sdk.coordinators.executable.ExecutableCoordinator",
"kwargs": {
- "executables_root": ["~/airflow/executable-bundles"],
- },
+ "executables_root": ["~/airflow/executable-bundles"]
+ }
}
:param executables_root: A list of directories scanned for executable
diff --git a/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py b/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py
index cb74c64dfd4..da35d0f37b8 100644
--- a/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py
+++ b/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py
@@ -174,14 +174,13 @@ class JavaCoordinator(SubprocessCoordinator):
Configuration is taken from the ``[sdk] coordinators`` entry that constructs
this instance::
- {
- "name": "jdk-17",
+ "jdk-17": {
"classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
"kwargs": {
- "java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java",
- "jvm_args": ["-Xmx1024m"],
"jars_root": ["~/airflow/jars"],
- },
+ "java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java",
+ "jvm_args": ["-Xmx1024m"]
+ }
}
:param java_executable: Path to the ``java`` command (defaults to
diff --git a/task-sdk/tests/task_sdk/execution_time/test_coordinator.py b/task-sdk/tests/task_sdk/execution_time/test_coordinator.py
index 475b587bf7f..40db5378afc 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_coordinator.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_coordinator.py
@@ -22,7 +22,8 @@ import json
import pytest
-from airflow.sdk.configuration import conf
+from airflow.sdk._shared.module_loading import import_string
+from airflow.sdk.configuration import conf, retrieve_configuration_description
from airflow.sdk.execution_time.coordinator import (
BaseCoordinator,
CoordinatorManager,
@@ -199,3 +200,32 @@ class TestCoordinatorManager:
"pod_template_file": "/opt/airflow/pod_templates/boom.yaml"
}
assert manager._created_coordinators == {}
+
+
+class TestConfigYamlCoordinatorsExample:
+ """Guard the ``[sdk] coordinators`` example in ``config.yml`` against drift.
+
+ Nothing else exercises the example, so a broken one (e.g. dropping the
+ required ``jars_root`` kwarg) can ship unnoticed. Loading it through
+ CoordinatorManager and constructing every entry keeps the example honest.
+ """
+
+ def test_every_example_coordinator_constructs(self, sdk_config):
+ description = retrieve_configuration_description()
+ coordinators_example = description["sdk"]["options"]["coordinators"]["example"]
+ specs = json.loads(coordinators_example)
+ assert specs, "config.yml [sdk] coordinators example must not be empty"
+
+ # The example's own queue_to_coordinator illustrates different keys, so
+ # route every coordinator through a synthetic queue to construct each one.
+ queue_to_coordinator = {f"queue-{key}": key for key in specs}
+ sdk_config(
+ coordinators=coordinators_example,
+ queue_to_coordinator=json.dumps(queue_to_coordinator),
+ )
+ manager = CoordinatorManager.from_config()
+ assert set(manager._coordinator_specs) == set(specs)
+
+ for queue, key in queue_to_coordinator.items():
+ coordinator = manager.for_queue(queue)
+ assert isinstance(coordinator, import_string(specs[key]["classpath"]))