jscheffl commented on code in PR #68480:
URL: https://github.com/apache/airflow/pull/68480#discussion_r3409284075


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -606,10 +641,93 @@ def run_next(self, next_job: KubernetesJob) -> None:
         )
         self.log.debug("Kubernetes running for command %s", command)
         self.log.debug("Kubernetes launching image %s", 
pod.spec.containers[0].image)
+        return pod
 
-        # the watcher will monitor pods, so we do not block.
-        self.run_pod_async(pod, **self.kube_config.kube_client_request_args)
-        self.log.debug("Kubernetes Job created!")
+    def run_next_batch(self, next_jobs: list[KubernetesJob]) -> 
list[tuple[KubernetesJob, Exception | None]]:
+        """
+        Build and create a batch of worker pods, parallelizing the create API 
calls.
+
+        Pod request objects are built synchronously (pod-mutation hook, 
reconciliation),
+        then the create calls are issued concurrently against the Kubernetes 
API using the
+        asynchronous client, bounded by ``pod_creation_max_concurrency``. 
Returns one
+        ``(job, exception)`` pair per job (``exception`` is ``None`` on 
success). Build and
+        create failures are returned rather than raised so the caller can 
apply the same
+        per-task handling as the sequential path.
+        """
+        built: list[tuple[KubernetesJob, k8s.V1Pod | None, Exception | None]] 
= []
+        for job in next_jobs:
+            try:
+                built.append((job, self._build_pod_request(job), None))
+            except Exception as e:
+                built.append((job, None, e))
+
+        to_create = [(job, pod) for job, pod, build_err in built if build_err 
is None and pod is not None]
+        create_errors: dict[int, Exception | None] = {}
+        if to_create:
+            create_errors = self._run_pods_async(to_create)
+
+        return [
+            (job, build_err if build_err is not None else 
create_errors.get(id(job)))
+            for job, _, build_err in built
+        ]
+
+    def _run_pods_async(
+        self, jobs_and_pods: list[tuple[KubernetesJob, k8s.V1Pod]]
+    ) -> dict[int, Exception | None]:
+        """Create the given pods concurrently on a dedicated event loop; 
return per-job error (or None)."""
+        if self._async_loop is None:
+            self._async_loop = asyncio.new_event_loop()
+        return 
self._async_loop.run_until_complete(self._create_pods_async(jobs_and_pods))
+
+    async def _create_pods_async(
+        self, jobs_and_pods: list[tuple[KubernetesJob, k8s.V1Pod]]
+    ) -> dict[int, Exception | None]:
+        """Issue create_namespaced_pod calls concurrently, bounded by a 
semaphore."""
+        if self._async_pod_client is None:
+            self._async_pod_client = await get_async_kube_client()
+        api = self._async_pod_client
+        semaphore = asyncio.Semaphore(self.pod_creation_max_concurrency)
+        request_kwargs: dict[str, Any] = 
self.kube_config.kube_client_request_args or {}
+
+        async def _create(pod: k8s.V1Pod) -> None:

Review Comment:
   To improve and harden behavior under load, a lot of changes were made in KPO 
via `@generic_api_retry`. Would it make sense to apply the decorator here as well?



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -311,92 +311,170 @@ def sync(self) -> None:
                 last_resource_version[ns] or 
resource_instance.resource_version[ns]
             )
 
-        from kubernetes.client.rest import ApiException
-
         if self.create_pods_after and self.create_pods_after > datetime.now():
             self.log.warning("Skipping pod creation due to kubernetes rate 
limit")
             return
 
         self.create_pods_after = None
 
+        if self.kube_config.async_pod_creation:
+            self._create_pods_concurrently()
+        else:
+            self._create_pods_sequentially()
+
+    def _create_pods_sequentially(self) -> None:
+        """Dequeue a batch and create worker pods one at a time (default 
behavior)."""
+        from kubernetes.client.rest import ApiException
+
+        if TYPE_CHECKING:
+            assert self.kube_scheduler
+        created = 0
+        start = time.monotonic()
         with contextlib.suppress(Empty):
             for _ in range(self.kube_config.worker_pods_creation_batch_size):
                 task = self.task_queue.get_nowait()
-
+                created += 1
                 try:
-                    key = task.key
                     self.kube_scheduler.run_next(task)

Review Comment:
   I am not a K8sExecutor expert, but this deletion looks "strange" or 
improper. Are you sure you did not delete it by accident? It looks like all the 
sync Pod creation code is gone.



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py:
##########
@@ -776,6 +776,323 @@ def test_run_next_pod_reconciliation_error(
             finally:
                 kubernetes_executor.end()
 
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason="kubernetes python package 
is not installed"
+    )
+    
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
+    def test_kube_config_parses_async_pod_creation_options(self, 
mock_get_kube_client):
+        """The two new config keys are parsed onto KubeConfig."""
+        with conf_vars(
+            {
+                ("kubernetes_executor", "async_pod_creation"): "True",
+                ("kubernetes_executor", "pod_creation_max_concurrency"): "7",
+            }
+        ):
+            executor = KubernetesExecutor()
+            assert executor.kube_config.async_pod_creation is True
+            assert executor.kube_config.pod_creation_max_concurrency == 7

Review Comment:
   I think a test that only checks whether config properties are read adds very 
little value. This is already tested implicitly by the other tests below.



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kube_client.py:
##########
@@ -154,3 +155,48 @@ def get_kube_client(
 
     api_client = client.ApiClient(configuration=configuration)
     return client.CoreV1Api(api_client)
+
+
+async def get_async_kube_client(

Review Comment:
   In 
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
 there is already an API client that also uses an explicit timeout wrapper. 
Can you please leverage that existing code instead of creating a new utility?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to