This is an automated email from the ASF dual-hosted git repository.
ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e7619316995 Fix KubernetesExecutor scheduler crash from unpicklable pod_override (#68831)
e7619316995 is described below
commit e76193169953de0d4b7c24ebb617e4392d470a5f
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Jun 23 13:34:14 2026 +0100
Fix KubernetesExecutor scheduler crash from unpicklable pod_override (#68831)
* Fix KubernetesExecutor scheduler crash from unpicklable pod_override
When the scheduler runs in-cluster, the kubernetes client installs a
process-global default Configuration whose refresh_api_key_hook is a local
closure. Under kubernetes-client v36, deserializing a V1Pod copies that
Configuration onto the pod and every nested model. A task that sets a V1Pod
pod_override therefore produces a pod that pickle cannot serialize, and the
KubernetesExecutor pickles each task onto a multiprocessing queue
synchronously in the scheduler loop, so the scheduler crashes in a loop and no
task can be launched.
Deserialize pods through an ApiClient built with a fresh Configuration so that
neither the pod nor any nested model captures the in-cluster global. This is
applied where the config gets attached -- airflow-core's serialization and
unpickling repair paths -- so every consumer of a deserialized pod stays
picklable.
* Address review feedback on pod deserialization fix
Move the deserialize_pod_dict import to module top-level, hoist pickle and
Configuration to top-level imports in the serialization test, and drop the
redundant _has_kubernetes cache_clear calls that were copied from a sibling
test.
---
.../airflow/serialization/serialized_objects.py | 5 ++-
airflow-core/src/airflow/utils/sqlalchemy.py | 26 +++++++++++++---
.../unit/serialization/test_serialized_objects.py | 36 +++++++++++++++++++++-
airflow-core/tests/unit/utils/test_sqlalchemy.py | 36 +++++++++++++++++++++-
4 files changed, 94 insertions(+), 9 deletions(-)
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 14bbd343359..4ba0bbefdce 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -109,6 +109,7 @@ from airflow.timetables.base import DagRunInfo, Timetable
from airflow.triggers.base import StartTriggerArgs
from airflow.utils.code_utils import get_python_source
from airflow.utils.db import LazySelectSequence
+from airflow.utils.sqlalchemy import deserialize_pod_dict
if TYPE_CHECKING:
from inspect import Parameter
@@ -643,9 +644,7 @@ class BaseSerialization:
"Cannot deserialize POD objects without kubernetes
libraries. "
"Please install the `kubernetes` package."
)
- # kubernetes-client does not expose a public dict->model API; see https://github.com/kubernetes-client/python/issues/977.
- pod = ApiClient()._ApiClient__deserialize_model(var, k8s.V1Pod)
- return pod
+ return deserialize_pod_dict(var)
elif type_ == DAT.TIMEDELTA:
return datetime.timedelta(seconds=var)
elif type_ == DAT.TIMEZONE:
diff --git a/airflow-core/src/airflow/utils/sqlalchemy.py b/airflow-core/src/airflow/utils/sqlalchemy.py
index bd4781adfe9..c47d8fd4796 100644
--- a/airflow-core/src/airflow/utils/sqlalchemy.py
+++ b/airflow-core/src/airflow/utils/sqlalchemy.py
@@ -271,6 +271,27 @@ def sanitize_for_serialization(obj: V1Pod):
return {key: sanitize_for_serialization(val) for key, val in obj_dict.items()}
+def deserialize_pod_dict(pod_dict: dict) -> V1Pod:
+ """
+ Deserialize a serialized pod dict back into a ``V1Pod``.
+
+ kubernetes-client exposes no public dict->model API; see
+ https://github.com/kubernetes-client/python/issues/977.
+
+ A fresh ``Configuration`` is passed so that neither the pod nor any nested model captures the
+ process-global in-cluster ``Configuration``. In-cluster, that global carries a
+ ``refresh_api_key_hook`` local closure which ``pickle`` cannot serialize, and which would
+ otherwise break pickling a ``pod_override`` onto the KubernetesExecutor multiprocessing queue.
+
+ :meta private:
+ """
+ from kubernetes.client import Configuration
+ from kubernetes.client.api_client import ApiClient
+ from kubernetes.client.models.v1_pod import V1Pod
+
+ return ApiClient(configuration=Configuration())._ApiClient__deserialize_model(pod_dict, V1Pod)
+
+
def ensure_pod_is_valid_after_unpickling(pod: V1Pod) -> V1Pod | None:
"""
Convert pod to json and back so that pod is safe.
@@ -299,12 +320,9 @@ def ensure_pod_is_valid_after_unpickling(pod: V1Pod) -> V1Pod | None:
if not isinstance(pod, V1Pod):
return None
try:
- from kubernetes.client.api_client import ApiClient
-
# now we actually reserialize / deserialize the pod
pod_dict = sanitize_for_serialization(pod)
- # kubernetes-client does not expose a public dict->model API; see https://github.com/kubernetes-client/python/issues/977.
- return ApiClient()._ApiClient__deserialize_model(pod_dict, V1Pod)
+ return deserialize_pod_dict(pod_dict)
except Exception:
return None
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 2d2119095d3..7de4c87148a 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import json
import math
+import pickle
import sys
from collections.abc import Iterator
from datetime import datetime, timedelta
@@ -27,7 +28,7 @@ from typing import TYPE_CHECKING
import pendulum
import pytest
from dateutil import relativedelta
-from kubernetes.client import models as k8s
+from kubernetes.client import Configuration, models as k8s
from pendulum.tz.timezone import FixedTimezone, Timezone
from uuid6 import uuid7
@@ -1389,6 +1390,39 @@ class TestKubernetesImportAvoidance:
_has_kubernetes.cache_clear()
+ def test_deserialized_v1pod_does_not_capture_unpicklable_config(self, monkeypatch):
+ """A deserialized V1Pod must not capture the in-cluster Configuration.
+
+ In-cluster, the kubernetes client installs a process-global default ``Configuration`` whose
+ ``refresh_api_key_hook`` is an unpicklable local closure. Under kubernetes-client v36,
+ ``ApiClient.__deserialize_model`` copies that config onto the pod and every nested model, so a
+ naively deserialized ``pod_override`` cannot be pickled onto the KubernetesExecutor queue and
+ crashes the scheduler. Deserializing through a fresh ``Configuration`` keeps the pod picklable.
+ """
+ k8s = pytest.importorskip("kubernetes.client.models")
+
+ def _make_unpicklable_hook():
+ def _refresh_api_key(config):
+ return None
+
+ return _refresh_api_key
+
+ dirty = Configuration()
+ dirty.refresh_api_key_hook = _make_unpicklable_hook()
+ monkeypatch.setattr(Configuration, "_default", dirty, raising=False)
+
+ pod = k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(name="test-pod"),
+ spec=k8s.V1PodSpec(containers=[k8s.V1Container(name="base", image="airflow:3")]),
+ )
+ decoded = BaseSerialization.deserialize(BaseSerialization.serialize(pod))
+
+ assert isinstance(decoded, k8s.V1Pod)
+ # The top-level pod and every nested model must carry a clean, picklable Configuration.
+ pickle.dumps(decoded)
+ assert decoded.local_vars_configuration.refresh_api_key_hook is None
+ assert decoded.spec.containers[0].local_vars_configuration.refresh_api_key_hook is None
+
@pytest.mark.db_test
def test_serialized_dag_getitem_returns_task(dag_maker):
diff --git a/airflow-core/tests/unit/utils/test_sqlalchemy.py b/airflow-core/tests/unit/utils/test_sqlalchemy.py
index 43ead81f46d..1ef7b42c587 100644
--- a/airflow-core/tests/unit/utils/test_sqlalchemy.py
+++ b/airflow-core/tests/unit/utils/test_sqlalchemy.py
@@ -23,7 +23,7 @@ from copy import deepcopy
from unittest import mock
import pytest
-from kubernetes.client import models as k8s
+from kubernetes.client import Configuration, models as k8s
from sqlalchemy import text
from sqlalchemy.exc import StatementError
@@ -346,3 +346,37 @@ class TestExecutorConfigType:
# show that the pickled (bad) pod is now a good pod, and same as the copy made
# before making it bad
assert result["pod_override"].to_dict() == copy_of_test_pod.to_dict()
+
+ def test_ensure_pod_is_valid_after_unpickling_is_picklable_in_cluster(self, monkeypatch):
+ """The repaired pod must not capture the unpicklable in-cluster Configuration.
+
+ In-cluster, the kubernetes client installs a process-global default ``Configuration`` whose
+ ``refresh_api_key_hook`` is an unpicklable local closure. When the repair branch re-deserializes
+ the pod it must round-trip through a fresh ``Configuration`` so it stays picklable onto the
+ KubernetesExecutor queue.
+ """
+
+ def _make_unpicklable_hook():
+ def _refresh_api_key(config):
+ return None
+
+ return _refresh_api_key
+
+ dirty = Configuration()
+ dirty.refresh_api_key_hook = _make_unpicklable_hook()
+ monkeypatch.setattr(Configuration, "_default", dirty, raising=False)
+
+ container = k8s.V1Container(name="base")
+ pod = k8s.V1Pod(spec=k8s.V1PodSpec(containers=[container]))
+ # Force the repair (re-deserialize) branch the way real version-skew does: drop a protected
+ # attr so ``to_dict()`` raises and ``ensure_pod_is_valid_after_unpickling`` reserializes.
+ del container._tty
+ with pytest.raises(AttributeError):
+ pod.to_dict()
+
+ fixed_pod = ensure_pod_is_valid_after_unpickling(pod)
+
+ assert fixed_pod is not None
+ pickle.dumps(fixed_pod)
+ assert fixed_pod.local_vars_configuration.refresh_api_key_hook is None
+ assert fixed_pod.spec.containers[0].local_vars_configuration.refresh_api_key_hook is None