This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new cd296d2068 KubernetesExecutor observability Improvements (#35579)
cd296d2068 is described below

commit cd296d2068b005ebeb5cdc4509e670901bf5b9f3
Author: dirrao <39794726+dir...@users.noreply.github.com>
AuthorDate: Sun Nov 12 23:11:07 2023 +0530

    KubernetesExecutor observability Improvements (#35579)

    ---------

    Co-authored-by: gopal <gopal_diris...@apple.com>
---
 .../kubernetes/executors/kubernetes_executor.py    | 215 +++++++++++----------
 .../logging-monitoring/metrics.rst                 |  40 ++--
 2 files changed, 130 insertions(+), 125 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index cb9d77c1c8..dd0993e3d4 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -37,6 +37,7 @@ from typing import TYPE_CHECKING, Any, Sequence
 from sqlalchemy import select, update
 
 from airflow.providers.cncf.kubernetes.pod_generator import PodMutationHookException, PodReconciliationError
+from airflow.stats import Stats
 
 try:
     from airflow.cli.cli_config import (
@@ -209,89 +210,90 @@ class KubernetesExecutor(BaseExecutor):
         assert self.kube_client
         from airflow.models.taskinstance import TaskInstance
 
-        self.log.debug("Clearing tasks that have not been launched")
-        query = select(TaskInstance).where(
-            TaskInstance.state == TaskInstanceState.QUEUED, TaskInstance.queued_by_job_id == self.job_id
-        )
-        if self.kubernetes_queue:
-            query = query.where(TaskInstance.queue == self.kubernetes_queue)
-        queued_tis: list[TaskInstance] = session.scalars(query).all()
-        self.log.info("Found %s queued task instances", len(queued_tis))
-
-        # Go through the "last seen" dictionary and clean out old entries
-        allowed_age = self.kube_config.worker_pods_queued_check_interval * 3
-        for key, timestamp in list(self.last_handled.items()):
-            if time.time() - timestamp > allowed_age:
-                del self.last_handled[key]
-
-        if not queued_tis:
-            return
-
-        # airflow worker label selector batch call
-        kwargs = {"label_selector": f"airflow-worker={self._make_safe_label_value(str(self.job_id))}"}
-        if self.kube_config.kube_client_request_args:
-            kwargs.update(self.kube_config.kube_client_request_args)
-        pod_list = self._list_pods(kwargs)
-
-        # create a set against pod query label fields
-        label_search_set = set()
-        for pod in pod_list:
-            dag_id = pod.metadata.labels.get("dag_id", None)
-            task_id = pod.metadata.labels.get("task_id", None)
-            airflow_worker = pod.metadata.labels.get("airflow-worker", None)
-            map_index = pod.metadata.labels.get("map_index", None)
-            run_id = pod.metadata.labels.get("run_id", None)
-            execution_date = pod.metadata.labels.get("execution_date", None)
-            if dag_id is None or task_id is None or airflow_worker is None:
-                continue
-            label_search_base_str = f"dag_id={dag_id},task_id={task_id},airflow-worker={airflow_worker}"
-            if map_index is not None:
-                label_search_base_str += f",map_index={map_index}"
-            if run_id is not None:
-                label_search_str = f"{label_search_base_str},run_id={run_id}"
-                label_search_set.add(label_search_str)
-            if execution_date is not None:
-                label_search_str = f"{label_search_base_str},execution_date={execution_date}"
-                label_search_set.add(label_search_str)
-
-        for ti in queued_tis:
-            self.log.debug("Checking task instance %s", ti)
-
-            # Check to see if we've handled it ourselves recently
-            if ti.key in self.last_handled:
-                continue
-
-            # Build the pod selector
-            base_label_selector = (
-                f"dag_id={self._make_safe_label_value(ti.dag_id)},"
-                f"task_id={self._make_safe_label_value(ti.task_id)},"
-                f"airflow-worker={self._make_safe_label_value(str(ti.queued_by_job_id))}"
-            )
-            if ti.map_index >= 0:
-                # Old tasks _couldn't_ be mapped, so we don't have to worry about compat
-                base_label_selector += f",map_index={ti.map_index}"
-
-            # Try run_id first
-            label_search_str = f"{base_label_selector},run_id={self._make_safe_label_value(ti.run_id)}"
-            if label_search_str in label_search_set:
-                continue
-            # Fallback to old style of using execution_date
-            label_search_str = (
-                f"{base_label_selector},execution_date={self._make_safe_label_value(ti.execution_date)}"
+        with Stats.timer("kubernetes_executor.clear_not_launched_queued_tasks.duration"):
+            self.log.debug("Clearing tasks that have not been launched")
+            query = select(TaskInstance).where(
+                TaskInstance.state == TaskInstanceState.QUEUED, TaskInstance.queued_by_job_id == self.job_id
             )
-            if label_search_str in label_search_set:
-                continue
-            self.log.info("TaskInstance: %s found in queued state but was not launched, rescheduling", ti)
-            session.execute(
-                update(TaskInstance)
-                .where(
-                    TaskInstance.dag_id == ti.dag_id,
-                    TaskInstance.task_id == ti.task_id,
-                    TaskInstance.run_id == ti.run_id,
-                    TaskInstance.map_index == ti.map_index,
+            if self.kubernetes_queue:
+                query = query.where(TaskInstance.queue == self.kubernetes_queue)
+            queued_tis: list[TaskInstance] = session.scalars(query).all()
+            self.log.info("Found %s queued task instances", len(queued_tis))
+
+            # Go through the "last seen" dictionary and clean out old entries
+            allowed_age = self.kube_config.worker_pods_queued_check_interval * 3
+            for key, timestamp in list(self.last_handled.items()):
+                if time.time() - timestamp > allowed_age:
+                    del self.last_handled[key]
+
+            if not queued_tis:
+                return
+
+            # airflow worker label selector batch call
+            kwargs = {"label_selector": f"airflow-worker={self._make_safe_label_value(str(self.job_id))}"}
+            if self.kube_config.kube_client_request_args:
+                kwargs.update(self.kube_config.kube_client_request_args)
+            pod_list = self._list_pods(kwargs)
+
+            # create a set against pod query label fields
+            label_search_set = set()
+            for pod in pod_list:
+                dag_id = pod.metadata.labels.get("dag_id", None)
+                task_id = pod.metadata.labels.get("task_id", None)
+                airflow_worker = pod.metadata.labels.get("airflow-worker", None)
+                map_index = pod.metadata.labels.get("map_index", None)
+                run_id = pod.metadata.labels.get("run_id", None)
+                execution_date = pod.metadata.labels.get("execution_date", None)
+                if dag_id is None or task_id is None or airflow_worker is None:
+                    continue
+                label_search_base_str = f"dag_id={dag_id},task_id={task_id},airflow-worker={airflow_worker}"
+                if map_index is not None:
+                    label_search_base_str += f",map_index={map_index}"
+                if run_id is not None:
+                    label_search_str = f"{label_search_base_str},run_id={run_id}"
+                    label_search_set.add(label_search_str)
+                if execution_date is not None:
+                    label_search_str = f"{label_search_base_str},execution_date={execution_date}"
+                    label_search_set.add(label_search_str)
+
+            for ti in queued_tis:
+                self.log.debug("Checking task instance %s", ti)
+
+                # Check to see if we've handled it ourselves recently
+                if ti.key in self.last_handled:
+                    continue
+
+                # Build the pod selector
+                base_label_selector = (
+                    f"dag_id={self._make_safe_label_value(ti.dag_id)},"
+                    f"task_id={self._make_safe_label_value(ti.task_id)},"
f"airflow-worker={self._make_safe_label_value(str(ti.queued_by_job_id))}" + ) + if ti.map_index >= 0: + # Old tasks _couldn't_ be mapped, so we don't have to worry about compat + base_label_selector += f",map_index={ti.map_index}" + + # Try run_id first + label_search_str = f"{base_label_selector},run_id={self._make_safe_label_value(ti.run_id)}" + if label_search_str in label_search_set: + continue + # Fallback to old style of using execution_date + label_search_str = ( + f"{base_label_selector},execution_date={self._make_safe_label_value(ti.execution_date)}" + ) + if label_search_str in label_search_set: + continue + self.log.info("TaskInstance: %s found in queued state but was not launched, rescheduling", ti) + session.execute( + update(TaskInstance) + .where( + TaskInstance.dag_id == ti.dag_id, + TaskInstance.task_id == ti.task_id, + TaskInstance.run_id == ti.run_id, + TaskInstance.map_index == ti.map_index, + ) + .values(state=TaskInstanceState.SCHEDULED) ) - .values(state=TaskInstanceState.SCHEDULED) - ) def start(self) -> None: """Start the executor.""" @@ -534,31 +536,32 @@ class KubernetesExecutor(BaseExecutor): return messages, ["\n".join(log)] def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]: - # Always flush TIs without queued_by_job_id - tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id] - scheduler_job_ids = {ti.queued_by_job_id for ti in tis} - tis_to_flush_by_key = {ti.key: ti for ti in tis if ti.queued_by_job_id} - kube_client: client.CoreV1Api = self.kube_client - for scheduler_job_id in scheduler_job_ids: - scheduler_job_id = self._make_safe_label_value(str(scheduler_job_id)) - # We will look for any pods owned by the no-longer-running scheduler, - # but will exclude only successful pods, as those TIs will have a terminal state - # and not be up for adoption! - # Those workers that failed, however, are okay to adopt here as their TI will - # still be in queued. - query_kwargs = { - "field_selector": "status.phase!=Succeeded", - "label_selector": ( - "kubernetes_executor=True," - f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}!=True" - ), - } - pod_list = self._list_pods(query_kwargs) - for pod in pod_list: - self.adopt_launched_task(kube_client, pod, tis_to_flush_by_key) - self._adopt_completed_pods(kube_client) - tis_to_flush.extend(tis_to_flush_by_key.values()) - return tis_to_flush + with Stats.timer("kubernetes_executor.adopt_task_instances.duration"): + # Always flush TIs without queued_by_job_id + tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id] + scheduler_job_ids = {ti.queued_by_job_id for ti in tis} + tis_to_flush_by_key = {ti.key: ti for ti in tis if ti.queued_by_job_id} + kube_client: client.CoreV1Api = self.kube_client + for scheduler_job_id in scheduler_job_ids: + scheduler_job_id = self._make_safe_label_value(str(scheduler_job_id)) + # We will look for any pods owned by the no-longer-running scheduler, + # but will exclude only successful pods, as those TIs will have a terminal state + # and not be up for adoption! + # Those workers that failed, however, are okay to adopt here as their TI will + # still be in queued. 
+                query_kwargs = {
+                    "field_selector": "status.phase!=Succeeded",
+                    "label_selector": (
+                        "kubernetes_executor=True,"
+                        f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}!=True"
+                    ),
+                }
+                pod_list = self._list_pods(query_kwargs)
+                for pod in pod_list:
+                    self.adopt_launched_task(kube_client, pod, tis_to_flush_by_key)
+            self._adopt_completed_pods(kube_client)
+            tis_to_flush.extend(tis_to_flush_by_key.values())
+            return tis_to_flush
 
     def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
         """
diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
index b6703c41a4..54efc7501e 100644
--- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
+++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
@@ -223,22 +223,24 @@ Name                                                Description
 Timers
 ------
 
-=================================================== ========================================================================
-Name                                                Description
-=================================================== ========================================================================
-``dagrun.dependency-check.<dag_id>``                Milliseconds taken to check DAG dependencies
-``dag.<dag_id>.<task_id>.duration``                 Seconds taken to run a task
-``dag.<dag_id>.<task_id>.scheduled_duration``       Seconds a task spends in the Scheduled state, before being Queued
-``dag.<dag_id>.<task_id>.queued_duration``          Seconds a task spends in the Queued state, before being Running
-``dag_processing.last_duration.<dag_file>``         Seconds taken to load the given DAG file
-``dagrun.duration.success.<dag_id>``                Seconds taken for a DagRun to reach success state
-``dagrun.duration.failed.<dag_id>``                 Seconds taken for a DagRun to reach failed state
-``dagrun.schedule_delay.<dag_id>``                  Seconds of delay between the scheduled DagRun
-                                                    start date and the actual DagRun start date
-``scheduler.critical_section_duration``             Milliseconds spent in the critical section of scheduler loop --
-                                                    only a single scheduler can enter this loop at a time
-``scheduler.critical_section_query_duration``       Milliseconds spent running the critical section task instance query
-``scheduler.scheduler_loop_duration``               Milliseconds spent running one scheduler loop
-``dagrun.<dag_id>.first_task_scheduling_delay``     Seconds elapsed between first task start_date and dagrun expected start
-``collect_db_dags``                                 Milliseconds taken for fetching all Serialized Dags from DB
-=================================================== ========================================================================
+================================================================ ========================================================================
+Name                                                             Description
+================================================================ ========================================================================
+``dagrun.dependency-check.<dag_id>``                             Milliseconds taken to check DAG dependencies
+``dag.<dag_id>.<task_id>.duration``                              Seconds taken to run a task
+``dag.<dag_id>.<task_id>.scheduled_duration``                    Seconds a task spends in the Scheduled state, before being Queued
+``dag.<dag_id>.<task_id>.queued_duration``                       Seconds a task spends in the Queued state, before being Running
+``dag_processing.last_duration.<dag_file>``                      Seconds taken to load the given DAG file
+``dagrun.duration.success.<dag_id>``                             Seconds taken for a DagRun to reach success state
+``dagrun.duration.failed.<dag_id>``                              Seconds taken for a DagRun to reach failed state
+``dagrun.schedule_delay.<dag_id>``                               Seconds of delay between the scheduled DagRun
+                                                                 start date and the actual DagRun start date
+``scheduler.critical_section_duration``                          Milliseconds spent in the critical section of scheduler loop --
+                                                                 only a single scheduler can enter this loop at a time
+``scheduler.critical_section_query_duration``                    Milliseconds spent running the critical section task instance query
+``scheduler.scheduler_loop_duration``                            Milliseconds spent running one scheduler loop
+``dagrun.<dag_id>.first_task_scheduling_delay``                  Seconds elapsed between first task start_date and dagrun expected start
+``collect_db_dags``                                              Milliseconds taken for fetching all Serialized Dags from DB
+``kubernetes_executor.clear_not_launched_queued_tasks.duration`` Milliseconds taken for clearing not launched queued tasks in Kubernetes Executor
+``kubernetes_executor.adopt_task_instances.duration``            Milliseconds taken to adopt the task instances in Kubernetes Executor
+================================================================ ========================================================================
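
Both new timers use the Stats facade imported at the top of the diff. Used as a context manager, Stats.timer measures the wall-clock duration of the wrapped block and emits it under the given metric name through whatever metrics backend is configured (StatsD by default); when metrics are disabled it degrades to a no-op. A minimal, self-contained sketch of the pattern this commit applies -- note that reconcile_queued_tasks below is an illustrative stand-in, not the executor's real logic:

    import time

    from airflow.stats import Stats


    def reconcile_queued_tasks() -> None:
        """Stand-in for the executor work being timed (illustrative only)."""
        time.sleep(0.1)


    # On __enter__ the timer starts; on __exit__ the elapsed time is emitted
    # to the configured metrics backend under the given name, exactly as in
    # the two methods instrumented by this commit.
    with Stats.timer("kubernetes_executor.clear_not_launched_queued_tasks.duration"):
        reconcile_queued_tasks()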
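
To confirm the new timers are actually flowing in a deployment, one quick check is to point Airflow's StatsD output at a scratch UDP listener. The snippet below is a debugging sketch, not part of this commit; it assumes the documented defaults in the [metrics] section of airflow.cfg (statsd_on = True, statsd_host = localhost, statsd_port = 8125, statsd_prefix = airflow), under which the timers arrive as standard StatsD timing lines such as airflow.kubernetes_executor.adopt_task_instances.duration:<value>|ms:

    import socket

    # Bind where airflow.cfg's [metrics] statsd_host/statsd_port point
    # (assumed defaults: localhost:8125).
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("127.0.0.1", 8125))

    while True:
        data, _ = sock.recvfrom(65535)
        for line in data.decode("utf-8", errors="replace").splitlines():
            # Print only the KubernetesExecutor timers added by this commit.
            if "kubernetes_executor" in line:
                print(line)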