This is an automated email from the ASF dual-hosted git repository.

ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e0645d564be Make cncf.kubernetes model deserialization picklable in-cluster (#68848)
e0645d564be is described below

commit e0645d564be47251f81cc2d1d746b14bdb5d857a
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Jun 23 15:06:51 2026 +0100

    Make cncf.kubernetes model deserialization picklable in-cluster (#68848)
    
    * Make cncf.kubernetes model deserialization picklable in-cluster
    
    The kubernetes client (v36) attaches the process-global in-cluster Configuration
    to every model it deserializes, and that Configuration's refresh_api_key_hook is
    an unpicklable local closure. Any deserialized model that later gets pickled --
    for example a pod_override placed on the KubernetesExecutor multiprocessing
    queue -- then crashes.
    
    Deserialize through an ApiClient built with a fresh Configuration in the
    provider's model-deserialization paths so the resulting models (and every nested
    object) stay picklable regardless of the kubernetes client version:
    PodGenerator.deserialize_model_dict, KubernetesJobOperator.deserialize_job_template_file,
    and the backcompat _convert_from_dict converter.
    
    * fixup! Make cncf.kubernetes model deserialization picklable in-cluster
    
    * move imports to top level
    
    * Address review feedback on provider deserialization fix
    
    Keep the picklability rationale only in PodGenerator.deserialize_model_dict
    (drop the duplicated comments at the job operator and converter call sites),
    and replace the unnecessary closure-returning-closure test helper with a
    directly-defined local hook (already unpicklable on its own).
---
 .../backcompat/backwards_compat_converters.py      |  4 +--
 .../providers/cncf/kubernetes/operators/job.py     |  4 +--
 .../providers/cncf/kubernetes/pod_generator.py     |  9 +++--
 .../backcompat/test_backwards_compat_converters.py | 26 ++++++++++++++-
 .../unit/cncf/kubernetes/operators/test_job.py     | 39 +++++++++++++++++++++-
 .../unit/cncf/kubernetes/test_pod_generator.py     | 30 ++++++++++++++++-
 6 files changed, 103 insertions(+), 9 deletions(-)

diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
index 88d16170163..0ca600b4466 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
@@ -18,7 +18,7 @@
 
 from __future__ import annotations
 
-from kubernetes.client import ApiClient, models as k8s
+from kubernetes.client import ApiClient, Configuration, models as k8s
 
 from airflow.providers.common.compat.sdk import AirflowException
 
@@ -36,7 +36,7 @@ def _convert_from_dict(obj, new_class):
     if isinstance(obj, new_class):
         return obj
     if isinstance(obj, dict):
-        api_client = ApiClient()
+        api_client = ApiClient(configuration=Configuration())
         return api_client._ApiClient__deserialize_model(obj, new_class)
     raise AirflowException(f"Expected dict or {new_class}, got {type(obj)}")
 
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
index 0e8540d815c..e888159c739 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
@@ -28,7 +28,7 @@ from collections.abc import Sequence
 from functools import cached_property
 from typing import TYPE_CHECKING, Any, Literal
 
-from kubernetes.client import BatchV1Api, models as k8s
+from kubernetes.client import BatchV1Api, Configuration, models as k8s
 from kubernetes.client.api_client import ApiClient
 from kubernetes.client.rest import ApiException
 
@@ -378,7 +378,7 @@ class KubernetesJobOperator(KubernetesPodOperator):
             job = None
             log.warning("Template file %s does not exist", path)
 
-        api_client = ApiClient()
+        api_client = ApiClient(configuration=Configuration())
         return api_client._ApiClient__deserialize_model(job, k8s.V1Job)
 
     def on_kill(self) -> None:
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py
index 7e5dc728d77..ca499d363fc 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py
@@ -34,7 +34,7 @@ from functools import reduce
 from typing import TYPE_CHECKING
 
 from dateutil import parser
-from kubernetes.client import models as k8s
+from kubernetes.client import Configuration, models as k8s
 from kubernetes.client.api_client import ApiClient
 
 from airflow.exceptions import (
@@ -568,10 +568,15 @@ class PodGenerator:
         ``_ApiClient__deserialize_model`` from the kubernetes client.
         This issue is tracked here; 
https://github.com/kubernetes-client/python/issues/977.
 
+        A fresh ``Configuration`` is passed so that neither the pod nor any 
nested model captures the
+        process-global in-cluster ``Configuration``. In-cluster, that global 
carries a
+        ``refresh_api_key_hook`` local closure which ``pickle`` cannot 
serialize, and which would
+        otherwise break pickling a ``pod_override`` onto the 
KubernetesExecutor multiprocessing queue.
+
         :param pod_dict: Serialized dict of k8s.V1Pod object
         :return: De-serialized k8s.V1Pod
         """
-        api_client = ApiClient()
+        api_client = ApiClient(configuration=Configuration())
         return api_client._ApiClient__deserialize_model(pod_dict, k8s.V1Pod)
 
 
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py
 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py
index 0c255658216..f3a9dc33a85 100644
--- 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py
+++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/backcompat/test_backwards_compat_converters.py
@@ -16,10 +16,11 @@
 # under the License.
 from __future__ import annotations
 
+import pickle
 from unittest.mock import Mock, patch
 
 import pytest
-from kubernetes.client import models as k8s
+from kubernetes.client import Configuration, models as k8s
 
 from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters 
import (
     _convert_from_dict,
@@ -102,6 +103,29 @@ def test_convert_from_dict_with_invalid_type():
     assert str(exc_info.value) == "Expected dict or <class 
'unittest.mock.Mock'>, got <class 'str'>"
 
 
+def test_convert_from_dict_is_picklable_in_cluster(monkeypatch):
+    """A model deserialized from a dict must not capture the unpicklable 
in-cluster Configuration.
+
+    In-cluster, the kubernetes client installs a process-global default 
``Configuration`` whose
+    ``refresh_api_key_hook`` is an unpicklable local closure. 
``_convert_from_dict`` must deserialize
+    through a fresh ``Configuration`` so the model (and every nested object) 
stays picklable.
+    """
+
+    def _refresh_api_key(config):
+        return None
+
+    dirty = Configuration()
+    dirty.refresh_api_key_hook = _refresh_api_key
+    monkeypatch.setattr(Configuration, "_default", dirty, raising=False)
+
+    result = _convert_from_dict({"name": "vol", "emptyDir": {}}, k8s.V1Volume)
+
+    assert isinstance(result, k8s.V1Volume)
+    pickle.dumps(result)
+    assert result.local_vars_configuration.refresh_api_key_hook is None
+    assert result.empty_dir.local_vars_configuration.refresh_api_key_hook is 
None
+
+
 # testcase of convert_volume() function
 
@patch("airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters._convert_kube_model_object")
 def test_convert_volume_normal_value(mock_convert_kube_model_object):
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
index a0f3049cce5..d99e3e294fb 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import pickle
 import random
 import re
 import string
@@ -24,7 +25,7 @@ from unittest.mock import patch
 
 import pendulum
 import pytest
-from kubernetes.client import ApiClient, models as k8s
+from kubernetes.client import ApiClient, Configuration, models as k8s
 from kubernetes.client.rest import ApiException
 
 from airflow.exceptions import AirflowProviderDeprecationWarning
@@ -200,6 +201,42 @@ class TestKubernetesJobOperator:
         job = k.build_job_request_obj(create_context(k))
         assert job.spec.backoff_limit == 6
 
+    def test_deserialize_job_template_file_is_picklable_in_cluster(self, 
tmp_path, monkeypatch):
+        """A job deserialized from a template file must not capture the 
in-cluster Configuration.
+
+        In-cluster, the kubernetes client installs a process-global default 
``Configuration`` whose
+        ``refresh_api_key_hook`` is an unpicklable local closure. 
``deserialize_job_template_file`` must
+        deserialize through a fresh ``Configuration`` so the job (and every 
nested model) stays picklable.
+        """
+
+        def _refresh_api_key(config):
+            return None
+
+        dirty = Configuration()
+        dirty.refresh_api_key_hook = _refresh_api_key
+        monkeypatch.setattr(Configuration, "_default", dirty, raising=False)
+
+        template = tmp_path / "job.yaml"
+        template.write_text(
+            "apiVersion: batch/v1\n"
+            "kind: Job\n"
+            "metadata:\n"
+            "  name: test-job\n"
+            "spec:\n"
+            "  template:\n"
+            "    spec:\n"
+            "      containers:\n"
+            "        - name: base\n"
+            "          image: airflow:3\n"
+        )
+
+        job = 
KubernetesJobOperator.deserialize_job_template_file(template.as_posix())
+
+        assert isinstance(job, k8s.V1Job)
+        pickle.dumps(job)
+        assert job.local_vars_configuration.refresh_api_key_hook is None
+        assert 
job.spec.template.spec.containers[0].local_vars_configuration.refresh_api_key_hook
 is None
+
     def test_completion_mode_correctly_set(self, 
clean_dags_dagruns_and_dagbundles):
         k = KubernetesJobOperator(
             task_id="task",
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py
index db969eec3f7..cfeea2d6163 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_pod_generator.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import pickle
 import re
 from unittest import mock
 from unittest.mock import MagicMock
@@ -23,7 +24,7 @@ from unittest.mock import MagicMock
 import pendulum
 import pytest
 from dateutil import parser
-from kubernetes.client import ApiClient, models as k8s
+from kubernetes.client import ApiClient, Configuration, models as k8s
 
 from airflow import __version__
 from airflow.exceptions import AirflowConfigException
@@ -698,6 +699,33 @@ class TestPodGenerator:
         assert len(caplog.records) == 1
         assert "non_existent.yaml does not exist" in caplog.text
 
+    def test_deserialize_model_dict_is_picklable_in_cluster(self, monkeypatch):
+        """A deserialized pod must not capture the unpicklable in-cluster 
Configuration.
+
+        In-cluster, the kubernetes client installs a process-global default 
``Configuration`` whose
+        ``refresh_api_key_hook`` is an unpicklable local closure. 
``deserialize_model_dict`` must
+        round-trip through a fresh ``Configuration`` so the pod (and every 
nested model) stays
+        picklable onto the KubernetesExecutor multiprocessing queue.
+        """
+
+        def _refresh_api_key(config):
+            return None
+
+        dirty = Configuration()
+        dirty.refresh_api_key_hook = _refresh_api_key
+        monkeypatch.setattr(Configuration, "_default", dirty, raising=False)
+
+        pod_dict = {
+            "metadata": {"name": "test-pod"},
+            "spec": {"containers": [{"name": "base", "image": "airflow:3"}]},
+        }
+        pod = PodGenerator.deserialize_model_dict(pod_dict)
+
+        assert isinstance(pod, k8s.V1Pod)
+        pickle.dumps(pod)
+        assert pod.local_vars_configuration.refresh_api_key_hook is None
+        assert 
pod.spec.containers[0].local_vars_configuration.refresh_api_key_hook is None
+
     @pytest.mark.parametrize(
         "input",
         (

Reply via email to