This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e991f60a79 Add `active_deadline_seconds` parameter to
`KubernetesPodOperator` (#33379)
e991f60a79 is described below
commit e991f60a797643d151471bf8e5ed98857e1274ac
Author: Amogh Desai <[email protected]>
AuthorDate: Sat Aug 19 01:20:00 2023 +0530
Add `active_deadline_seconds` parameter to `KubernetesPodOperator` (#33379)
* Inserting active_deadline_seconds in KPO
* Fixing tests
* Fix active_deadline_seconds test
* Fixing tests
* parametrize task_id to create a pod per task
---------
Co-authored-by: Hussein Awala <[email protected]>
---
airflow/providers/cncf/kubernetes/operators/pod.py | 5 ++++
kubernetes_tests/test_kubernetes_pod_operator.py | 31 ++++++++++++++++++++++
2 files changed, 36 insertions(+)
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py
b/airflow/providers/cncf/kubernetes/operators/pod.py
index a3958b6237..7a1cd975e6 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -240,6 +240,8 @@ class KubernetesPodOperator(BaseOperator):
Deprecated - use `on_finish_action` instead.
:param termination_message_policy: The termination message policy of the
base container.
Default value is "File"
+ :param active_deadline_seconds: The active_deadline_seconds which matches
to active_deadline_seconds
+ in V1PodSpec.
"""
# This field can be overloaded at the instance level via
base_container_name
@@ -320,6 +322,7 @@ class KubernetesPodOperator(BaseOperator):
on_finish_action: str = "delete_pod",
is_delete_operator_pod: None | bool = None,
termination_message_policy: str = "File",
+ active_deadline_seconds: int | None = None,
**kwargs,
) -> None:
# TODO: remove in provider 6.0.0 release. This is a mitigate step to
advise users to switch to the
@@ -417,6 +420,7 @@ class KubernetesPodOperator(BaseOperator):
self.on_finish_action = OnFinishAction(on_finish_action)
self.is_delete_operator_pod = self.on_finish_action ==
OnFinishAction.DELETE_POD
self.termination_message_policy = termination_message_policy
+ self.active_deadline_seconds = active_deadline_seconds
self._config_dict: dict | None = None # TODO: remove it when removing
convert_config_file_to_dict
@@ -860,6 +864,7 @@ class KubernetesPodOperator(BaseOperator):
restart_policy="Never",
priority_class_name=self.priority_class_name,
volumes=self.volumes,
+ active_deadline_seconds=self.active_deadline_seconds,
),
)
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py
b/kubernetes_tests/test_kubernetes_pod_operator.py
index 3d7a2fd4ef..a1182581f9 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -29,6 +29,7 @@ from uuid import uuid4
import pendulum
import pytest
+from kubernetes import client
from kubernetes.client import V1EnvVar, V1PodSecurityContext,
V1SecurityContext, models as k8s
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException
@@ -43,6 +44,7 @@ from airflow.utils import timezone
from airflow.utils.context import Context
from airflow.utils.types import DagRunType
from airflow.version import version as airflow_version
+from kubernetes_tests.test_base import BaseK8STest
HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesHook"
POD_MANAGER_CLASS =
"airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager"
@@ -1331,3 +1333,32 @@ def
test_hide_sensitive_field_in_templated_fields_on_error(caplog, monkeypatch):
task.render_template_fields(context=context)
assert "password" in caplog.text
assert "secretpassword" not in caplog.text
+
+
+class TestKubernetesPodOperator(BaseK8STest):
+ @pytest.mark.parametrize("active_deadline_seconds", [10, 20])
+ def test_kubernetes_pod_operator_active_deadline_seconds(self,
active_deadline_seconds):
+ k = KubernetesPodOperator(
+ task_id=f"test_task_{active_deadline_seconds}",
+ active_deadline_seconds=active_deadline_seconds,
+ image="busybox",
+ cmds=["sh", "-c", "echo 'hello world' && sleep 60"],
+ namespace="default",
+ on_finish_action="keep_pod",
+ )
+
+ context = create_context(k)
+
+ with pytest.raises(AirflowException):
+ k.execute(context)
+
+ pod = k.find_pod("default", context, exclude_checked=False)
+
+ k8s_client = client.CoreV1Api()
+
+ pod_status =
k8s_client.read_namespaced_pod_status(name=pod.metadata.name,
namespace="default")
+ phase = pod_status.status.phase
+ reason = pod_status.status.reason
+
+ assert phase == "Failed"
+ assert reason == "DeadlineExceeded"