jscheffl commented on code in PR #68480:
URL: https://github.com/apache/airflow/pull/68480#discussion_r3409284075
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -606,10 +641,93 @@ def run_next(self, next_job: KubernetesJob) -> None:
)
self.log.debug("Kubernetes running for command %s", command)
self.log.debug("Kubernetes launching image %s",
pod.spec.containers[0].image)
+ return pod
- # the watcher will monitor pods, so we do not block.
- self.run_pod_async(pod, **self.kube_config.kube_client_request_args)
- self.log.debug("Kubernetes Job created!")
+ def run_next_batch(self, next_jobs: list[KubernetesJob]) ->
list[tuple[KubernetesJob, Exception | None]]:
+ """
+ Build and create a batch of worker pods, parallelizing the create API
calls.
+
+ Pod request objects are built synchronously (pod-mutation hook,
reconciliation),
+ then the create calls are issued concurrently against the Kubernetes
API using the
+ asynchronous client, bounded by ``pod_creation_max_concurrency``.
Returns one
+ ``(job, exception)`` pair per job (``exception`` is ``None`` on
success). Build and
+ create failures are returned rather than raised so the caller can
apply the same
+ per-task handling as the sequential path.
+ """
+ built: list[tuple[KubernetesJob, k8s.V1Pod | None, Exception | None]]
= []
+ for job in next_jobs:
+ try:
+ built.append((job, self._build_pod_request(job), None))
+ except Exception as e:
+ built.append((job, None, e))
+
+ to_create = [(job, pod) for job, pod, build_err in built if build_err
is None and pod is not None]
+ create_errors: dict[int, Exception | None] = {}
+ if to_create:
+ create_errors = self._run_pods_async(to_create)
+
+ return [
+ (job, build_err if build_err is not None else
create_errors.get(id(job)))
+ for job, _, build_err in built
+ ]
+
+ def _run_pods_async(
+ self, jobs_and_pods: list[tuple[KubernetesJob, k8s.V1Pod]]
+ ) -> dict[int, Exception | None]:
+ """Create the given pods concurrently on a dedicated event loop;
return per-job error (or None)."""
+ if self._async_loop is None:
+ self._async_loop = asyncio.new_event_loop()
+ return
self._async_loop.run_until_complete(self._create_pods_async(jobs_and_pods))
+
+ async def _create_pods_async(
+ self, jobs_and_pods: list[tuple[KubernetesJob, k8s.V1Pod]]
+ ) -> dict[int, Exception | None]:
+ """Issue create_namespaced_pod calls concurrently, bounded by a
semaphore."""
+ if self._async_pod_client is None:
+ self._async_pod_client = await get_async_kube_client()
+ api = self._async_pod_client
+ semaphore = asyncio.Semaphore(self.pod_creation_max_concurrency)
+ request_kwargs: dict[str, Any] =
self.kube_config.kube_client_request_args or {}
+
+ async def _create(pod: k8s.V1Pod) -> None:
Review Comment:
To improve robustness under load, a lot of changes were made in KPO via
`@generic_api_retry`. Would it make sense to apply the decorator here as well?
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -311,92 +311,170 @@ def sync(self) -> None:
last_resource_version[ns] or
resource_instance.resource_version[ns]
)
- from kubernetes.client.rest import ApiException
-
if self.create_pods_after and self.create_pods_after > datetime.now():
self.log.warning("Skipping pod creation due to kubernetes rate
limit")
return
self.create_pods_after = None
+ if self.kube_config.async_pod_creation:
+ self._create_pods_concurrently()
+ else:
+ self._create_pods_sequentially()
+
+ def _create_pods_sequentially(self) -> None:
+ """Dequeue a batch and create worker pods one at a time (default
behavior)."""
+ from kubernetes.client.rest import ApiException
+
+ if TYPE_CHECKING:
+ assert self.kube_scheduler
+ created = 0
+ start = time.monotonic()
with contextlib.suppress(Empty):
for _ in range(self.kube_config.worker_pods_creation_batch_size):
task = self.task_queue.get_nowait()
-
+ created += 1
try:
- key = task.key
self.kube_scheduler.run_next(task)
Review Comment:
I am not a K8sExecutor expert, but this deletion looks "strange" or not
correct. Are you sure you did not delete it by accident? It looks like all the sync Pod
creation code is gone.
##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py:
##########
@@ -776,6 +776,323 @@ def test_run_next_pod_reconciliation_error(
finally:
kubernetes_executor.end()
+ @pytest.mark.skipif(
+ AirflowKubernetesScheduler is None, reason="kubernetes python package
is not installed"
+ )
+
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
+ def test_kube_config_parses_async_pod_creation_options(self,
mock_get_kube_client):
+ """The two new config keys are parsed onto KubeConfig."""
+ with conf_vars(
+ {
+ ("kubernetes_executor", "async_pod_creation"): "True",
+ ("kubernetes_executor", "pod_creation_max_concurrency"): "7",
+ }
+ ):
+ executor = KubernetesExecutor()
+ assert executor.kube_config.async_pod_creation is True
+ assert executor.kube_config.pod_creation_max_concurrency == 7
Review Comment:
I think a test that only checks whether config properties are read adds very little
value. This is already tested implicitly by the other tests below.
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kube_client.py:
##########
@@ -154,3 +155,48 @@ def get_kube_client(
api_client = client.ApiClient(configuration=configuration)
return client.CoreV1Api(api_client)
+
+
+async def get_async_kube_client(
Review Comment:
In
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
there is already an API client that also uses an explicit timeout
wrapper. Can you please reuse that code instead of creating a new utility?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]