This is an automated email from the ASF dual-hosted git repository.

jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4aafb9523be Correct the example config for the coordinators (#68929)
4aafb9523be is described below

commit 4aafb9523bee458f68ab6b19470b43bb8eb4730e
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Wed Jun 24 19:24:24 2026 +0900

    Correct the example config for the coordinators (#68929)
    
    * Correct the example and docstring for the coordinators
    
    * Guard the config.yml coordinators example against drift
---
 .../src/airflow/config_templates/config.yml        |  7 +++++
 .../sdk/coordinators/executable/coordinator.py     |  7 ++---
 .../airflow/sdk/coordinators/java/coordinator.py   |  9 +++---
 .../task_sdk/execution_time/test_coordinator.py    | 32 +++++++++++++++++++++-
 4 files changed, 45 insertions(+), 10 deletions(-)

diff --git a/airflow-core/src/airflow/config_templates/config.yml 
b/airflow-core/src/airflow/config_templates/config.yml
index 29763f422fe..39423755a68 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -2091,6 +2091,7 @@ sdk:
           "jdk-17": {
             "classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
             "kwargs": {
+              "jars_root": ["/opt/airflow/java-bundles"],
               "java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java",
               "jvm_args": ["-Xmx1024m"]
             },
@@ -2099,6 +2100,12 @@ sdk:
               "worker_container_repository": "apache/airflow",
               "worker_container_tag": "3.3.0"
             }
+          },
+          "go-sdk": {
+            "classpath": 
"airflow.sdk.coordinators.executable.ExecutableCoordinator",
+            "kwargs": {
+              "executables_root": ["/opt/airflow/executable-bundles"]
+            }
           }
         }
       default: ~
diff --git a/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py 
b/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py
index ed11f97165d..2275628d0b1 100644
--- a/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py
+++ b/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py
@@ -370,12 +370,11 @@ class ExecutableCoordinator(SubprocessCoordinator):
     Configuration is taken from the ``[sdk] coordinators`` entry that 
constructs
     this instance::
 
-        {
-            "name": "go",
+        "go": {
             "classpath": 
"airflow.sdk.coordinators.executable.ExecutableCoordinator",
             "kwargs": {
-                "executables_root": ["~/airflow/executable-bundles"],
-            },
+                "executables_root": ["~/airflow/executable-bundles"]
+            }
         }
 
     :param executables_root: A list of directories scanned for executable
diff --git a/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py 
b/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py
index cb74c64dfd4..da35d0f37b8 100644
--- a/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py
+++ b/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py
@@ -174,14 +174,13 @@ class JavaCoordinator(SubprocessCoordinator):
     Configuration is taken from the ``[sdk] coordinators`` entry that 
constructs
     this instance::
 
-        {
-            "name": "jdk-17",
+        "jdk-17": {
             "classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
             "kwargs": {
-                "java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java",
-                "jvm_args": ["-Xmx1024m"],
                 "jars_root": ["~/airflow/jars"],
-            },
+                "java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java",
+                "jvm_args": ["-Xmx1024m"]
+            }
         }
 
     :param java_executable: Path to the ``java`` command (defaults to
diff --git a/task-sdk/tests/task_sdk/execution_time/test_coordinator.py 
b/task-sdk/tests/task_sdk/execution_time/test_coordinator.py
index 475b587bf7f..40db5378afc 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_coordinator.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_coordinator.py
@@ -22,7 +22,8 @@ import json
 
 import pytest
 
-from airflow.sdk.configuration import conf
+from airflow.sdk._shared.module_loading import import_string
+from airflow.sdk.configuration import conf, retrieve_configuration_description
 from airflow.sdk.execution_time.coordinator import (
     BaseCoordinator,
     CoordinatorManager,
@@ -199,3 +200,32 @@ class TestCoordinatorManager:
             "pod_template_file": "/opt/airflow/pod_templates/boom.yaml"
         }
         assert manager._created_coordinators == {}
+
+
+class TestConfigYamlCoordinatorsExample:
+    """Guard the ``[sdk] coordinators`` example in ``config.yml`` against 
drift.
+
+    Nothing else exercises the example, so a broken one (e.g. dropping the
+    required ``jars_root`` kwarg) can ship unnoticed. Loading it through
+    CoordinatorManager and constructing every entry keeps the example honest.
+    """
+
+    def test_every_example_coordinator_constructs(self, sdk_config):
+        description = retrieve_configuration_description()
+        coordinators_example = 
description["sdk"]["options"]["coordinators"]["example"]
+        specs = json.loads(coordinators_example)
+        assert specs, "config.yml [sdk] coordinators example must not be empty"
+
+        # The example's own queue_to_coordinator illustrates different keys, so
+        # route every coordinator through a synthetic queue to construct each 
one.
+        queue_to_coordinator = {f"queue-{key}": key for key in specs}
+        sdk_config(
+            coordinators=coordinators_example,
+            queue_to_coordinator=json.dumps(queue_to_coordinator),
+        )
+        manager = CoordinatorManager.from_config()
+        assert set(manager._coordinator_specs) == set(specs)
+
+        for queue, key in queue_to_coordinator.items():
+            coordinator = manager.for_queue(queue)
+            assert isinstance(coordinator, 
import_string(specs[key]["classpath"]))

Reply via email to