This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 81ac72c26b5 Use asyncio.run instead of loop.run_until_complete.
(#53475)
81ac72c26b5 is described below
commit 81ac72c26b5d97a7f53cafb9c6ef23eefe80630a
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Jul 22 12:47:16 2025 -0700
Use asyncio.run instead of loop.run_until_complete. (#53475)
Main motivator for this PR is that the behavior of creating a loop, if one does
not already exist, when `loop = asyncio.get_event_loop()` is called is deprecated
and will eventually be removed from Python.
But using `asyncio.run` is "more official" and less fiddly than mucking
around with the loops directly.
I also had to change the "stop looping" logic a bit to avoid a pre-existing race
condition that manifested when the "await pod start" function ran and exited
before "watch pod events" had a chance to run.
---
.../test_kubernetes_pod_operator.py | 50 +++-------------------
.../providers/cncf/kubernetes/operators/pod.py | 16 +++----
.../providers/cncf/kubernetes/utils/pod_manager.py | 6 +--
.../decorators/test_kubernetes_commons.py | 21 +++------
.../unit/cncf/kubernetes/operators/test_pod.py | 9 +---
.../unit/cncf/kubernetes/utils/test_pod_manager.py | 4 +-
pyproject.toml | 2 +-
7 files changed, 26 insertions(+), 82 deletions(-)
diff --git
a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_pod_operator.py
b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_pod_operator.py
index d504cee4689..59c52178b1d 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -16,7 +16,6 @@
# under the License.
from __future__ import annotations
-import asyncio
import json
import logging
import os
@@ -24,7 +23,7 @@ import shutil
from contextlib import nullcontext
from copy import copy
from unittest import mock
-from unittest.mock import ANY, MagicMock
+from unittest.mock import ANY, AsyncMock, MagicMock
from uuid import uuid4
import pytest
@@ -187,7 +186,6 @@ class TestKubernetesPodOperatorSystem:
)
assert not k.do_xcom_push
- @pytest.mark.asyncio
def test_config_path_move(self, kubeconfig_path, mock_get_connection,
tmp_path):
new_config_path = tmp_path / "kube_config.cfg"
shutil.copy(kubeconfig_path, new_config_path)
@@ -210,7 +208,6 @@ class TestKubernetesPodOperatorSystem:
actual_pod = self.api_client.sanitize_for_serialization(k.pod)
assert actual_pod == expected_pod
- @pytest.mark.asyncio
def test_working_pod(self, mock_get_connection):
k = KubernetesPodOperator(
namespace="default",
@@ -228,7 +225,6 @@ class TestKubernetesPodOperatorSystem:
assert self.expected_pod["spec"] == actual_pod["spec"]
assert self.expected_pod["metadata"]["labels"] ==
actual_pod["metadata"]["labels"]
- @pytest.mark.asyncio
def test_skip_cleanup(self, mock_get_connection):
k = KubernetesPodOperator(
namespace="unknown",
@@ -244,7 +240,6 @@ class TestKubernetesPodOperatorSystem:
with pytest.raises(ApiException):
k.execute(context)
- @pytest.mark.asyncio
def test_delete_operator_pod(self, mock_get_connection):
k = KubernetesPodOperator(
namespace="default",
@@ -263,7 +258,6 @@ class TestKubernetesPodOperatorSystem:
assert self.expected_pod["spec"] == actual_pod["spec"]
assert self.expected_pod["metadata"]["labels"] ==
actual_pod["metadata"]["labels"]
- @pytest.mark.asyncio
def test_skip_on_specified_exit_code(self, mock_get_connection):
k = KubernetesPodOperator(
namespace="default",
@@ -280,7 +274,6 @@ class TestKubernetesPodOperatorSystem:
with pytest.raises(AirflowSkipException):
k.execute(context)
- @pytest.mark.asyncio
def test_already_checked_on_success(self, mock_get_connection):
"""
When ``on_finish_action="keep_pod"``, pod should have 'already_checked'
@@ -303,7 +296,6 @@ class TestKubernetesPodOperatorSystem:
actual_pod = self.api_client.sanitize_for_serialization(actual_pod)
assert actual_pod["metadata"]["labels"]["already_checked"] == "True"
- @pytest.mark.asyncio
def test_already_checked_on_failure(self, mock_get_connection):
"""
When ``on_finish_action="keep_pod"``, pod should have 'already_checked'
@@ -329,7 +321,6 @@ class TestKubernetesPodOperatorSystem:
assert status["state"]["terminated"]["reason"] == "Error"
assert actual_pod["metadata"]["labels"]["already_checked"] == "True"
- @pytest.mark.asyncio
def test_pod_hostnetwork(self, mock_get_connection):
k = KubernetesPodOperator(
namespace="default",
@@ -349,7 +340,6 @@ class TestKubernetesPodOperatorSystem:
assert self.expected_pod["spec"] == actual_pod["spec"]
assert self.expected_pod["metadata"]["labels"] ==
actual_pod["metadata"]["labels"]
- @pytest.mark.asyncio
def test_pod_dnspolicy(self, mock_get_connection):
dns_policy = "ClusterFirstWithHostNet"
k = KubernetesPodOperator(
@@ -372,7 +362,6 @@ class TestKubernetesPodOperatorSystem:
assert self.expected_pod["spec"] == actual_pod["spec"]
assert self.expected_pod["metadata"]["labels"] ==
actual_pod["metadata"]["labels"]
- @pytest.mark.asyncio
def test_pod_schedulername(self, mock_get_connection):
scheduler_name = "default-scheduler"
k = KubernetesPodOperator(
@@ -392,7 +381,6 @@ class TestKubernetesPodOperatorSystem:
self.expected_pod["spec"]["schedulerName"] = scheduler_name
assert self.expected_pod == actual_pod
- @pytest.mark.asyncio
def test_pod_node_selector(self, mock_get_connection):
node_selector = {"beta.kubernetes.io/os": "linux"}
k = KubernetesPodOperator(
@@ -412,7 +400,6 @@ class TestKubernetesPodOperatorSystem:
self.expected_pod["spec"]["nodeSelector"] = node_selector
assert self.expected_pod == actual_pod
- @pytest.mark.asyncio
def test_pod_resources(self, mock_get_connection):
resources = k8s.V1ResourceRequirements(
requests={"memory": "64Mi", "cpu": "250m", "ephemeral-storage":
"1Gi"},
@@ -438,7 +425,6 @@ class TestKubernetesPodOperatorSystem:
}
assert self.expected_pod == actual_pod
- @pytest.mark.asyncio
@pytest.mark.parametrize(
"val",
[
@@ -515,7 +501,6 @@ class TestKubernetesPodOperatorSystem:
self.expected_pod["spec"]["affinity"] = expected
assert self.expected_pod == actual_pod
- @pytest.mark.asyncio
def test_port(self, mock_get_connection):
port = k8s.V1ContainerPort(
name="http",
@@ -539,7 +524,6 @@ class TestKubernetesPodOperatorSystem:
self.expected_pod["spec"]["containers"][0]["ports"] = [{"name":
"http", "containerPort": 80}]
assert self.expected_pod == actual_pod
- @pytest.mark.asyncio
def test_volume_mount(self, mock_get_connection):
with mock.patch.object(PodManager, "log") as mock_logger:
volume_mount = k8s.V1VolumeMount(
@@ -579,7 +563,6 @@ class TestKubernetesPodOperatorSystem:
]
assert self.expected_pod == actual_pod
- @pytest.mark.asyncio
@pytest.mark.parametrize("uid", [0, 1000])
def test_run_as_user(self, uid, mock_get_connection):
security_context = V1PodSecurityContext(run_as_user=uid)
@@ -605,7 +588,6 @@ class TestKubernetesPodOperatorSystem:
)
assert pod.to_dict()["spec"]["security_context"]["run_as_user"] == uid
- @pytest.mark.asyncio
@pytest.mark.parametrize("gid", [0, 1000])
def test_fs_group(self, gid, mock_get_connection):
security_context = V1PodSecurityContext(fs_group=gid)
@@ -631,7 +613,6 @@ class TestKubernetesPodOperatorSystem:
)
assert pod.to_dict()["spec"]["security_context"]["fs_group"] == gid
- @pytest.mark.asyncio
def test_disable_privilege_escalation(self, mock_get_connection):
container_security_context =
V1SecurityContext(allow_privilege_escalation=False)
@@ -654,7 +635,6 @@ class TestKubernetesPodOperatorSystem:
}
assert self.expected_pod == actual_pod
- @pytest.mark.asyncio
def test_faulty_image(self, mock_get_connection):
bad_image_name = "foobar"
k = KubernetesPodOperator(
@@ -693,7 +673,6 @@ class TestKubernetesPodOperatorSystem:
with pytest.raises(ApiException, match="error looking up service
account default/foobar"):
k.get_or_create_pod(pod, context)
- @pytest.mark.asyncio
def test_pod_failure(self, mock_get_connection):
"""
Tests that the task fails when a pod reports a failure
@@ -716,7 +695,6 @@ class TestKubernetesPodOperatorSystem:
self.expected_pod["spec"]["containers"][0]["args"] =
bad_internal_command
assert self.expected_pod == actual_pod
- @pytest.mark.asyncio
def test_xcom_push(self, test_label, mock_get_connection):
expected = {"test_label": test_label, "buzz": 2}
args = [f"echo '{json.dumps(expected)}' > /airflow/xcom/return.json"]
@@ -765,7 +743,6 @@ class TestKubernetesPodOperatorSystem:
]
assert self.expected_pod == actual_pod
- @pytest.mark.asyncio
def test_pod_template_file_system(self, mock_get_connection,
basic_pod_template):
"""Note: this test requires that you have a namespace ``mem-example``
in your cluster."""
k = KubernetesPodOperator(
@@ -781,7 +758,6 @@ class TestKubernetesPodOperatorSystem:
assert result is not None
assert result == {"hello": "world"}
- @pytest.mark.asyncio
@pytest.mark.parametrize(
"env_vars",
[
@@ -817,7 +793,6 @@ class TestKubernetesPodOperatorSystem:
assert k.pod.spec.containers[0].env == [k8s.V1EnvVar(name="env_name",
value="value")]
assert result == {"hello": "world"}
- @pytest.mark.asyncio
def test_pod_template_file_with_full_pod_spec(self, test_label,
mock_get_connection, basic_pod_template):
pod_spec = k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
@@ -858,7 +833,6 @@ class TestKubernetesPodOperatorSystem:
assert k.pod.spec.containers[0].env == [k8s.V1EnvVar(name="env_name",
value="value")]
assert result == {"hello": "world"}
- @pytest.mark.asyncio
def test_full_pod_spec(self, test_label, mock_get_connection):
pod_spec = k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
@@ -904,7 +878,6 @@ class TestKubernetesPodOperatorSystem:
assert k.pod.spec.containers[0].env == [k8s.V1EnvVar(name="env_name",
value="value")]
assert result == {"hello": "world"}
- @pytest.mark.asyncio
def test_init_container(self, mock_get_connection):
# GIVEN
volume_mounts = [
@@ -959,10 +932,11 @@ class TestKubernetesPodOperatorSystem:
]
assert self.expected_pod == actual_pod
- @pytest.mark.asyncio
@mock.patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start")
@mock.patch(f"{POD_MANAGER_CLASS}.extract_xcom")
@mock.patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
+ @mock.patch(f"{POD_MANAGER_CLASS}.watch_pod_events", new=AsyncMock())
+ @mock.patch(f"{POD_MANAGER_CLASS}.await_pod_start", new=AsyncMock())
@mock.patch(f"{POD_MANAGER_CLASS}.create_pod", new=MagicMock)
@mock.patch(HOOK_CLASS)
def test_pod_template_file(
@@ -1067,8 +1041,9 @@ class TestKubernetesPodOperatorSystem:
del actual_pod["metadata"]["labels"]["airflow_version"]
assert expected_dict == actual_pod
- @pytest.mark.asyncio
@mock.patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
+ @mock.patch(f"{POD_MANAGER_CLASS}.watch_pod_events", new=AsyncMock())
+ @mock.patch(f"{POD_MANAGER_CLASS}.await_pod_start", new=AsyncMock())
@mock.patch(f"{POD_MANAGER_CLASS}.create_pod", new=MagicMock)
@mock.patch(HOOK_CLASS)
def test_pod_priority_class_name(self, hook_mock,
await_pod_completion_mock):
@@ -1102,7 +1077,6 @@ class TestKubernetesPodOperatorSystem:
self.expected_pod["spec"]["priorityClassName"] = priority_class_name
assert self.expected_pod == actual_pod
- @pytest.mark.asyncio
def test_pod_name(self, mock_get_connection):
pod_name_too_long = "a" * 221
k = KubernetesPodOperator(
@@ -1122,7 +1096,6 @@ class TestKubernetesPodOperatorSystem:
with pytest.raises(AirflowException):
k.execute(context)
- @pytest.mark.asyncio
def test_on_kill(self, mock_get_connection):
hook = KubernetesHook(conn_id=None, in_cluster=False)
client = hook.core_v1_client
@@ -1163,7 +1136,6 @@ class TestKubernetesPodOperatorSystem:
with pytest.raises(ApiException, match=r'pods \\"test.[a-z0-9]+\\" not
found'):
client.read_namespaced_pod(name=name, namespace=namespace)
- @pytest.mark.asyncio
def test_reattach_failing_pod_once(self, mock_get_connection):
hook = KubernetesHook(conn_id=None, in_cluster=False)
client = hook.core_v1_client
@@ -1224,11 +1196,6 @@ class TestKubernetesPodOperatorSystem:
# recreate op just to ensure we're not relying on any statefulness
k = get_op()
- # Before next attempt we need to re-create event loop if it is closed.
- loop = asyncio.get_event_loop()
- if loop.is_closed():
- asyncio.set_event_loop(asyncio.new_event_loop())
-
# `create_pod` should be called because though there's still a pod to
be found,
# it will be `already_checked`
with mock.patch(f"{POD_MANAGER_CLASS}.create_pod") as create_mock:
@@ -1236,7 +1203,6 @@ class TestKubernetesPodOperatorSystem:
k.execute(context)
create_mock.assert_called_once()
- @pytest.mark.asyncio
def test_changing_base_container_name_with_get_logs(self,
mock_get_connection):
k = KubernetesPodOperator(
namespace="default",
@@ -1262,7 +1228,6 @@ class TestKubernetesPodOperatorSystem:
self.expected_pod["spec"]["containers"][0]["name"] = "apple-sauce"
assert self.expected_pod["spec"] == actual_pod["spec"]
- @pytest.mark.asyncio
def test_changing_base_container_name_no_logs(self, mock_get_connection):
"""
This test checks BOTH a modified base container name AND the
get_logs=False flow,
@@ -1293,7 +1258,6 @@ class TestKubernetesPodOperatorSystem:
self.expected_pod["spec"]["containers"][0]["name"] = "apple-sauce"
assert self.expected_pod["spec"] == actual_pod["spec"]
- @pytest.mark.asyncio
def test_changing_base_container_name_no_logs_long(self,
mock_get_connection):
"""
Similar to test_changing_base_container_name_no_logs, but ensures that
@@ -1325,7 +1289,6 @@ class TestKubernetesPodOperatorSystem:
self.expected_pod["spec"]["containers"][0]["args"] = ["sleep 3"]
assert self.expected_pod["spec"] == actual_pod["spec"]
- @pytest.mark.asyncio
def test_changing_base_container_name_failure(self, mock_get_connection):
k = KubernetesPodOperator(
namespace="default",
@@ -1372,7 +1335,6 @@ class TestKubernetesPodOperatorSystem:
)
assert MyK8SPodOperator(task_id=str(uuid4())).base_container_name ==
"tomato-sauce"
- @pytest.mark.asyncio
def test_init_container_logs(self, mock_get_connection):
marker_from_init_container = f"{uuid4()}"
marker_from_main_container = f"{uuid4()}"
@@ -1404,7 +1366,6 @@ class TestKubernetesPodOperatorSystem:
assert marker_from_init_container in calls_args
assert marker_from_main_container in calls_args
- @pytest.mark.asyncio
def test_init_container_logs_filtered(self, mock_get_connection):
marker_from_init_container_to_log_1 = f"{uuid4()}"
marker_from_init_container_to_log_2 = f"{uuid4()}"
@@ -1502,7 +1463,6 @@ def
test_hide_sensitive_field_in_templated_fields_on_error(caplog, monkeypatch):
class TestKubernetesPodOperator(BaseK8STest):
- @pytest.mark.asyncio
@pytest.mark.parametrize(
"active_deadline_seconds,should_fail",
[(3, True), (60, False)],
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index a2c244c9ea2..420ebddb725 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -608,20 +608,18 @@ class KubernetesPodOperator(BaseOperator):
def await_pod_start(self, pod: k8s.V1Pod) -> None:
try:
- loop = asyncio.get_event_loop()
- events_task = asyncio.ensure_future(
- self.pod_manager.watch_pod_events(pod,
self.startup_check_interval_seconds)
- )
- loop.run_until_complete(
- self.pod_manager.await_pod_start(
+
+ async def _await_pod_start():
+ events_task = self.pod_manager.watch_pod_events(pod,
self.startup_check_interval_seconds)
+ pod_start_task = self.pod_manager.await_pod_start(
pod=pod,
schedule_timeout=self.schedule_timeout_seconds,
startup_timeout=self.startup_timeout_seconds,
check_interval=self.startup_check_interval_seconds,
)
- )
- loop.run_until_complete(events_task)
- loop.close()
+ await asyncio.gather(pod_start_task, events_task)
+
+ asyncio.run(_await_pod_start())
except PodLaunchFailedException:
if self.log_events_on_failure:
self._read_pod_events(pod, reraise=False)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 12a10e0dad3..ae30c5ca429 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -338,6 +338,7 @@ class PodManager(LoggingMixin):
self._client = kube_client
self._watch = watch.Watch()
self._callbacks = callbacks or []
+ self.stop_watching_events = False
def run_pod_async(self, pod: V1Pod, **kwargs) -> V1Pod:
"""Run POD asynchronously."""
@@ -380,9 +381,8 @@ class PodManager(LoggingMixin):
async def watch_pod_events(self, pod: V1Pod, check_interval: int = 1) ->
None:
"""Read pod events and writes into log."""
- self.keep_watching_for_events = True
num_events = 0
- while self.keep_watching_for_events:
+ while not self.stop_watching_events:
events = self.read_pod_events(pod)
for new_event in events.items[num_events:]:
involved_object: V1ObjectReference = new_event.involved_object
@@ -413,7 +413,7 @@ class PodManager(LoggingMixin):
remote_pod = self.read_pod(pod)
pod_status = remote_pod.status
if pod_status.phase != PodPhase.PENDING:
- self.keep_watching_for_events = False
+ self.stop_watching_events = True
self.log.info("::endgroup::")
break
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py
index 94a71961acc..40285db98f5 100644
---
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py
+++
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes_commons.py
@@ -16,7 +16,6 @@
# under the License.
from __future__ import annotations
-import asyncio
from collections.abc import Callable
from unittest import mock
@@ -91,18 +90,8 @@ class TestKubernetesDecoratorsBase:
self.dag = dag
self.mock_create_pod =
mock.patch(f"{POD_MANAGER_CLASS}.create_pod").start()
- self.mock_watch_pod_events_patch = mock.patch(
- f"{POD_MANAGER_CLASS}.watch_pod_events",
new_callable=mock.AsyncMock
- )
- self.mock_watch_pod_events = self.mock_watch_pod_events_patch.start()
- self.mock_watch_pod_events.return_value = asyncio.Future()
- self.mock_watch_pod_events.return_value.set_result(None)
- self.mock_await_pod_start_patch = mock.patch(
- f"{POD_MANAGER_CLASS}.await_pod_start", new_callable=mock.AsyncMock
- )
- self.mock_await_pod_start = self.mock_await_pod_start_patch.start()
- self.mock_await_pod_start.return_value = asyncio.Future()
- self.mock_await_pod_start.return_value.set_result(None)
+ self.mock_await_pod_start =
mock.patch(f"{POD_MANAGER_CLASS}.await_pod_start").start()
+ self.mock_watch_pod_events =
mock.patch(f"{POD_MANAGER_CLASS}.watch_pod_events").start()
self.mock_await_xcom_sidecar_container_start = mock.patch(
f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start"
).start()
@@ -120,8 +109,10 @@ class TestKubernetesDecoratorsBase:
self.mock_fetch_logs =
mock.patch(f"{POD_MANAGER_CLASS}.fetch_requested_container_logs").start()
self.mock_fetch_logs.return_value = "logs"
- yield
-
+ try:
+ yield
+ except Exception:
+ pass
mock.patch.stopall()
def teardown_method(self):
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index 753bf712662..634a29028ea 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -16,7 +16,6 @@
# under the License.
from __future__ import annotations
-import asyncio
import datetime
import re
from contextlib import contextmanager, nullcontext
@@ -155,17 +154,13 @@ class TestKubernetesPodOperator:
@pytest.fixture(autouse=True)
def setup_tests(self, dag_maker):
self.create_pod_patch = patch(f"{POD_MANAGER_CLASS}.create_pod")
- self.watch_pod_events = patch(f"{POD_MANAGER_CLASS}.watch_pod_events",
new_callable=mock.AsyncMock)
- self.await_pod_patch = patch(f"{POD_MANAGER_CLASS}.await_pod_start",
new_callable=mock.AsyncMock)
+ self.watch_pod_events = patch(f"{POD_MANAGER_CLASS}.watch_pod_events")
+ self.await_pod_patch = patch(f"{POD_MANAGER_CLASS}.await_pod_start")
self.await_pod_completion_patch =
patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
self._default_client_patch = patch(f"{HOOK_CLASS}._get_default_client")
self.watch_pod_events_mock = self.watch_pod_events.start()
- self.watch_pod_events_mock.return_value = asyncio.Future()
- self.watch_pod_events_mock.return_value.set_result(None)
self.create_mock = self.create_pod_patch.start()
self.await_start_mock = self.await_pod_patch.start()
- self.await_start_mock.return_value = asyncio.Future()
- self.await_start_mock.return_value.set_result(None)
self.await_pod_mock = self.await_pod_completion_patch.start()
self._default_client_mock = self._default_client_patch.start()
self.dag_maker = dag_maker
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index e873e8bbb5b..194d1e3e96a 100644
---
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -173,7 +173,7 @@ class TestPodManager:
startup_check_interval = 10
def mock_read_pod_events(pod):
- self.pod_manager.keep_watching_for_events = False
+ self.pod_manager.stop_watching_events = True
return events
with (
@@ -514,7 +514,7 @@ class TestPodManager:
assert mock_time_sleep.call_count == 3
assert f"::group::Waiting until {schedule_timeout}s to get the POD
scheduled..." in caplog.text
assert f"Waiting {startup_timeout}s to get the POD running..." in
caplog.text
- assert not self.pod_manager.keep_watching_for_events
+ assert self.pod_manager.stop_watching_events is True
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
def test_container_is_running(self, container_is_running_mock):
diff --git a/pyproject.toml b/pyproject.toml
index c6e02a26946..02768457878 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -619,7 +619,6 @@ extend-select = [
"RET506", # Unnecessary {branch} after raise statement
"RET507", # Unnecessary {branch} after continue statement
"RET508", # Unnecessary {branch} after break statement
- "SIM105", # Use contextlib.suppress({exception}) instead of try-except-pass
]
ignore = [
"D100", # Unwanted; Docstring at the top of every file.
@@ -655,6 +654,7 @@ ignore = [
"E501", # Formatted code may exceed the line length, leading to
line-too-long (E501) errors.
"ASYNC110", # TODO: Use `anyio.Event` instead of awaiting `anyio.sleep` in
a `while` loop
"UP038",
+ "SIM105", # Use contextlib.suppress({exception}) instead of try-except-pass
]
unfixable = [
# PT022 replace empty `yield` to empty `return`. Might be fixed with a
combination of PLR1711