This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e991f60a79 Add `active_deadline_seconds` parameter to 
`KubernetesPodOperator` (#33379)
e991f60a79 is described below

commit e991f60a797643d151471bf8e5ed98857e1274ac
Author: Amogh Desai <[email protected]>
AuthorDate: Sat Aug 19 01:20:00 2023 +0530

    Add `active_deadline_seconds` parameter to `KubernetesPodOperator` (#33379)
    
    * Inserting active_deadline_seconds in KPO
    
    * Fixing tests
    
    * Fix active_deadline_seconds test
    
    * Fixing tests
    
    * parametrize task_id to create a pod per task
    
    ---------
    
    Co-authored-by: Hussein Awala <[email protected]>
---
 airflow/providers/cncf/kubernetes/operators/pod.py |  5 ++++
 kubernetes_tests/test_kubernetes_pod_operator.py   | 31 ++++++++++++++++++++++
 2 files changed, 36 insertions(+)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py 
b/airflow/providers/cncf/kubernetes/operators/pod.py
index a3958b6237..7a1cd975e6 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -240,6 +240,8 @@ class KubernetesPodOperator(BaseOperator):
         Deprecated - use `on_finish_action` instead.
     :param termination_message_policy: The termination message policy of the 
base container.
         Default value is "File"
+    :param active_deadline_seconds: The active_deadline_seconds which matches 
to active_deadline_seconds
+        in V1PodSpec.
     """
 
     # This field can be overloaded at the instance level via 
base_container_name
@@ -320,6 +322,7 @@ class KubernetesPodOperator(BaseOperator):
         on_finish_action: str = "delete_pod",
         is_delete_operator_pod: None | bool = None,
         termination_message_policy: str = "File",
+        active_deadline_seconds: int | None = None,
         **kwargs,
     ) -> None:
         # TODO: remove in provider 6.0.0 release. This is a mitigate step to 
advise users to switch to the
@@ -417,6 +420,7 @@ class KubernetesPodOperator(BaseOperator):
             self.on_finish_action = OnFinishAction(on_finish_action)
             self.is_delete_operator_pod = self.on_finish_action == 
OnFinishAction.DELETE_POD
         self.termination_message_policy = termination_message_policy
+        self.active_deadline_seconds = active_deadline_seconds
 
         self._config_dict: dict | None = None  # TODO: remove it when removing 
convert_config_file_to_dict
 
@@ -860,6 +864,7 @@ class KubernetesPodOperator(BaseOperator):
                 restart_policy="Never",
                 priority_class_name=self.priority_class_name,
                 volumes=self.volumes,
+                active_deadline_seconds=self.active_deadline_seconds,
             ),
         )
 
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py 
b/kubernetes_tests/test_kubernetes_pod_operator.py
index 3d7a2fd4ef..a1182581f9 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -29,6 +29,7 @@ from uuid import uuid4
 
 import pendulum
 import pytest
+from kubernetes import client
 from kubernetes.client import V1EnvVar, V1PodSecurityContext, 
V1SecurityContext, models as k8s
 from kubernetes.client.api_client import ApiClient
 from kubernetes.client.rest import ApiException
@@ -43,6 +44,7 @@ from airflow.utils import timezone
 from airflow.utils.context import Context
 from airflow.utils.types import DagRunType
 from airflow.version import version as airflow_version
+from kubernetes_tests.test_base import BaseK8STest
 
 HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesHook"
 POD_MANAGER_CLASS = 
"airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager"
@@ -1331,3 +1333,32 @@ def 
test_hide_sensitive_field_in_templated_fields_on_error(caplog, monkeypatch):
         task.render_template_fields(context=context)
     assert "password" in caplog.text
     assert "secretpassword" not in caplog.text
+
+
+class TestKubernetesPodOperator(BaseK8STest):
+    @pytest.mark.parametrize("active_deadline_seconds", [10, 20])
+    def test_kubernetes_pod_operator_active_deadline_seconds(self, 
active_deadline_seconds):
+        k = KubernetesPodOperator(
+            task_id=f"test_task_{active_deadline_seconds}",
+            active_deadline_seconds=active_deadline_seconds,
+            image="busybox",
+            cmds=["sh", "-c", "echo 'hello world' && sleep 60"],
+            namespace="default",
+            on_finish_action="keep_pod",
+        )
+
+        context = create_context(k)
+
+        with pytest.raises(AirflowException):
+            k.execute(context)
+
+        pod = k.find_pod("default", context, exclude_checked=False)
+
+        k8s_client = client.CoreV1Api()
+
+        pod_status = 
k8s_client.read_namespaced_pod_status(name=pod.metadata.name, 
namespace="default")
+        phase = pod_status.status.phase
+        reason = pod_status.status.reason
+
+        assert phase == "Failed"
+        assert reason == "DeadlineExceeded"

Reply via email to