1fanwang commented on code in PR #68480:
URL: https://github.com/apache/airflow/pull/68480#discussion_r3438015729
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -606,10 +641,93 @@ def run_next(self, next_job: KubernetesJob) -> None:
)
self.log.debug("Kubernetes running for command %s", command)
self.log.debug("Kubernetes launching image %s",
pod.spec.containers[0].image)
+ return pod
- # the watcher will monitor pods, so we do not block.
- self.run_pod_async(pod, **self.kube_config.kube_client_request_args)
- self.log.debug("Kubernetes Job created!")
+ def run_next_batch(self, next_jobs: list[KubernetesJob]) ->
list[tuple[KubernetesJob, Exception | None]]:
+ """
+ Build and create a batch of worker pods, parallelizing the create API
calls.
+
+ Pod request objects are built synchronously (pod-mutation hook,
reconciliation),
+ then the create calls are issued concurrently against the Kubernetes
API using the
+ asynchronous client, bounded by ``pod_creation_max_concurrency``.
Returns one
+ ``(job, exception)`` pair per job (``exception`` is ``None`` on
success). Build and
+ create failures are returned rather than raised so the caller can
apply the same
+ per-task handling as the sequential path.
+ """
+ built: list[tuple[KubernetesJob, k8s.V1Pod | None, Exception | None]]
= []
+ for job in next_jobs:
+ try:
+ built.append((job, self._build_pod_request(job), None))
+ except Exception as e:
+ built.append((job, None, e))
+
+ to_create = [(job, pod) for job, pod, build_err in built if build_err
is None and pod is not None]
+ create_errors: dict[int, Exception | None] = {}
+ if to_create:
+ create_errors = self._run_pods_async(to_create)
+
+ return [
+ (job, build_err if build_err is not None else
create_errors.get(id(job)))
+ for job, _, build_err in built
+ ]
+
+ def _run_pods_async(
+ self, jobs_and_pods: list[tuple[KubernetesJob, k8s.V1Pod]]
+ ) -> dict[int, Exception | None]:
+ """Create the given pods concurrently on a dedicated event loop;
return per-job error (or None)."""
+ if self._async_loop is None:
+ self._async_loop = asyncio.new_event_loop()
+ return
self._async_loop.run_until_complete(self._create_pods_async(jobs_and_pods))
+
+ async def _create_pods_async(
+ self, jobs_and_pods: list[tuple[KubernetesJob, k8s.V1Pod]]
+ ) -> dict[int, Exception | None]:
+ """Issue create_namespaced_pod calls concurrently, bounded by a
semaphore."""
+ if self._async_pod_client is None:
+ self._async_pod_client = await get_async_kube_client()
+ api = self._async_pod_client
+ semaphore = asyncio.Semaphore(self.pod_creation_max_concurrency)
+ request_kwargs: dict[str, Any] =
self.kube_config.kube_client_request_args or {}
+
+ async def _create(pod: k8s.V1Pod) -> None:
Review Comment:
Thanks for the idea!
I looked into this a bit further. The executor uses a different retry model
than KPO: on a failed create it re-queues the task (task_publish_retries +
create_pods_after) and retries on the next scheduler loop, rather than in-line.
Both paths share that behavior via _handle_pod_publish_error, so the sync and
async paths fail and retry identically. Wrapping the create in
@generic_api_retry would stack an in-line tenacity retry on top of that, and
since the concurrent batch runs via run_until_complete, its Retry-After sleeps
would block the scheduler loop.
While checking this, I did find a real gap: neither path currently retries
502/503/504 responses, which generic_api_retry's transient set covers. I'm
happy to broaden both paths in a follow-up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]