This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 81ac72c26b5 Use asyncio.run instead of loop.run_until_complete. 
(#53475)
81ac72c26b5 is described below

commit 81ac72c26b5d97a7f53cafb9c6ef23eefe80630a
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Jul 22 12:47:16 2025 -0700

    Use asyncio.run instead of loop.run_until_complete. (#53475)
    
    Main motivator for this PR is that the behavior to create a loop if not 
exists when `loop = asyncio.get_event_loop()` is called is deprecated and will 
be removed from python -- eventually.
    
    But using `asyncio.run` is "more official" and less fiddly than mucking 
around with the loops directly.
    
    I also had to change the "stop looping" logic a bit to avoid a race 
condition that was there and presented when the "await pod start" function ran 
and exited before "watch pod events" had a chance to run.
---
 .../test_kubernetes_pod_operator.py                | 50 +++-------------------
 .../providers/cncf/kubernetes/operators/pod.py     | 16 +++----
 .../providers/cncf/kubernetes/utils/pod_manager.py |  6 +--
 .../decorators/test_kubernetes_commons.py          | 21 +++------
 .../unit/cncf/kubernetes/operators/test_pod.py     |  9 +---
 .../unit/cncf/kubernetes/utils/test_pod_manager.py |  4 +-
 pyproject.toml                                     |  2 +-
 7 files changed, 26 insertions(+), 82 deletions(-)

diff --git 
a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_pod_operator.py 
b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_pod_operator.py
index d504cee4689..59c52178b1d 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -16,7 +16,6 @@
 # under the License.
 from __future__ import annotations
 
-import asyncio
 import json
 import logging
 import os
@@ -24,7 +23,7 @@ import shutil
 from contextlib import nullcontext
 from copy import copy
 from unittest import mock
-from unittest.mock import ANY, MagicMock
+from unittest.mock import ANY, AsyncMock, MagicMock
 from uuid import uuid4
 
 import pytest
@@ -187,7 +186,6 @@ class TestKubernetesPodOperatorSystem:
         )
         assert not k.do_xcom_push
 
-    @pytest.mark.asyncio
     def test_config_path_move(self, kubeconfig_path, mock_get_connection, 
tmp_path):
         new_config_path = tmp_path / "kube_config.cfg"
         shutil.copy(kubeconfig_path, new_config_path)
@@ -210,7 +208,6 @@ class TestKubernetesPodOperatorSystem:
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         assert actual_pod == expected_pod
 
-    @pytest.mark.asyncio
     def test_working_pod(self, mock_get_connection):
         k = KubernetesPodOperator(
             namespace="default",
@@ -228,7 +225,6 @@ class TestKubernetesPodOperatorSystem:
         assert self.expected_pod["spec"] == actual_pod["spec"]
         assert self.expected_pod["metadata"]["labels"] == 
actual_pod["metadata"]["labels"]
 
-    @pytest.mark.asyncio
     def test_skip_cleanup(self, mock_get_connection):
         k = KubernetesPodOperator(
             namespace="unknown",
@@ -244,7 +240,6 @@ class TestKubernetesPodOperatorSystem:
         with pytest.raises(ApiException):
             k.execute(context)
 
-    @pytest.mark.asyncio
     def test_delete_operator_pod(self, mock_get_connection):
         k = KubernetesPodOperator(
             namespace="default",
@@ -263,7 +258,6 @@ class TestKubernetesPodOperatorSystem:
         assert self.expected_pod["spec"] == actual_pod["spec"]
         assert self.expected_pod["metadata"]["labels"] == 
actual_pod["metadata"]["labels"]
 
-    @pytest.mark.asyncio
     def test_skip_on_specified_exit_code(self, mock_get_connection):
         k = KubernetesPodOperator(
             namespace="default",
@@ -280,7 +274,6 @@ class TestKubernetesPodOperatorSystem:
         with pytest.raises(AirflowSkipException):
             k.execute(context)
 
-    @pytest.mark.asyncio
     def test_already_checked_on_success(self, mock_get_connection):
         """
         When ``on_finish_action="keep_pod"``, pod should have 'already_checked'
@@ -303,7 +296,6 @@ class TestKubernetesPodOperatorSystem:
         actual_pod = self.api_client.sanitize_for_serialization(actual_pod)
         assert actual_pod["metadata"]["labels"]["already_checked"] == "True"
 
-    @pytest.mark.asyncio
     def test_already_checked_on_failure(self, mock_get_connection):
         """
         When ``on_finish_action="keep_pod"``, pod should have 'already_checked'
@@ -329,7 +321,6 @@ class TestKubernetesPodOperatorSystem:
         assert status["state"]["terminated"]["reason"] == "Error"
         assert actual_pod["metadata"]["labels"]["already_checked"] == "True"
 
-    @pytest.mark.asyncio
     def test_pod_hostnetwork(self, mock_get_connection):
         k = KubernetesPodOperator(
             namespace="default",
@@ -349,7 +340,6 @@ class TestKubernetesPodOperatorSystem:
         assert self.expected_pod["spec"] == actual_pod["spec"]
         assert self.expected_pod["metadata"]["labels"] == 
actual_pod["metadata"]["labels"]
 
-    @pytest.mark.asyncio
     def test_pod_dnspolicy(self, mock_get_connection):
         dns_policy = "ClusterFirstWithHostNet"
         k = KubernetesPodOperator(
@@ -372,7 +362,6 @@ class TestKubernetesPodOperatorSystem:
         assert self.expected_pod["spec"] == actual_pod["spec"]
         assert self.expected_pod["metadata"]["labels"] == 
actual_pod["metadata"]["labels"]
 
-    @pytest.mark.asyncio
     def test_pod_schedulername(self, mock_get_connection):
         scheduler_name = "default-scheduler"
         k = KubernetesPodOperator(
@@ -392,7 +381,6 @@ class TestKubernetesPodOperatorSystem:
         self.expected_pod["spec"]["schedulerName"] = scheduler_name
         assert self.expected_pod == actual_pod
 
-    @pytest.mark.asyncio
     def test_pod_node_selector(self, mock_get_connection):
         node_selector = {"beta.kubernetes.io/os": "linux"}
         k = KubernetesPodOperator(
@@ -412,7 +400,6 @@ class TestKubernetesPodOperatorSystem:
         self.expected_pod["spec"]["nodeSelector"] = node_selector
         assert self.expected_pod == actual_pod
 
-    @pytest.mark.asyncio
     def test_pod_resources(self, mock_get_connection):
         resources = k8s.V1ResourceRequirements(
             requests={"memory": "64Mi", "cpu": "250m", "ephemeral-storage": 
"1Gi"},
@@ -438,7 +425,6 @@ class TestKubernetesPodOperatorSystem:
         }
         assert self.expected_pod == actual_pod
 
-    @pytest.mark.asyncio
     @pytest.mark.parametrize(
         "val",
         [
@@ -515,7 +501,6 @@ class TestKubernetesPodOperatorSystem:
         self.expected_pod["spec"]["affinity"] = expected
         assert self.expected_pod == actual_pod
 
-    @pytest.mark.asyncio
     def test_port(self, mock_get_connection):
         port = k8s.V1ContainerPort(
             name="http",
@@ -539,7 +524,6 @@ class TestKubernetesPodOperatorSystem:
         self.expected_pod["spec"]["containers"][0]["ports"] = [{"name": 
"http", "containerPort": 80}]
         assert self.expected_pod == actual_pod
 
-    @pytest.mark.asyncio
     def test_volume_mount(self, mock_get_connection):
         with mock.patch.object(PodManager, "log") as mock_logger:
             volume_mount = k8s.V1VolumeMount(
@@ -579,7 +563,6 @@ class TestKubernetesPodOperatorSystem:
             ]
             assert self.expected_pod == actual_pod
 
-    @pytest.mark.asyncio
     @pytest.mark.parametrize("uid", [0, 1000])
     def test_run_as_user(self, uid, mock_get_connection):
         security_context = V1PodSecurityContext(run_as_user=uid)
@@ -605,7 +588,6 @@ class TestKubernetesPodOperatorSystem:
         )
         assert pod.to_dict()["spec"]["security_context"]["run_as_user"] == uid
 
-    @pytest.mark.asyncio
     @pytest.mark.parametrize("gid", [0, 1000])
     def test_fs_group(self, gid, mock_get_connection):
         security_context = V1PodSecurityContext(fs_group=gid)
@@ -631,7 +613,6 @@ class TestKubernetesPodOperatorSystem:
         )
         assert pod.to_dict()["spec"]["security_context"]["fs_group"] == gid
 
-    @pytest.mark.asyncio
     def test_disable_privilege_escalation(self, mock_get_connection):
         container_security_context = 
V1SecurityContext(allow_privilege_escalation=False)
 
@@ -654,7 +635,6 @@ class TestKubernetesPodOperatorSystem:
         }
         assert self.expected_pod == actual_pod
 
-    @pytest.mark.asyncio
     def test_faulty_image(self, mock_get_connection):
         bad_image_name = "foobar"
         k = KubernetesPodOperator(
@@ -693,7 +673,6 @@ class TestKubernetesPodOperatorSystem:
         with pytest.raises(ApiException, match="error looking up service 
account default/foobar"):
             k.get_or_create_pod(pod, context)
 
-    @pytest.mark.asyncio
     def test_pod_failure(self, mock_get_connection):
         """
         Tests that the task fails when a pod reports a failure
@@ -716,7 +695,6 @@ class TestKubernetesPodOperatorSystem:
         self.expected_pod["spec"]["containers"][0]["args"] = 
bad_internal_command
         assert self.expected_pod == actual_pod
 
-    @pytest.mark.asyncio
     def test_xcom_push(self, test_label, mock_get_connection):
         expected = {"test_label": test_label, "buzz": 2}
         args = [f"echo '{json.dumps(expected)}' > /airflow/xcom/return.json"]
@@ -765,7 +743,6 @@ class TestKubernetesPodOperatorSystem:
         ]
         assert self.expected_pod == actual_pod
 
-    @pytest.mark.asyncio
     def test_pod_template_file_system(self, mock_get_connection, 
basic_pod_template):
         """Note: this test requires that you have a namespace ``mem-example`` 
in your cluster."""
         k = KubernetesPodOperator(
@@ -781,7 +758,6 @@ class TestKubernetesPodOperatorSystem:
         assert result is not None
         assert result == {"hello": "world"}
 
-    @pytest.mark.asyncio
     @pytest.mark.parametrize(
         "env_vars",
         [
@@ -817,7 +793,6 @@ class TestKubernetesPodOperatorSystem:
         assert k.pod.spec.containers[0].env == [k8s.V1EnvVar(name="env_name", 
value="value")]
         assert result == {"hello": "world"}
 
-    @pytest.mark.asyncio
     def test_pod_template_file_with_full_pod_spec(self, test_label, 
mock_get_connection, basic_pod_template):
         pod_spec = k8s.V1Pod(
             metadata=k8s.V1ObjectMeta(
@@ -858,7 +833,6 @@ class TestKubernetesPodOperatorSystem:
         assert k.pod.spec.containers[0].env == [k8s.V1EnvVar(name="env_name", 
value="value")]
         assert result == {"hello": "world"}
 
-    @pytest.mark.asyncio
     def test_full_pod_spec(self, test_label, mock_get_connection):
         pod_spec = k8s.V1Pod(
             metadata=k8s.V1ObjectMeta(
@@ -904,7 +878,6 @@ class TestKubernetesPodOperatorSystem:
         assert k.pod.spec.containers[0].env == [k8s.V1EnvVar(name="env_name", 
value="value")]
         assert result == {"hello": "world"}
 
-    @pytest.mark.asyncio
     def test_init_container(self, mock_get_connection):
         # GIVEN
         volume_mounts = [
@@ -959,10 +932,11 @@ class TestKubernetesPodOperatorSystem:
         ]
         assert self.expected_pod == actual_pod
 
-    @pytest.mark.asyncio
     @mock.patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start")
     @mock.patch(f"{POD_MANAGER_CLASS}.extract_xcom")
     @mock.patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
+    @mock.patch(f"{POD_MANAGER_CLASS}.watch_pod_events", new=AsyncMock())
+    @mock.patch(f"{POD_MANAGER_CLASS}.await_pod_start", new=AsyncMock())
     @mock.patch(f"{POD_MANAGER_CLASS}.create_pod", new=MagicMock)
     @mock.patch(HOOK_CLASS)
     def test_pod_template_file(
@@ -1067,8 +1041,9 @@ class TestKubernetesPodOperatorSystem:
         del actual_pod["metadata"]["labels"]["airflow_version"]
         assert expected_dict == actual_pod
 
-    @pytest.mark.asyncio
     @mock.patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
+    @mock.patch(f"{POD_MANAGER_CLASS}.watch_pod_events", new=AsyncMock())
+    @mock.patch(f"{POD_MANAGER_CLASS}.await_pod_start", new=AsyncMock())
     @mock.patch(f"{POD_MANAGER_CLASS}.create_pod", new=MagicMock)
     @mock.patch(HOOK_CLASS)
     def test_pod_priority_class_name(self, hook_mock, 
await_pod_completion_mock):
@@ -1102,7 +1077,6 @@ class TestKubernetesPodOperatorSystem:
         self.expected_pod["spec"]["priorityClassName"] = priority_class_name
         assert self.expected_pod == actual_pod
 
-    @pytest.mark.asyncio
     def test_pod_name(self, mock_get_connection):
         pod_name_too_long = "a" * 221
         k = KubernetesPodOperator(
@@ -1122,7 +1096,6 @@ class TestKubernetesPodOperatorSystem:
         with pytest.raises(AirflowException):
             k.execute(context)
 
-    @pytest.mark.asyncio
     def test_on_kill(self, mock_get_connection):
         hook = KubernetesHook(conn_id=None, in_cluster=False)
         client = hook.core_v1_client
@@ -1163,7 +1136,6 @@ class TestKubernetesPodOperatorSystem:
         with pytest.raises(ApiException, match=r'pods \\"test.[a-z0-9]+\\" not 
found'):
             client.read_namespaced_pod(name=name, namespace=namespace)
 
-    @pytest.mark.asyncio
     def test_reattach_failing_pod_once(self, mock_get_connection):
         hook = KubernetesHook(conn_id=None, in_cluster=False)
         client = hook.core_v1_client
@@ -1224,11 +1196,6 @@ class TestKubernetesPodOperatorSystem:
         # recreate op just to ensure we're not relying on any statefulness
         k = get_op()
 
-        # Before next attempt we need to re-create event loop if it is closed.
-        loop = asyncio.get_event_loop()
-        if loop.is_closed():
-            asyncio.set_event_loop(asyncio.new_event_loop())
-
         # `create_pod` should be called because though there's still a pod to 
be found,
         # it will be `already_checked`
         with mock.patch(f"{POD_MANAGER_CLASS}.create_pod") as create_mock:
@@ -1236,7 +1203,6 @@ class TestKubernetesPodOperatorSystem:
                 k.execute(context)
             create_mock.assert_called_once()
 
-    @pytest.mark.asyncio
     def test_changing_base_container_name_with_get_logs(self, 
mock_get_connection):
         k = KubernetesPodOperator(
             namespace="default",
@@ -1262,7 +1228,6 @@ class TestKubernetesPodOperatorSystem:
         self.expected_pod["spec"]["containers"][0]["name"] = "apple-sauce"
         assert self.expected_pod["spec"] == actual_pod["spec"]
 
-    @pytest.mark.asyncio
     def test_changing_base_container_name_no_logs(self, mock_get_connection):
         """
         This test checks BOTH a modified base container name AND the 
get_logs=False flow,
@@ -1293,7 +1258,6 @@ class TestKubernetesPodOperatorSystem:
         self.expected_pod["spec"]["containers"][0]["name"] = "apple-sauce"
         assert self.expected_pod["spec"] == actual_pod["spec"]
 
-    @pytest.mark.asyncio
     def test_changing_base_container_name_no_logs_long(self, 
mock_get_connection):
         """
         Similar to test_changing_base_container_name_no_logs, but ensures that
@@ -1325,7 +1289,6 @@ class TestKubernetesPodOperatorSystem:
         self.expected_pod["spec"]["containers"][0]["args"] = ["sleep 3"]
         assert self.expected_pod["spec"] == actual_pod["spec"]
 
-    @pytest.mark.asyncio
     def test_changing_base_container_name_failure(self, mock_get_connection):
         k = KubernetesPodOperator(
             namespace="default",
@@ -1372,7 +1335,6 @@ class TestKubernetesPodOperatorSystem:
         )
         assert MyK8SPodOperator(task_id=str(uuid4())).base_container_name == 
"tomato-sauce"
 
-    @pytest.mark.asyncio
     def test_init_container_logs(self, mock_get_connection):
         marker_from_init_container = f"{uuid4()}"
         marker_from_main_container = f"{uuid4()}"
@@ -1404,7 +1366,6 @@ class TestKubernetesPodOperatorSystem:
         assert marker_from_init_container in calls_args
         assert marker_from_main_container in calls_args
 
-    @pytest.mark.asyncio
     def test_init_container_logs_filtered(self, mock_get_connection):
         marker_from_init_container_to_log_1 = f"{uuid4()}"
         marker_from_init_container_to_log_2 = f"{uuid4()}"
@@ -1502,7 +1463,6 @@ def 
test_hide_sensitive_field_in_templated_fields_on_error(caplog, monkeypatch):
 
 
 class TestKubernetesPodOperator(BaseK8STest):
-    @pytest.mark.asyncio
     @pytest.mark.parametrize(
         "active_deadline_seconds,should_fail",
         [(3, True), (60, False)],
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index a2c244c9ea2..420ebddb725 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -608,20 +608,18 @@ class KubernetesPodOperator(BaseOperator):
 
     def await_pod_start(self, pod: k8s.V1Pod) -> None:
         try:
-            loop = asyncio.get_event_loop()
-            events_task = asyncio.ensure_future(
-                self.pod_manager.watch_pod_events(pod, 
self.startup_check_interval_seconds)
-            )
-            loop.run_until_complete(
-                self.pod_manager.await_pod_start(
+
+            async def _await_pod_start():
+                events_task = self.pod_manager.watch_pod_events(pod, 
self.startup_check_interval_seconds)
+                pod_start_task = self.pod_manager.await_pod_start(
                     pod=pod,
                     schedule_timeout=self.schedule_timeout_seconds,
                     startup_timeout=self.startup_timeout_seconds,
                     check_interval=self.startup_check_interval_seconds,
                 )
-            )
-            loop.run_until_complete(events_task)
-            loop.close()
+                await asyncio.gather(pod_start_task, events_task)
+
+            asyncio.run(_await_pod_start())
         except PodLaunchFailedException:
             if self.log_events_on_failure:
                 self._read_pod_events(pod, reraise=False)
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 12a10e0dad3..ae30c5ca429 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -338,6 +338,7 @@ class PodManager(LoggingMixin):
         self._client = kube_client
         self._watch = watch.Watch()
         self._callbacks = callbacks or []
+        self.stop_watching_events = False
 
     def run_pod_async(self, pod: V1Pod, **kwargs) -> V1Pod:
         """Run POD asynchronously."""
@@ -380,9 +381,8 @@ class PodManager(LoggingMixin):
 
     async def watch_pod_events(self, pod: V1Pod, check_interval: int = 1) -> 
None:
         """Read pod events and writes into log."""
-        self.keep_watching_for_events = True
         num_events = 0
-        while self.keep_watching_for_events:
+        while not self.stop_watching_events:
             events = self.read_pod_events(pod)
             for new_event in events.items[num_events:]:
                 involved_object: V1ObjectReference = new_event.involved_object
@@ -413,7 +413,7 @@ class PodManager(LoggingMixin):
             remote_pod = self.read_pod(pod)
             pod_status = remote_pod.status
             if pod_status.phase != PodPhase.PENDING:
-                self.keep_watching_for_events = False
+                self.stop_watching_events = True
                 self.log.info("::endgroup::")
                 break
 
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py
 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py
index 94a71961acc..40285db98f5 100644
--- 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py
+++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py
@@ -16,7 +16,6 @@
 # under the License.
 from __future__ import annotations
 
-import asyncio
 from collections.abc import Callable
 from unittest import mock
 
@@ -91,18 +90,8 @@ class TestKubernetesDecoratorsBase:
         self.dag = dag
 
         self.mock_create_pod = 
mock.patch(f"{POD_MANAGER_CLASS}.create_pod").start()
-        self.mock_watch_pod_events_patch = mock.patch(
-            f"{POD_MANAGER_CLASS}.watch_pod_events", 
new_callable=mock.AsyncMock
-        )
-        self.mock_watch_pod_events = self.mock_watch_pod_events_patch.start()
-        self.mock_watch_pod_events.return_value = asyncio.Future()
-        self.mock_watch_pod_events.return_value.set_result(None)
-        self.mock_await_pod_start_patch = mock.patch(
-            f"{POD_MANAGER_CLASS}.await_pod_start", new_callable=mock.AsyncMock
-        )
-        self.mock_await_pod_start = self.mock_await_pod_start_patch.start()
-        self.mock_await_pod_start.return_value = asyncio.Future()
-        self.mock_await_pod_start.return_value.set_result(None)
+        self.mock_await_pod_start = 
mock.patch(f"{POD_MANAGER_CLASS}.await_pod_start").start()
+        self.mock_watch_pod_events = 
mock.patch(f"{POD_MANAGER_CLASS}.watch_pod_events").start()
         self.mock_await_xcom_sidecar_container_start = mock.patch(
             f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start"
         ).start()
@@ -120,8 +109,10 @@ class TestKubernetesDecoratorsBase:
         self.mock_fetch_logs = 
mock.patch(f"{POD_MANAGER_CLASS}.fetch_requested_container_logs").start()
         self.mock_fetch_logs.return_value = "logs"
 
-        yield
-
+        try:
+            yield
+        except Exception:
+            pass
         mock.patch.stopall()
 
     def teardown_method(self):
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index 753bf712662..634a29028ea 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -16,7 +16,6 @@
 # under the License.
 from __future__ import annotations
 
-import asyncio
 import datetime
 import re
 from contextlib import contextmanager, nullcontext
@@ -155,17 +154,13 @@ class TestKubernetesPodOperator:
     @pytest.fixture(autouse=True)
     def setup_tests(self, dag_maker):
         self.create_pod_patch = patch(f"{POD_MANAGER_CLASS}.create_pod")
-        self.watch_pod_events = patch(f"{POD_MANAGER_CLASS}.watch_pod_events", 
new_callable=mock.AsyncMock)
-        self.await_pod_patch = patch(f"{POD_MANAGER_CLASS}.await_pod_start", 
new_callable=mock.AsyncMock)
+        self.watch_pod_events = patch(f"{POD_MANAGER_CLASS}.watch_pod_events")
+        self.await_pod_patch = patch(f"{POD_MANAGER_CLASS}.await_pod_start")
         self.await_pod_completion_patch = 
patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
         self._default_client_patch = patch(f"{HOOK_CLASS}._get_default_client")
         self.watch_pod_events_mock = self.watch_pod_events.start()
-        self.watch_pod_events_mock.return_value = asyncio.Future()
-        self.watch_pod_events_mock.return_value.set_result(None)
         self.create_mock = self.create_pod_patch.start()
         self.await_start_mock = self.await_pod_patch.start()
-        self.await_start_mock.return_value = asyncio.Future()
-        self.await_start_mock.return_value.set_result(None)
         self.await_pod_mock = self.await_pod_completion_patch.start()
         self._default_client_mock = self._default_client_patch.start()
         self.dag_maker = dag_maker
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index e873e8bbb5b..194d1e3e96a 100644
--- 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -173,7 +173,7 @@ class TestPodManager:
         startup_check_interval = 10
 
         def mock_read_pod_events(pod):
-            self.pod_manager.keep_watching_for_events = False
+            self.pod_manager.stop_watching_events = True
             return events
 
         with (
@@ -514,7 +514,7 @@ class TestPodManager:
         assert mock_time_sleep.call_count == 3
         assert f"::group::Waiting until {schedule_timeout}s to get the POD 
scheduled..." in caplog.text
         assert f"Waiting {startup_timeout}s to get the POD running..." in 
caplog.text
-        assert not self.pod_manager.keep_watching_for_events
+        assert self.pod_manager.stop_watching_events is True
 
     
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
     def test_container_is_running(self, container_is_running_mock):
diff --git a/pyproject.toml b/pyproject.toml
index c6e02a26946..02768457878 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -619,7 +619,6 @@ extend-select = [
     "RET506", # Unnecessary {branch} after raise statement
     "RET507", # Unnecessary {branch} after continue statement
     "RET508", # Unnecessary {branch} after break statement
-    "SIM105", # Use contextlib.suppress({exception}) instead of try-except-pass
 ]
 ignore = [
     "D100", # Unwanted; Docstring at the top of every file.
@@ -655,6 +654,7 @@ ignore = [
     "E501", # Formatted code may exceed the line length, leading to 
line-too-long (E501) errors.
     "ASYNC110", # TODO: Use `anyio.Event` instead of awaiting `anyio.sleep` in 
a `while` loop
     "UP038",
+    "SIM105", # Use contextlib.suppress({exception}) instead of try-except-pass
 ]
 unfixable = [
     # PT022 replace empty `yield` to empty `return`. Might be fixed with a 
combination of PLR1711

Reply via email to