This is an automated email from the ASF dual-hosted git repository.
ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e0645d564be Make cncf.kubernetes model deserialization picklable
in-cluster (#68848)
e0645d564be is described below
commit e0645d564be47251f81cc2d1d746b14bdb5d857a
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Jun 23 15:06:51 2026 +0100
Make cncf.kubernetes model deserialization picklable in-cluster (#68848)
* Make cncf.kubernetes model deserialization picklable in-cluster
The kubernetes client (v36) attaches the process-global in-cluster
Configuration to every model it deserializes, and that Configuration's
refresh_api_key_hook is an unpicklable local closure. Pickling any such
deserialized model -- for example, a pod_override placed on the
KubernetesExecutor multiprocessing queue -- therefore fails.
Deserialize through an ApiClient built with a fresh Configuration in the
provider's model-deserialization paths, so the resulting models (and every
nested object) stay picklable regardless of the kubernetes client version.
The affected paths are PodGenerator.deserialize_model_dict,
KubernetesJobOperator.deserialize_job_template_file, and the backcompat
_convert_from_dict converter.
* fixup! Make cncf.kubernetes model deserialization picklable in-cluster
* move imports to top level
* Address review feedback on provider deserialization fix
Keep the picklability rationale only in PodGenerator.deserialize_model_dict
(drop the duplicated comments at the job operator and converter call sites),
and replace the unnecessary closure-returning-closure test helper with a
directly-defined local hook (already unpicklable on its own).
---
.../backcompat/backwards_compat_converters.py | 4 +--
.../providers/cncf/kubernetes/operators/job.py | 4 +--
.../providers/cncf/kubernetes/pod_generator.py | 9 +++--
.../backcompat/test_backwards_compat_converters.py | 26 ++++++++++++++-
.../unit/cncf/kubernetes/operators/test_job.py | 39 +++++++++++++++++++++-
.../unit/cncf/kubernetes/test_pod_generator.py | 30 ++++++++++++++++-
6 files changed, 103 insertions(+), 9 deletions(-)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
index 88d16170163..0ca600b4466 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
@@ -18,7 +18,7 @@
from __future__ import annotations
-from kubernetes.client import ApiClient, models as k8s
+from kubernetes.client import ApiClient, Configuration, models as k8s
from airflow.providers.common.compat.sdk import AirflowException
@@ -36,7 +36,7 @@ def _convert_from_dict(obj, new_class):
if isinstance(obj, new_class):
return obj
if isinstance(obj, dict):
- api_client = ApiClient()
+ api_client = ApiClient(configuration=Configuration())
return api_client._ApiClient__deserialize_model(obj, new_class)
raise AirflowException(f"Expected dict or {new_class}, got {type(obj)}")
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
index 0e8540d815c..e888159c739 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
@@ -28,7 +28,7 @@ from collections.abc import Sequence
from functools import cached_property
from typing import TYPE_CHECKING, Any, Literal
-from kubernetes.client import BatchV1Api, models as k8s
+from kubernetes.client import BatchV1Api, Configuration, models as k8s
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException
@@ -378,7 +378,7 @@ class KubernetesJobOperator(KubernetesPodOperator):
job = None
log.warning("Template file %s does not exist", path)
- api_client = ApiClient()
+ api_client = ApiClient(configuration=Configuration())
return api_client._ApiClient__deserialize_model(job, k8s.V1Job)
def on_kill(self) -> None:
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py
index 7e5dc728d77..ca499d363fc 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py
@@ -34,7 +34,7 @@ from functools import reduce
from typing import TYPE_CHECKING
from dateutil import parser
-from kubernetes.client import models as k8s
+from kubernetes.client import Configuration, models as k8s
from kubernetes.client.api_client import ApiClient
from airflow.exceptions import (
@@ -568,10 +568,15 @@ class PodGenerator:
``_ApiClient__deserialize_model`` from the kubernetes client.
This issue is tracked here;
https://github.com/kubernetes-client/python/issues/977.
+ A fresh ``Configuration`` is passed so that neither the pod nor any
nested model captures the
+ process-global in-cluster ``Configuration``. In-cluster, that global
carries a
+ ``refresh_api_key_hook`` local closure which ``pickle`` cannot
serialize, and which would
+ otherwise break pickling a ``pod_override`` onto the
KubernetesExecutor multiprocessing queue.
+
:param pod_dict: Serialized dict of k8s.V1Pod object
:return: De-serialized k8s.V1Pod
"""
- api_client = ApiClient()
+ api_client = ApiClient(configuration=Configuration())
return api_client._ApiClient__deserialize_model(pod_dict, k8s.V1Pod)
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py
index 0c255658216..f3a9dc33a85 100644
---
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py
+++
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py
@@ -16,10 +16,11 @@
# under the License.
from __future__ import annotations
+import pickle
from unittest.mock import Mock, patch
import pytest
-from kubernetes.client import models as k8s
+from kubernetes.client import Configuration, models as k8s
from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters
import (
_convert_from_dict,
@@ -102,6 +103,29 @@ def test_convert_from_dict_with_invalid_type():
assert str(exc_info.value) == "Expected dict or <class
'unittest.mock.Mock'>, got <class 'str'>"
+def test_convert_from_dict_is_picklable_in_cluster(monkeypatch):
+ """A model deserialized from a dict must not capture the unpicklable
in-cluster Configuration.
+
+ In-cluster, the kubernetes client installs a process-global default
``Configuration`` whose
+ ``refresh_api_key_hook`` is an unpicklable local closure.
``_convert_from_dict`` must deserialize
+ through a fresh ``Configuration`` so the model (and every nested object)
stays picklable.
+ """
+
+ def _refresh_api_key(config):
+ return None
+
+ dirty = Configuration()
+ dirty.refresh_api_key_hook = _refresh_api_key
+ monkeypatch.setattr(Configuration, "_default", dirty, raising=False)
+
+ result = _convert_from_dict({"name": "vol", "emptyDir": {}}, k8s.V1Volume)
+
+ assert isinstance(result, k8s.V1Volume)
+ pickle.dumps(result)
+ assert result.local_vars_configuration.refresh_api_key_hook is None
+ assert result.empty_dir.local_vars_configuration.refresh_api_key_hook is
None
+
+
# testcase of convert_volume() function
@patch("airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters._convert_kube_model_object")
def test_convert_volume_normal_value(mock_convert_kube_model_object):
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
index a0f3049cce5..d99e3e294fb 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import pickle
import random
import re
import string
@@ -24,7 +25,7 @@ from unittest.mock import patch
import pendulum
import pytest
-from kubernetes.client import ApiClient, models as k8s
+from kubernetes.client import ApiClient, Configuration, models as k8s
from kubernetes.client.rest import ApiException
from airflow.exceptions import AirflowProviderDeprecationWarning
@@ -200,6 +201,42 @@ class TestKubernetesJobOperator:
job = k.build_job_request_obj(create_context(k))
assert job.spec.backoff_limit == 6
+ def test_deserialize_job_template_file_is_picklable_in_cluster(self,
tmp_path, monkeypatch):
+ """A job deserialized from a template file must not capture the
in-cluster Configuration.
+
+ In-cluster, the kubernetes client installs a process-global default
``Configuration`` whose
+ ``refresh_api_key_hook`` is an unpicklable local closure.
``deserialize_job_template_file`` must
+ deserialize through a fresh ``Configuration`` so the job (and every
nested model) stays picklable.
+ """
+
+ def _refresh_api_key(config):
+ return None
+
+ dirty = Configuration()
+ dirty.refresh_api_key_hook = _refresh_api_key
+ monkeypatch.setattr(Configuration, "_default", dirty, raising=False)
+
+ template = tmp_path / "job.yaml"
+ template.write_text(
+ "apiVersion: batch/v1\n"
+ "kind: Job\n"
+ "metadata:\n"
+ " name: test-job\n"
+ "spec:\n"
+ " template:\n"
+ " spec:\n"
+ " containers:\n"
+ " - name: base\n"
+ " image: airflow:3\n"
+ )
+
+ job =
KubernetesJobOperator.deserialize_job_template_file(template.as_posix())
+
+ assert isinstance(job, k8s.V1Job)
+ pickle.dumps(job)
+ assert job.local_vars_configuration.refresh_api_key_hook is None
+ assert
job.spec.template.spec.containers[0].local_vars_configuration.refresh_api_key_hook
is None
+
def test_completion_mode_correctly_set(self,
clean_dags_dagruns_and_dagbundles):
k = KubernetesJobOperator(
task_id="task",
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py
index db969eec3f7..cfeea2d6163 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import pickle
import re
from unittest import mock
from unittest.mock import MagicMock
@@ -23,7 +24,7 @@ from unittest.mock import MagicMock
import pendulum
import pytest
from dateutil import parser
-from kubernetes.client import ApiClient, models as k8s
+from kubernetes.client import ApiClient, Configuration, models as k8s
from airflow import __version__
from airflow.exceptions import AirflowConfigException
@@ -698,6 +699,33 @@ class TestPodGenerator:
assert len(caplog.records) == 1
assert "non_existent.yaml does not exist" in caplog.text
+ def test_deserialize_model_dict_is_picklable_in_cluster(self, monkeypatch):
+ """A deserialized pod must not capture the unpicklable in-cluster
Configuration.
+
+ In-cluster, the kubernetes client installs a process-global default
``Configuration`` whose
+ ``refresh_api_key_hook`` is an unpicklable local closure.
``deserialize_model_dict`` must
+ round-trip through a fresh ``Configuration`` so the pod (and every
nested model) stays
+ picklable onto the KubernetesExecutor multiprocessing queue.
+ """
+
+ def _refresh_api_key(config):
+ return None
+
+ dirty = Configuration()
+ dirty.refresh_api_key_hook = _refresh_api_key
+ monkeypatch.setattr(Configuration, "_default", dirty, raising=False)
+
+ pod_dict = {
+ "metadata": {"name": "test-pod"},
+ "spec": {"containers": [{"name": "base", "image": "airflow:3"}]},
+ }
+ pod = PodGenerator.deserialize_model_dict(pod_dict)
+
+ assert isinstance(pod, k8s.V1Pod)
+ pickle.dumps(pod)
+ assert pod.local_vars_configuration.refresh_api_key_hook is None
+ assert
pod.spec.containers[0].local_vars_configuration.refresh_api_key_hook is None
+
@pytest.mark.parametrize(
"input",
(