phanikumv commented on code in PR #68694:
URL: https://github.com/apache/airflow/pull/68694#discussion_r3433737991


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -199,6 +199,32 @@ def start(self) -> None:
             scheduler_job_id=self.scheduler_job_id,
         )
 
+    @staticmethod
+    def _coordinator_pod_template_file(queue: str | None) -> str | None:
+        """
+        Return the pod template a coordinator declares for *queue*, if any.
+
+        Lets a queue routed to a non-Python coordinator (via ``[sdk]
+        queue_to_coordinator``) launch its worker pod from a 
coordinator-specific
+        template — for example an image carrying the JVM for a Java 
coordinator.
+        The template is read from the coordinator's ``extra`` config mapping,
+        resolved without importing or instantiating the coordinator itself.
+
+        The coordinator manager is only available on Airflow 3.3+; on older 
Task
+        SDKs the import fails and we fall back to no coordinator template.
+        """
+        if not queue:
+            return None
+        try:
+            from airflow.sdk.execution_time.coordinator import 
get_coordinator_manager
+        except ImportError:
+            return None
+
+        if (extra := get_coordinator_manager().extra_for_queue(queue)) is not 
None:
+            return extra.get("pod_template_file", None)
+
+        return None

Review Comment:
     ```suggestion
             if not queue:
                 return None
   
             from airflow.exceptions import AirflowConfigException
   
             try:
                 from airflow.sdk.execution_time.coordinator import 
get_coordinator_manager
   
                 extra = get_coordinator_manager().extra_for_queue(queue)
             except ImportError:
                 # Pre-3.3 Task SDKs have no coordinator manager.
                 return None
             except (AirflowConfigException, ValueError):
                 # A malformed [sdk] coordinators / queue_to_coordinator config 
must not
                 # crash the executor for unrelated queues; fall back to no 
coordinator template.
                 logging.getLogger(__name__).warning(
                     "Ignoring coordinator pod template for queue %s: invalid 
[sdk] coordinator config",
                     queue,
                     exc_info=True,
                 )
                 return None
   
             if extra is not None:
                 return extra.get("pod_template_file", None)
   
             return None
     ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to