jason810496 commented on code in PR #68694:
URL: https://github.com/apache/airflow/pull/68694#discussion_r3435947584
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -199,6 +199,32 @@ def start(self) -> None:
scheduler_job_id=self.scheduler_job_id,
)
+ @staticmethod
+ def _coordinator_pod_template_file(queue: str | None) -> str | None:
+ """
+ Return the pod template a coordinator declares for *queue*, if any.
+
+ Lets a queue routed to a non-Python coordinator (via ``[sdk]
+ queue_to_coordinator``) launch its worker pod from a
coordinator-specific
+ template — for example an image carrying the JVM for a Java
coordinator.
+ The template is read from the coordinator's ``extra`` config mapping,
+ resolved without importing or instantiating the coordinator itself.
+
+ The coordinator manager is only available on Airflow 3.3+; on older
Task
+ SDKs the import fails and we fall back to no coordinator template.
+ """
+ if not queue:
+ return None
+ try:
+ from airflow.sdk.execution_time.coordinator import
get_coordinator_manager
+ except ImportError:
+ return None
+
+ if (extra := get_coordinator_manager().extra_for_queue(queue)) is not
None:
+ return extra.get("pod_template_file", None)
+
+ return None
Review Comment:
Nice catch. The k8s executor part is separated to
https://github.com/apache/airflow/pull/68713, and I addressed the potential
exception there as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]