1fanwang commented on code in PR #68480:
URL: https://github.com/apache/airflow/pull/68480#discussion_r3438015729


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -606,10 +641,93 @@ def run_next(self, next_job: KubernetesJob) -> None:
         )
         self.log.debug("Kubernetes running for command %s", command)
         self.log.debug("Kubernetes launching image %s", 
pod.spec.containers[0].image)
+        return pod
 
-        # the watcher will monitor pods, so we do not block.
-        self.run_pod_async(pod, **self.kube_config.kube_client_request_args)
-        self.log.debug("Kubernetes Job created!")
+    def run_next_batch(self, next_jobs: list[KubernetesJob]) -> 
list[tuple[KubernetesJob, Exception | None]]:
+        """
+        Build and create a batch of worker pods, parallelizing the create API 
calls.
+
+        Pod request objects are built synchronously (pod-mutation hook, 
reconciliation),
+        then the create calls are issued concurrently against the Kubernetes 
API using the
+        asynchronous client, bounded by ``pod_creation_max_concurrency``. 
Returns one
+        ``(job, exception)`` pair per job (``exception`` is ``None`` on 
success). Build and
+        create failures are returned rather than raised so the caller can 
apply the same
+        per-task handling as the sequential path.
+        """
+        built: list[tuple[KubernetesJob, k8s.V1Pod | None, Exception | None]] 
= []
+        for job in next_jobs:
+            try:
+                built.append((job, self._build_pod_request(job), None))
+            except Exception as e:
+                built.append((job, None, e))
+
+        to_create = [(job, pod) for job, pod, build_err in built if build_err 
is None and pod is not None]
+        create_errors: dict[int, Exception | None] = {}
+        if to_create:
+            create_errors = self._run_pods_async(to_create)
+
+        return [
+            (job, build_err if build_err is not None else 
create_errors.get(id(job)))
+            for job, _, build_err in built
+        ]
+
+    def _run_pods_async(
+        self, jobs_and_pods: list[tuple[KubernetesJob, k8s.V1Pod]]
+    ) -> dict[int, Exception | None]:
+        """Create the given pods concurrently on a dedicated event loop; 
return per-job error (or None)."""
+        if self._async_loop is None:
+            self._async_loop = asyncio.new_event_loop()
+        return 
self._async_loop.run_until_complete(self._create_pods_async(jobs_and_pods))
+
+    async def _create_pods_async(
+        self, jobs_and_pods: list[tuple[KubernetesJob, k8s.V1Pod]]
+    ) -> dict[int, Exception | None]:
+        """Issue create_namespaced_pod calls concurrently, bounded by a 
semaphore."""
+        if self._async_pod_client is None:
+            self._async_pod_client = await get_async_kube_client()
+        api = self._async_pod_client
+        semaphore = asyncio.Semaphore(self.pod_creation_max_concurrency)
+        request_kwargs: dict[str, Any] = 
self.kube_config.kube_client_request_args or {}
+
+        async def _create(pod: k8s.V1Pod) -> None:

Review Comment:
   Thanks for the idea!
   I checked a bit further: the executor uses a different retry model than 
KPO. On a failed create it re-queues the task (task_publish_retries + 
create_pods_after) and retries on the next scheduler loop, rather than 
retrying in-line. Both paths share that logic via _handle_pod_publish_error, 
so the sync and async paths fail and retry identically. Wrapping the create in 
@generic_api_retry would stack an in-line tenacity retry on top, and since the 
concurrent batch runs via run_until_complete, its Retry-After sleeps would 
block the scheduler loop. 
   
   While checking this I found a real gap: neither path currently retries 
502/503/504, which generic_api_retry's transient set covers. Happy to broaden 
both paths in a follow-up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to