This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cd296d2068 KubernetesExecutor observability Improvements (#35579)
cd296d2068 is described below

commit cd296d2068b005ebeb5cdc4509e670901bf5b9f3
Author: dirrao <39794726+dir...@users.noreply.github.com>
AuthorDate: Sun Nov 12 23:11:07 2023 +0530

    KubernetesExecutor observability Improvements (#35579)
    
    
    
    ---------
    
    Co-authored-by: gopal <gopal_diris...@apple.com>
---
 .../kubernetes/executors/kubernetes_executor.py    | 215 +++++++++++----------
 .../logging-monitoring/metrics.rst                 |  40 ++--
 2 files changed, 130 insertions(+), 125 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index cb9d77c1c8..dd0993e3d4 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -37,6 +37,7 @@ from typing import TYPE_CHECKING, Any, Sequence
 from sqlalchemy import select, update
 
 from airflow.providers.cncf.kubernetes.pod_generator import PodMutationHookException, PodReconciliationError
+from airflow.stats import Stats
 
 try:
     from airflow.cli.cli_config import (
@@ -209,89 +210,90 @@ class KubernetesExecutor(BaseExecutor):
             assert self.kube_client
         from airflow.models.taskinstance import TaskInstance
 
-        self.log.debug("Clearing tasks that have not been launched")
-        query = select(TaskInstance).where(
-            TaskInstance.state == TaskInstanceState.QUEUED, TaskInstance.queued_by_job_id == self.job_id
-        )
-        if self.kubernetes_queue:
-            query = query.where(TaskInstance.queue == self.kubernetes_queue)
-        queued_tis: list[TaskInstance] = session.scalars(query).all()
-        self.log.info("Found %s queued task instances", len(queued_tis))
-
-        # Go through the "last seen" dictionary and clean out old entries
-        allowed_age = self.kube_config.worker_pods_queued_check_interval * 3
-        for key, timestamp in list(self.last_handled.items()):
-            if time.time() - timestamp > allowed_age:
-                del self.last_handled[key]
-
-        if not queued_tis:
-            return
-
-        # airflow worker label selector batch call
-        kwargs = {"label_selector": f"airflow-worker={self._make_safe_label_value(str(self.job_id))}"}
-        if self.kube_config.kube_client_request_args:
-            kwargs.update(self.kube_config.kube_client_request_args)
-        pod_list = self._list_pods(kwargs)
-
-        # create a set against pod query label fields
-        label_search_set = set()
-        for pod in pod_list:
-            dag_id = pod.metadata.labels.get("dag_id", None)
-            task_id = pod.metadata.labels.get("task_id", None)
-            airflow_worker = pod.metadata.labels.get("airflow-worker", None)
-            map_index = pod.metadata.labels.get("map_index", None)
-            run_id = pod.metadata.labels.get("run_id", None)
-            execution_date = pod.metadata.labels.get("execution_date", None)
-            if dag_id is None or task_id is None or airflow_worker is None:
-                continue
-            label_search_base_str = f"dag_id={dag_id},task_id={task_id},airflow-worker={airflow_worker}"
-            if map_index is not None:
-                label_search_base_str += f",map_index={map_index}"
-            if run_id is not None:
-                label_search_str = f"{label_search_base_str},run_id={run_id}"
-                label_search_set.add(label_search_str)
-            if execution_date is not None:
-                label_search_str = f"{label_search_base_str},execution_date={execution_date}"
-                label_search_set.add(label_search_str)
-
-        for ti in queued_tis:
-            self.log.debug("Checking task instance %s", ti)
-
-            # Check to see if we've handled it ourselves recently
-            if ti.key in self.last_handled:
-                continue
-
-            # Build the pod selector
-            base_label_selector = (
-                f"dag_id={self._make_safe_label_value(ti.dag_id)},"
-                f"task_id={self._make_safe_label_value(ti.task_id)},"
-                f"airflow-worker={self._make_safe_label_value(str(ti.queued_by_job_id))}"
-            )
-            if ti.map_index >= 0:
-                # Old tasks _couldn't_ be mapped, so we don't have to worry about compat
-                base_label_selector += f",map_index={ti.map_index}"
-
-            # Try run_id first
-            label_search_str = f"{base_label_selector},run_id={self._make_safe_label_value(ti.run_id)}"
-            if label_search_str in label_search_set:
-                continue
-            # Fallback to old style of using execution_date
-            label_search_str = (
-                f"{base_label_selector},execution_date={self._make_safe_label_value(ti.execution_date)}"
+        with Stats.timer("kubernetes_executor.clear_not_launched_queued_tasks.duration"):
+            self.log.debug("Clearing tasks that have not been launched")
+            query = select(TaskInstance).where(
+                TaskInstance.state == TaskInstanceState.QUEUED, TaskInstance.queued_by_job_id == self.job_id
             )
-            if label_search_str in label_search_set:
-                continue
-            self.log.info("TaskInstance: %s found in queued state but was not launched, rescheduling", ti)
-            session.execute(
-                update(TaskInstance)
-                .where(
-                    TaskInstance.dag_id == ti.dag_id,
-                    TaskInstance.task_id == ti.task_id,
-                    TaskInstance.run_id == ti.run_id,
-                    TaskInstance.map_index == ti.map_index,
+            if self.kubernetes_queue:
+                query = query.where(TaskInstance.queue == self.kubernetes_queue)
+            queued_tis: list[TaskInstance] = session.scalars(query).all()
+            self.log.info("Found %s queued task instances", len(queued_tis))
+
+            # Go through the "last seen" dictionary and clean out old entries
+            allowed_age = self.kube_config.worker_pods_queued_check_interval * 3
+            for key, timestamp in list(self.last_handled.items()):
+                if time.time() - timestamp > allowed_age:
+                    del self.last_handled[key]
+
+            if not queued_tis:
+                return
+
+            # airflow worker label selector batch call
+            kwargs = {"label_selector": f"airflow-worker={self._make_safe_label_value(str(self.job_id))}"}
+            if self.kube_config.kube_client_request_args:
+                kwargs.update(self.kube_config.kube_client_request_args)
+            pod_list = self._list_pods(kwargs)
+
+            # create a set against pod query label fields
+            label_search_set = set()
+            for pod in pod_list:
+                dag_id = pod.metadata.labels.get("dag_id", None)
+                task_id = pod.metadata.labels.get("task_id", None)
+                airflow_worker = pod.metadata.labels.get("airflow-worker", None)
+                map_index = pod.metadata.labels.get("map_index", None)
+                run_id = pod.metadata.labels.get("run_id", None)
+                execution_date = pod.metadata.labels.get("execution_date", None)
+                if dag_id is None or task_id is None or airflow_worker is None:
+                    continue
+                label_search_base_str = f"dag_id={dag_id},task_id={task_id},airflow-worker={airflow_worker}"
+                if map_index is not None:
+                    label_search_base_str += f",map_index={map_index}"
+                if run_id is not None:
+                    label_search_str = f"{label_search_base_str},run_id={run_id}"
+                    label_search_set.add(label_search_str)
+                if execution_date is not None:
+                    label_search_str = f"{label_search_base_str},execution_date={execution_date}"
+                    label_search_set.add(label_search_str)
+
+            for ti in queued_tis:
+                self.log.debug("Checking task instance %s", ti)
+
+                # Check to see if we've handled it ourselves recently
+                if ti.key in self.last_handled:
+                    continue
+
+                # Build the pod selector
+                base_label_selector = (
+                    f"dag_id={self._make_safe_label_value(ti.dag_id)},"
+                    f"task_id={self._make_safe_label_value(ti.task_id)},"
+                    f"airflow-worker={self._make_safe_label_value(str(ti.queued_by_job_id))}"
+                )
+                if ti.map_index >= 0:
+                    # Old tasks _couldn't_ be mapped, so we don't have to worry about compat
+                    base_label_selector += f",map_index={ti.map_index}"
+
+                # Try run_id first
+                label_search_str = f"{base_label_selector},run_id={self._make_safe_label_value(ti.run_id)}"
+                if label_search_str in label_search_set:
+                    continue
+                # Fallback to old style of using execution_date
+                label_search_str = (
+                    f"{base_label_selector},execution_date={self._make_safe_label_value(ti.execution_date)}"
+                )
+                if label_search_str in label_search_set:
+                    continue
+                self.log.info("TaskInstance: %s found in queued state but was not launched, rescheduling", ti)
+                session.execute(
+                    update(TaskInstance)
+                    .where(
+                        TaskInstance.dag_id == ti.dag_id,
+                        TaskInstance.task_id == ti.task_id,
+                        TaskInstance.run_id == ti.run_id,
+                        TaskInstance.map_index == ti.map_index,
+                    )
+                    .values(state=TaskInstanceState.SCHEDULED)
                 )
-                .values(state=TaskInstanceState.SCHEDULED)
-            )
 
     def start(self) -> None:
         """Start the executor."""
@@ -534,31 +536,32 @@ class KubernetesExecutor(BaseExecutor):
         return messages, ["\n".join(log)]
 
     def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
-        # Always flush TIs without queued_by_job_id
-        tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
-        scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
-        tis_to_flush_by_key = {ti.key: ti for ti in tis if ti.queued_by_job_id}
-        kube_client: client.CoreV1Api = self.kube_client
-        for scheduler_job_id in scheduler_job_ids:
-            scheduler_job_id = self._make_safe_label_value(str(scheduler_job_id))
-            # We will look for any pods owned by the no-longer-running scheduler,
-            # but will exclude only successful pods, as those TIs will have a terminal state
-            # and not be up for adoption!
-            # Those workers that failed, however, are okay to adopt here as their TI will
-            # still be in queued.
-            query_kwargs = {
-                "field_selector": "status.phase!=Succeeded",
-                "label_selector": (
-                    "kubernetes_executor=True,"
-                    f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}!=True"
-                ),
-            }
-            pod_list = self._list_pods(query_kwargs)
-            for pod in pod_list:
-                self.adopt_launched_task(kube_client, pod, tis_to_flush_by_key)
-        self._adopt_completed_pods(kube_client)
-        tis_to_flush.extend(tis_to_flush_by_key.values())
-        return tis_to_flush
+        with Stats.timer("kubernetes_executor.adopt_task_instances.duration"):
+            # Always flush TIs without queued_by_job_id
+            tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
+            scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
+            tis_to_flush_by_key = {ti.key: ti for ti in tis if ti.queued_by_job_id}
+            kube_client: client.CoreV1Api = self.kube_client
+            for scheduler_job_id in scheduler_job_ids:
+                scheduler_job_id = self._make_safe_label_value(str(scheduler_job_id))
+                # We will look for any pods owned by the no-longer-running scheduler,
+                # but will exclude only successful pods, as those TIs will have a terminal state
+                # and not be up for adoption!
+                # Those workers that failed, however, are okay to adopt here as their TI will
+                # still be in queued.
+                query_kwargs = {
+                    "field_selector": "status.phase!=Succeeded",
+                    "label_selector": (
+                        "kubernetes_executor=True,"
+                        f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}!=True"
+                    ),
+                }
+                pod_list = self._list_pods(query_kwargs)
+                for pod in pod_list:
+                    self.adopt_launched_task(kube_client, pod, tis_to_flush_by_key)
+            self._adopt_completed_pods(kube_client)
+            tis_to_flush.extend(tis_to_flush_by_key.values())
+            return tis_to_flush
 
     def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
         """
diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
index b6703c41a4..54efc7501e 100644
--- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
+++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
@@ -223,22 +223,24 @@ Name                                                Description
 Timers
 ------
 
-=================================================== ========================================================================
-Name                                                Description
-=================================================== ========================================================================
-``dagrun.dependency-check.<dag_id>``                Milliseconds taken to check DAG dependencies
-``dag.<dag_id>.<task_id>.duration``                 Seconds taken to run a task
-``dag.<dag_id>.<task_id>.scheduled_duration``       Seconds a task spends in the Scheduled state, before being Queued
-``dag.<dag_id>.<task_id>.queued_duration``          Seconds a task spends in the Queued state, before being Running
-``dag_processing.last_duration.<dag_file>``         Seconds taken to load the given DAG file
-``dagrun.duration.success.<dag_id>``                Seconds taken for a DagRun to reach success state
-``dagrun.duration.failed.<dag_id>``                 Seconds taken for a DagRun to reach failed state
-``dagrun.schedule_delay.<dag_id>``                  Seconds of delay between the scheduled DagRun
-                                                    start date and the actual DagRun start date
-``scheduler.critical_section_duration``             Milliseconds spent in the critical section of scheduler loop --
-                                                    only a single scheduler can enter this loop at a time
-``scheduler.critical_section_query_duration``       Milliseconds spent running the critical section task instance query
-``scheduler.scheduler_loop_duration``               Milliseconds spent running one scheduler loop
-``dagrun.<dag_id>.first_task_scheduling_delay``     Seconds elapsed between first task start_date and dagrun expected start
-``collect_db_dags``                                 Milliseconds taken for fetching all Serialized Dags from DB
-=================================================== ========================================================================
+================================================================ ========================================================================
+Name                                                             Description
+================================================================ ========================================================================
+``dagrun.dependency-check.<dag_id>``                             Milliseconds taken to check DAG dependencies
+``dag.<dag_id>.<task_id>.duration``                              Seconds taken to run a task
+``dag.<dag_id>.<task_id>.scheduled_duration``                    Seconds a task spends in the Scheduled state, before being Queued
+``dag.<dag_id>.<task_id>.queued_duration``                       Seconds a task spends in the Queued state, before being Running
+``dag_processing.last_duration.<dag_file>``                      Seconds taken to load the given DAG file
+``dagrun.duration.success.<dag_id>``                             Seconds taken for a DagRun to reach success state
+``dagrun.duration.failed.<dag_id>``                              Seconds taken for a DagRun to reach failed state
+``dagrun.schedule_delay.<dag_id>``                               Seconds of delay between the scheduled DagRun
+                                                                 start date and the actual DagRun start date
+``scheduler.critical_section_duration``                          Milliseconds spent in the critical section of scheduler loop --
+                                                                 only a single scheduler can enter this loop at a time
+``scheduler.critical_section_query_duration``                    Milliseconds spent running the critical section task instance query
+``scheduler.scheduler_loop_duration``                            Milliseconds spent running one scheduler loop
+``dagrun.<dag_id>.first_task_scheduling_delay``                  Seconds elapsed between first task start_date and dagrun expected start
+``collect_db_dags``                                              Milliseconds taken for fetching all Serialized Dags from DB
+``kubernetes_executor.clear_not_launched_queued_tasks.duration`` Milliseconds taken for clearing not launched queued tasks in Kubernetes Executor
+``kubernetes_executor.adopt_task_instances.duration``            Milliseconds taken to adopt the task instances in Kubernetes Executor
+================================================================ ========================================================================
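
The two new timers at the bottom of the table are emitted under the configured statsd prefix ("airflow" by default). To watch them locally, one can run a throwaway UDP listener; a minimal sketch, assuming statsd metrics are enabled (statsd_on = True in the [metrics] section of airflow.cfg) with the default port 8125:

    import socket

    # Listen for statsd UDP datagrams of the form "name:value|type".
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("127.0.0.1", 8125))
    while True:
        data, _ = sock.recvfrom(4096)
        line = data.decode(errors="replace")
        # e.g. "airflow.kubernetes_executor.adopt_task_instances.duration:12|ms"
        if "kubernetes_executor" in line:
            print(line)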
