This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 0f4884c3ca [AIP-49] OpenTelemetry Traces for Apache Airflow Part 2 (#40802) 0f4884c3ca is described below commit 0f4884c3ca26c39bc6bd21967c950e718589fcd6 Author: Howard Yoo <32691630+howard...@users.noreply.github.com> AuthorDate: Sat Jul 20 02:11:40 2024 -0500 [AIP-49] OpenTelemetry Traces for Apache Airflow Part 2 (#40802) --------- Co-authored-by: D. Ferruzzi <ferru...@amazon.com> --- airflow/dag_processing/manager.py | 302 ++++++++++++++---------- airflow/executors/base_executor.py | 75 +++++- airflow/executors/local_executor.py | 17 ++ airflow/executors/sequential_executor.py | 10 + airflow/jobs/job.py | 104 +++++---- airflow/jobs/local_task_job_runner.py | 79 ++++--- airflow/jobs/scheduler_job_runner.py | 303 ++++++++++++++++++------- airflow/jobs/triggerer_job_runner.py | 45 +++- airflow/models/dagrun.py | 33 +++ airflow/models/taskinstance.py | 22 ++ airflow/traces/__init__.py | 1 + airflow/traces/tracer.py | 3 + airflow/traces/utils.py | 13 +- scripts/ci/docker-compose/integration-otel.yml | 2 +- 14 files changed, 715 insertions(+), 294 deletions(-) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index f358dd9ee1..57858fc427 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -53,7 +53,9 @@ from airflow.models.errors import ParseImportError from airflow.models.serialized_dag import SerializedDagModel from airflow.secrets.cache import SecretCache from airflow.stats import Stats +from airflow.traces.tracer import Trace, span from airflow.utils import timezone +from airflow.utils.dates import datetime_to_nano from airflow.utils.file import list_py_file_paths, might_contain_dag from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.mixins import MultiprocessingStartMethodMixin @@ -228,7 +230,9 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin): # to kill all sub-process of this at the OS-level, rather than having # to iterate the child processes set_new_process_group() - + span = Trace.get_current_span() + span.set_attribute("dag_directory", str(dag_directory)) + span.set_attribute("dag_ids", str(dag_ids)) setproctitle("airflow scheduler -- DagFileProcessorManager") reload_configuration_for_dag_processing() processor_manager = DagFileProcessorManager( @@ -258,8 +262,10 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin): self._heartbeat_manager() def _process_message(self, message): + span = Trace.get_current_span() self.log.debug("Received message of type %s", type(message).__name__) if isinstance(message, DagParsingStat): + span.set_attribute("all_files_processed", str(message.all_files_processed)) self._sync_metadata(message) else: raise RuntimeError(f"Unexpected message received of type {type(message).__name__}") @@ -562,118 +568,144 @@ class DagFileProcessorManager(LoggingMixin): # in sync mode we need to be told to start a "loop" self.start_new_processes() while True: - loop_start_time = time.monotonic() - ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time) - self.heartbeat() - if self._direct_scheduler_conn is not None and self._direct_scheduler_conn in ready: - agent_signal = self._direct_scheduler_conn.recv() - - self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal) - if agent_signal == DagParsingSignal.TERMINATE_MANAGER: - self.terminate() - break - elif agent_signal == DagParsingSignal.END_MANAGER: - 
self.end() - sys.exit(os.EX_OK) - elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE: - # continue the loop to parse dags - pass - elif isinstance(agent_signal, CallbackRequest): - self._add_callback_to_queue(agent_signal) - else: - raise ValueError(f"Invalid message {type(agent_signal)}") - - if not ready and not self._async_mode: - # In "sync" mode we don't want to parse the DAGs until we - # are told to (as that would open another connection to the - # SQLite DB which isn't a good practice - - # This shouldn't happen, as in sync mode poll should block for - # ever. Lets be defensive about that. - self.log.warning( - "wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time - ) - - continue - - for sentinel in ready: - if sentinel is not self._direct_scheduler_conn: - processor = self.waitables.get(sentinel) - if processor: - self._collect_results_from_processor(processor) - self.waitables.pop(sentinel) - self._processors.pop(processor.file_path) - - if self.standalone_dag_processor: - self._fetch_callbacks(max_callbacks_per_loop) - self._scan_stale_dags() - DagWarning.purge_inactive_dag_warnings() - refreshed_dag_dir = self._refresh_dag_dir() - - self._kill_timed_out_processors() - - # Generate more file paths to process if we processed all the files already. Note for this - # to clear down, we must have cleared all files found from scanning the dags dir _and_ have - # cleared all files added as a result of callbacks - if not self._file_path_queue: - self.emit_metrics() - self.prepare_file_path_queue() - - # if new files found in dag dir, add them - elif refreshed_dag_dir: - self.add_new_file_path_to_queue() - - self._refresh_requested_filelocs() - self.start_new_processes() - - # Update number of loop iteration. - self._num_run += 1 - - if not self._async_mode: - self.log.debug("Waiting for processors to finish since we're using sqlite") - # Wait until the running DAG processors are finished before - # sending a DagParsingStat message back. 
This means the Agent - # can tell we've got to the end of this iteration when it sees - # this type of message - self.wait_until_finished() - - # Collect anything else that has finished, but don't kick off any more processors - self.collect_results() + with Trace.start_span(span_name="dag_parsing_loop", component="DagFileProcessorManager") as span: + loop_start_time = time.monotonic() + ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time) + if span.is_recording(): + span.add_event(name="heartbeat") + self.heartbeat() + if self._direct_scheduler_conn is not None and self._direct_scheduler_conn in ready: + agent_signal = self._direct_scheduler_conn.recv() + + self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal) + if agent_signal == DagParsingSignal.TERMINATE_MANAGER: + if span.is_recording(): + span.add_event(name="terminate") + self.terminate() + break + elif agent_signal == DagParsingSignal.END_MANAGER: + if span.is_recording(): + span.add_event(name="end") + self.end() + sys.exit(os.EX_OK) + elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE: + # continue the loop to parse dags + pass + elif isinstance(agent_signal, CallbackRequest): + self._add_callback_to_queue(agent_signal) + else: + raise ValueError(f"Invalid message {type(agent_signal)}") + + if not ready and not self._async_mode: + # In "sync" mode we don't want to parse the DAGs until we + # are told to (as that would open another connection to the + # SQLite DB which isn't a good practice + + # This shouldn't happen, as in sync mode poll should block for + # ever. Lets be defensive about that. + self.log.warning( + "wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time + ) - self._print_stat() + continue - all_files_processed = all(self.get_last_finish_time(x) is not None for x in self.file_paths) - max_runs_reached = self.max_runs_reached() + for sentinel in ready: + if sentinel is not self._direct_scheduler_conn: + processor = self.waitables.get(sentinel) + if processor: + self._collect_results_from_processor(processor) + self.waitables.pop(sentinel) + self._processors.pop(processor.file_path) + + if self.standalone_dag_processor: + self._fetch_callbacks(max_callbacks_per_loop) + self._scan_stale_dags() + DagWarning.purge_inactive_dag_warnings() + refreshed_dag_dir = self._refresh_dag_dir() + + if span.is_recording(): + span.add_event(name="_kill_timed_out_processors") + self._kill_timed_out_processors() + + # Generate more file paths to process if we processed all the files already. Note for this + # to clear down, we must have cleared all files found from scanning the dags dir _and_ have + # cleared all files added as a result of callbacks + if not self._file_path_queue: + self.emit_metrics() + if span.is_recording(): + span.add_event(name="prepare_file_path_queue") + self.prepare_file_path_queue() + + # if new files found in dag dir, add them + elif refreshed_dag_dir: + if span.is_recording(): + span.add_event(name="add_new_file_path_to_queue") + self.add_new_file_path_to_queue() + + self._refresh_requested_filelocs() + if span.is_recording(): + span.add_event(name="start_new_processes") + self.start_new_processes() + + # Update number of loop iteration. + self._num_run += 1 + + if not self._async_mode: + self.log.debug("Waiting for processors to finish since we're using sqlite") + # Wait until the running DAG processors are finished before + # sending a DagParsingStat message back. 
This means the Agent + # can tell we've got to the end of this iteration when it sees + # this type of message + self.wait_until_finished() + + # Collect anything else that has finished, but don't kick off any more processors + if span.is_recording(): + span.add_event(name="collect_results") + self.collect_results() + + if span.is_recording(): + span.add_event(name="print_stat") + self._print_stat() + + all_files_processed = all(self.get_last_finish_time(x) is not None for x in self.file_paths) + max_runs_reached = self.max_runs_reached() - try: - if self._direct_scheduler_conn: - self._direct_scheduler_conn.send( - DagParsingStat( - max_runs_reached, - all_files_processed, + try: + if self._direct_scheduler_conn: + self._direct_scheduler_conn.send( + DagParsingStat( + max_runs_reached, + all_files_processed, + ) ) + except BlockingIOError: + # Try again next time around the loop! + + # It is better to fail, than it is deadlock. This should + # "almost never happen" since the DagParsingStat object is + # small, and in async mode this stat is not actually _required_ + # for normal operation (It only drives "max runs") + self.log.debug("BlockingIOError received trying to send DagParsingStat, ignoring") + + if max_runs_reached: + self.log.info( + "Exiting dag parsing loop as all files have been processed %s times", self._max_runs ) - except BlockingIOError: - # Try again next time around the loop! - - # It is better to fail, than it is deadlock. This should - # "almost never happen" since the DagParsingStat object is - # small, and in async mode this stat is not actually _required_ - # for normal operation (It only drives "max runs") - self.log.debug("BlockingIOError received trying to send DagParsingStat, ignoring") - - if max_runs_reached: - self.log.info( - "Exiting dag parsing loop as all files have been processed %s times", self._max_runs - ) - break + if span.is_recording(): + span.add_event( + name="info", + attributes={ + "message": "Exiting dag parsing loop as all files have been processed {self._max_runs} times" + }, + ) + break - if self._async_mode: - loop_duration = time.monotonic() - loop_start_time - if loop_duration < 1: - poll_time = 1 - loop_duration - else: - poll_time = 0.0 + if self._async_mode: + loop_duration = time.monotonic() - loop_start_time + if loop_duration < 1: + poll_time = 1 - loop_duration + else: + poll_time = 0.0 @provide_session def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION): @@ -1103,6 +1135,24 @@ class DagFileProcessorManager(LoggingMixin): ) self._file_stats[processor.file_path] = stat file_name = Path(processor.file_path).stem + """crude exposure of instrumentation code which may need to be furnished""" + span = Trace.get_tracer("DagFileProcessorManager").start_span( + "dag_processing", start_time=datetime_to_nano(processor.start_time) + ) + span.set_attribute("file_path", processor.file_path) + span.set_attribute("run_count", self.get_run_count(processor.file_path) + 1) + + if processor.result is None: + span.set_attribute("error", True) + span.set_attribute("processor.exit_code", processor.exit_code) + else: + span.set_attribute("num_dags", num_dags) + span.set_attribute("import_errors", count_import_errors) + if count_import_errors > 0: + span.set_attribute("error", True) + + span.end(end_time=datetime_to_nano(last_finish_time)) + Stats.timing(f"dag_processing.last_duration.{file_name}", last_duration) Stats.timing("dag_processing.last_duration", last_duration, tags={"file_name": file_name}) @@ -1134,6 +1184,7 @@ 
class DagFileProcessorManager(LoggingMixin): callback_requests=callback_requests, ) + @span def start_new_processes(self): """Start more processors if we have enough slots and files to process.""" # initialize cache to mutualize calls to Variable.get in DAGs @@ -1157,14 +1208,21 @@ class DagFileProcessorManager(LoggingMixin): del self._callback_to_execute[file_path] Stats.incr("dag_processing.processes", tags={"file_path": file_path, "action": "start"}) - + span = Trace.get_current_span() + span.set_attribute("category", "processing") processor.start() self.log.debug("Started a process (PID: %s) to generate tasks for %s", processor.pid, file_path) + if span.is_recording(): + span.add_event( + name="dag_processing processor started", + attributes={"file_path": file_path, "pid": processor.pid}, + ) self._processors[file_path] = processor self.waitables[processor.waitable_handle] = processor Stats.gauge("dag_processing.file_path_queue_size", len(self._file_path_queue)) + @span def add_new_file_path_to_queue(self): for file_path in self.file_paths: if file_path not in self._file_stats: @@ -1172,6 +1230,11 @@ class DagFileProcessorManager(LoggingMixin): self.log.info("Adding new file %s to parsing queue", file_path) self._file_stats[file_path] = DagFileProcessorManager.DEFAULT_FILE_STAT self._file_path_queue.appendleft(file_path) + span = Trace.get_current_span() + if span.is_recording(): + span.add_event( + name="adding new file to parsing queue", attributes={"file_path": file_path} + ) def prepare_file_path_queue(self): """ @@ -1285,6 +1348,13 @@ class DagFileProcessorManager(LoggingMixin): # Deprecated; may be removed in a future Airflow release. Stats.incr("dag_file_processor_timeouts") processor.kill() + span = Trace.get_current_span() + span.set_attribute("category", "processing") + if span.is_recording(): + span.add_event( + name="dag processing killed processor", + attributes={"file_path": file_path, "action": "timeout"}, + ) # Clean up processor references self.waitables.pop(processor.waitable_handle) @@ -1345,12 +1415,16 @@ class DagFileProcessorManager(LoggingMixin): This is called once every time around the parsing "loop" - i.e. after all files have been parsed. 
""" - parse_time = time.perf_counter() - self._parsing_start_time - Stats.gauge("dag_processing.total_parse_time", parse_time) - Stats.gauge("dagbag_size", sum(stat.num_dags for stat in self._file_stats.values())) - Stats.gauge( - "dag_processing.import_errors", sum(stat.import_errors for stat in self._file_stats.values()) - ) + with Trace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span: + parse_time = time.perf_counter() - self._parsing_start_time + Stats.gauge("dag_processing.total_parse_time", parse_time) + Stats.gauge("dagbag_size", sum(stat.num_dags for stat in self._file_stats.values())) + Stats.gauge( + "dag_processing.import_errors", sum(stat.import_errors for stat in self._file_stats.values()) + ) + span.set_attribute("total_parse_time", parse_time) + span.set_attribute("dag_bag_size", sum(stat.num_dags for stat in self._file_stats.values())) + span.set_attribute("import_errors", sum(stat.import_errors for stat in self._file_stats.values())) @property def file_paths(self): diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 0f94701174..098bb93215 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -32,6 +32,9 @@ from airflow.configuration import conf from airflow.exceptions import RemovedInAirflow3Warning from airflow.models import Log from airflow.stats import Stats +from airflow.traces import NO_TRACE_ID +from airflow.traces.tracer import Trace, gen_context, span +from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import TaskInstanceState @@ -224,6 +227,7 @@ class BaseExecutor(LoggingMixin): Executors should override this to perform gather statuses. """ + @span def heartbeat(self) -> None: """Heartbeat sent to trigger new jobs.""" if not self.parallelism: @@ -241,6 +245,17 @@ class BaseExecutor(LoggingMixin): else: self.log.debug("%s open slots", open_slots) + span = Trace.get_current_span() + if span.is_recording(): + span.add_event( + name="executor", + attributes={ + "executor.open_slots": open_slots, + "executor.queued_tasks": num_queued_tasks, + "executor.running_tasks": num_running_tasks, + }, + ) + Stats.gauge( "executor.open_slots", value=open_slots, tags={"status": "open", "name": self.__class__.__name__} ) @@ -273,12 +288,14 @@ class BaseExecutor(LoggingMixin): reverse=True, ) + @span def trigger_tasks(self, open_slots: int) -> None: """ Initiate async execution of the queued tasks, up to the number of available slots. 
:param open_slots: Number of open slots """ + span = Trace.get_current_span() sorted_queue = self.order_queued_tasks_by_priority() task_tuples = [] @@ -322,15 +339,40 @@ class BaseExecutor(LoggingMixin): if key in self.attempts: del self.attempts[key] task_tuples.append((key, command, queue, ti.executor_config)) + if span.is_recording(): + span.add_event( + name="task to trigger", + attributes={"command": str(command), "conf": str(ti.executor_config)}, + ) if task_tuples: self._process_tasks(task_tuples) + @span def _process_tasks(self, task_tuples: list[TaskTuple]) -> None: for key, command, queue, executor_config in task_tuples: - del self.queued_tasks[key] - self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config) - self.running.add(key) + task_instance = self.queued_tasks[key][3] # TaskInstance in fourth element + trace_id = int(gen_trace_id(task_instance.dag_run, as_int=True)) + span_id = int(gen_span_id_from_ti_key(key, as_int=True)) + links = [{"trace_id": trace_id, "span_id": span_id}] + + # assuming that the span_id will very likely be unique inside the trace + with Trace.start_span( + span_name=f"{key.dag_id}.{key.task_id}", + component="BaseExecutor", + span_id=span_id, + links=links, + ) as span: + span.set_attribute("dag_id", key.dag_id) + span.set_attribute("run_id", key.run_id) + span.set_attribute("task_id", key.task_id) + span.set_attribute("try_number", key.try_number) + span.set_attribute("command", str(command)) + span.set_attribute("queue", str(queue)) + span.set_attribute("executor_config", str(executor_config)) + del self.queued_tasks[key] + self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config) + self.running.add(key) def change_state( self, key: TaskInstanceKey, state: TaskInstanceState, info=None, remove_running=True @@ -358,6 +400,20 @@ class BaseExecutor(LoggingMixin): :param info: Executor information for the task instance :param key: Unique key for the task instance """ + trace_id = Trace.get_current_span().get_span_context().trace_id + if trace_id != NO_TRACE_ID: + span_id = int(gen_span_id_from_ti_key(key, as_int=True)) + with Trace.start_span( + span_name="fail", + component="BaseExecutor", + parent_sc=gen_context(trace_id=trace_id, span_id=span_id), + ) as span: + span.set_attribute("dag_id", key.dag_id) + span.set_attribute("run_id", key.run_id) + span.set_attribute("task_id", key.task_id) + span.set_attribute("try_number", key.try_number) + span.set_attribute("error", True) + self.change_state(key, TaskInstanceState.FAILED, info) def success(self, key: TaskInstanceKey, info=None) -> None: @@ -367,6 +423,19 @@ class BaseExecutor(LoggingMixin): :param info: Executor information for the task instance :param key: Unique key for the task instance """ + trace_id = Trace.get_current_span().get_span_context().trace_id + if trace_id != NO_TRACE_ID: + span_id = int(gen_span_id_from_ti_key(key, as_int=True)) + with Trace.start_span( + span_name="success", + component="BaseExecutor", + parent_sc=gen_context(trace_id=trace_id, span_id=span_id), + ) as span: + span.set_attribute("dag_id", key.dag_id) + span.set_attribute("run_id", key.run_id) + span.set_attribute("task_id", key.task_id) + span.set_attribute("try_number", key.try_number - 1) + self.change_state(key, TaskInstanceState.SUCCESS, info) def queued(self, key: TaskInstanceKey, info=None) -> None: diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 3b2670e755..90cedf7dbd 100644 --- 
a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -39,6 +39,7 @@ from setproctitle import getproctitle, setproctitle from airflow import settings from airflow.exceptions import AirflowException from airflow.executors.base_executor import PARALLELISM, BaseExecutor +from airflow.traces.tracer import Trace, span from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import TaskInstanceState @@ -77,6 +78,7 @@ class LocalWorkerBase(Process, LoggingMixin): setproctitle("airflow worker -- LocalExecutor") return super().run() + @span def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None: """ Execute command received and stores result state in queue. @@ -98,6 +100,7 @@ class LocalWorkerBase(Process, LoggingMixin): # Remove the command since the worker is done executing the task setproctitle("airflow worker -- LocalExecutor") + @span def _execute_work_in_subprocess(self, command: CommandType) -> TaskInstanceState: try: subprocess.check_call(command, close_fds=True) @@ -106,6 +109,7 @@ class LocalWorkerBase(Process, LoggingMixin): self.log.error("Failed to execute task %s.", e) return TaskInstanceState.FAILED + @span def _execute_work_in_fork(self, command: CommandType) -> TaskInstanceState: pid = os.fork() if pid: @@ -165,6 +169,7 @@ class LocalWorker(LocalWorkerBase): self.key: TaskInstanceKey = key self.command: CommandType = command + @span def do_work(self) -> None: self.execute_work(key=self.key, command=self.command) @@ -184,6 +189,7 @@ class QueuedLocalWorker(LocalWorkerBase): super().__init__(result_queue=result_queue) self.task_queue = task_queue + @span def do_work(self) -> None: while True: try: @@ -244,6 +250,7 @@ class LocalExecutor(BaseExecutor): self.executor.workers_used = 0 self.executor.workers_active = 0 + @span def execute_async( self, key: TaskInstanceKey, @@ -262,6 +269,14 @@ class LocalExecutor(BaseExecutor): if TYPE_CHECKING: assert self.executor.result_queue + span = Trace.get_current_span() + if span.is_recording(): + span.set_attribute("dag_id", key.dag_id) + span.set_attribute("run_id", key.run_id) + span.set_attribute("task_id", key.task_id) + span.set_attribute("try_number", key.try_number - 1) + span.set_attribute("commands_to_run", str(command)) + local_worker = LocalWorker(self.executor.result_queue, key=key, command=command) self.executor.workers_used += 1 self.executor.workers_active += 1 @@ -311,6 +326,7 @@ class LocalExecutor(BaseExecutor): for worker in self.executor.workers: worker.start() + @span def execute_async( self, key: TaskInstanceKey, @@ -372,6 +388,7 @@ class LocalExecutor(BaseExecutor): self.impl.start() + @span def execute_async( self, key: TaskInstanceKey, diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index 41b8ae9ddc..1b145892eb 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -29,6 +29,7 @@ import subprocess from typing import TYPE_CHECKING, Any from airflow.executors.base_executor import BaseExecutor +from airflow.traces.tracer import Trace, span if TYPE_CHECKING: from airflow.executors.base_executor import CommandType @@ -59,6 +60,7 @@ class SequentialExecutor(BaseExecutor): super().__init__() self.commands_to_run = [] + @span def execute_async( self, key: TaskInstanceKey, @@ -69,6 +71,14 @@ class SequentialExecutor(BaseExecutor): self.validate_airflow_tasks_run_command(command) self.commands_to_run.append((key, command)) + span = Trace.get_current_span() + if 
span.is_recording(): + span.set_attribute("dag_id", key.dag_id) + span.set_attribute("run_id", key.run_id) + span.set_attribute("task_id", key.task_id) + span.set_attribute("try_number", key.try_number - 1) + span.set_attribute("commands_to_run", str(self.commands_to_run)) + def sync(self) -> None: for key, command in self.commands_to_run: self.log.info("Executing command: %s", command) diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py index 4273f1d334..9384821807 100644 --- a/airflow/jobs/job.py +++ b/airflow/jobs/job.py @@ -34,6 +34,7 @@ from airflow.listeners.listener import get_listener_manager from airflow.models.base import ID_LEN, Base from airflow.serialization.pydantic.job import JobPydantic from airflow.stats import Stats +from airflow.traces.tracer import Trace, span from airflow.utils import timezone from airflow.utils.helpers import convert_camel_to_snake from airflow.utils.log.logging_mixin import LoggingMixin @@ -199,52 +200,62 @@ class Job(Base, LoggingMixin): :param session to use for saving the job """ previous_heartbeat = self.latest_heartbeat - - try: - # This will cause it to load from the db - self._merge_from(Job._fetch_from_db(self, session)) - previous_heartbeat = self.latest_heartbeat - - if self.state == JobState.RESTARTING: - self.kill() - - # Figure out how long to sleep for - sleep_for = 0 - if self.latest_heartbeat: - seconds_remaining = ( - self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds() - ) - sleep_for = max(0, seconds_remaining) - sleep(sleep_for) - - job = Job._update_heartbeat(job=self, session=session) - self._merge_from(job) - time_since_last_heartbeat = (timezone.utcnow() - previous_heartbeat).total_seconds() - health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate) - if time_since_last_heartbeat > health_check_threshold_value: - self.log.info("Heartbeat recovered after %.2f seconds", time_since_last_heartbeat) - # At this point, the DB has updated. - previous_heartbeat = self.latest_heartbeat - - heartbeat_callback(session) - self.log.debug("[heartbeat]") - self.heartbeat_failed = False - except OperationalError: - Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1) - if not self.heartbeat_failed: - self.log.exception("%s heartbeat failed with error", self.__class__.__name__) - self.heartbeat_failed = True - if self.is_alive(): - self.log.error( - "%s heartbeat failed with error. Scheduler may go into unhealthy state", - self.__class__.__name__, - ) - else: - self.log.error( - "%s heartbeat failed with error. 
Scheduler is in unhealthy state", self.__class__.__name__ - ) - # We didn't manage to heartbeat, so make sure that the timestamp isn't updated - self.latest_heartbeat = previous_heartbeat + with Trace.start_span(span_name="heartbeat", component="Job") as span: + try: + span.set_attribute("heartbeat", str(self.latest_heartbeat)) + # This will cause it to load from the db + self._merge_from(Job._fetch_from_db(self, session)) + previous_heartbeat = self.latest_heartbeat + + if self.state == JobState.RESTARTING: + self.kill() + + # Figure out how long to sleep for + sleep_for = 0 + if self.latest_heartbeat: + seconds_remaining = ( + self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds() + ) + sleep_for = max(0, seconds_remaining) + if span.is_recording(): + span.add_event(name="sleep", attributes={"sleep_for": sleep_for}) + sleep(sleep_for) + + job = Job._update_heartbeat(job=self, session=session) + self._merge_from(job) + time_since_last_heartbeat = (timezone.utcnow() - previous_heartbeat).total_seconds() + health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate) + if time_since_last_heartbeat > health_check_threshold_value: + self.log.info("Heartbeat recovered after %.2f seconds", time_since_last_heartbeat) + # At this point, the DB has updated. + previous_heartbeat = self.latest_heartbeat + + heartbeat_callback(session) + self.log.debug("[heartbeat]") + self.heartbeat_failed = False + except OperationalError: + Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1) + if not self.heartbeat_failed: + self.log.exception("%s heartbeat failed with error", self.__class__.__name__) + self.heartbeat_failed = True + msg = f"{self.__class__.__name__} heartbeat got an exception" + if span.is_recording(): + span.add_event(name="error", attributes={"message": msg}) + if self.is_alive(): + self.log.error( + "%s heartbeat failed with error. Scheduler may go into unhealthy state", + self.__class__.__name__, + ) + msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler may go into unhealthy state" + if span.is_recording(): + span.add_event(name="error", attributes={"message": msg}) + else: + msg = f"{self.__class__.__name__} heartbeat failed with error. 
Scheduler is in unhealthy state" + self.log.error(msg) + if span.is_recording(): + span.add_event(name="error", attributes={"message": msg}) + # We didn't manage to heartbeat, so make sure that the timestamp isn't updated + self.latest_heartbeat = previous_heartbeat @provide_session def prepare_for_execution(self, session: Session = NEW_SESSION): @@ -448,6 +459,7 @@ def execute_job(job: Job, execute_callable: Callable[[], int | None]) -> int | N return ret +@span def perform_heartbeat( job: Job, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool ) -> None: diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py index 96e36bcfe7..a6a1f0ac8f 100644 --- a/airflow/jobs/local_task_job_runner.py +++ b/airflow/jobs/local_task_job_runner.py @@ -28,6 +28,7 @@ from airflow.jobs.base_job_runner import BaseJobRunner from airflow.jobs.job import perform_heartbeat from airflow.models.taskinstance import TaskReturnCode from airflow.stats import Stats +from airflow.traces.tracer import Trace from airflow.utils import timezone from airflow.utils.log.file_task_handler import _set_task_deferred_context_var from airflow.utils.log.logging_mixin import LoggingMixin @@ -184,39 +185,55 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin): # If LocalTaskJob receives SIGTERM, LocalTaskJob passes SIGTERM to _run_raw_task # If the state of task_instance is changed, LocalTaskJob sends SIGTERM to _run_raw_task while not self.terminating: - # Monitor the task to see if it's done. Wait in a syscall - # (`os.wait`) for as long as possible so we notice the - # subprocess finishing as quick as we can - max_wait_time = max( - 0, # Make sure this value is never negative, - min( - ( - heartbeat_time_limit - - (timezone.utcnow() - self.job.latest_heartbeat).total_seconds() * 0.75 + with Trace.start_span( + span_name="local_task_job_loop", component="LocalTaskJobRunner" + ) as span: + # Monitor the task to see if it's done. Wait in a syscall + # (`os.wait`) for as long as possible so we notice the + # subprocess finishing as quick as we can + max_wait_time = max( + 0, # Make sure this value is never negative, + min( + ( + heartbeat_time_limit + - (timezone.utcnow() - self.job.latest_heartbeat).total_seconds() * 0.75 + ), + self.job.heartrate if self.job.heartrate is not None else heartbeat_time_limit, ), - self.job.heartrate if self.job.heartrate is not None else heartbeat_time_limit, - ), - ) - return_code = self.task_runner.return_code(timeout=max_wait_time) - if return_code is not None: - self.handle_task_exit(return_code) - return return_code - - perform_heartbeat( - job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=False - ) - - # If it's been too long since we've heartbeat, then it's possible that - # the scheduler rescheduled this task, so kill launched processes. - # This can only really happen if the worker can't read the DB for a long time - time_since_last_heartbeat = (timezone.utcnow() - self.job.latest_heartbeat).total_seconds() - if time_since_last_heartbeat > heartbeat_time_limit: - Stats.incr("local_task_job_prolonged_heartbeat_failure", 1, 1) - self.log.error("Heartbeat time limit exceeded!") - raise AirflowException( - f"Time since last heartbeat({time_since_last_heartbeat:.2f}s) exceeded limit " - f"({heartbeat_time_limit}s)." 
) + return_code = self.task_runner.return_code(timeout=max_wait_time) + if return_code is not None: + self.handle_task_exit(return_code) + return return_code + + if span.is_recording(): + span.add_event(name="perform_heartbeat") + perform_heartbeat( + job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=False + ) + + # If it's been too long since we've heartbeat, then it's possible that + # the scheduler rescheduled this task, so kill launched processes. + # This can only really happen if the worker can't read the DB for a long time + time_since_last_heartbeat = ( + timezone.utcnow() - self.job.latest_heartbeat + ).total_seconds() + if time_since_last_heartbeat > heartbeat_time_limit: + Stats.incr("local_task_job_prolonged_heartbeat_failure", 1, 1) + self.log.error("Heartbeat time limit exceeded!") + if span.is_recording(): + span.add_event( + name="error", + attributes={ + "message": "Heartbeat time limit exceeded", + "heartbeat_time_limit(s)": heartbeat_time_limit, + "time_since_last_heartbeat(s)": time_since_last_heartbeat, + }, + ) + raise AirflowException( + f"Time since last heartbeat({time_since_last_heartbeat:.2f}s) exceeded limit " + f"({heartbeat_time_limit}s)." + ) return return_code finally: # Print a marker for log grouping of details before task execution diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index f657cbd5f6..883cd05d2b 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -60,7 +60,10 @@ from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance from airflow.stats import Stats from airflow.ti_deps.dependencies_states import EXECUTION_STATES from airflow.timetables.simple import DatasetTriggeredTimetable +from airflow.traces import utils as trace_utils +from airflow.traces.tracer import Trace, span from airflow.utils import timezone +from airflow.utils.dates import datetime_to_nano from airflow.utils.event_scheduler import EventScheduler from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.retries import MAX_DB_RETRIES, retry_db_transaction, run_with_db_retries @@ -815,6 +818,60 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): ti.pid, ) + with Trace.start_span_from_taskinstance(ti=ti) as span: + span.set_attribute("category", "scheduler") + span.set_attribute("task_id", ti.task_id) + span.set_attribute("dag_id", ti.dag_id) + span.set_attribute("state", ti.state) + if ti.state == TaskInstanceState.FAILED: + span.set_attribute("error", True) + span.set_attribute("start_date", str(ti.start_date)) + span.set_attribute("end_date", str(ti.end_date)) + span.set_attribute("duration", ti.duration) + span.set_attribute("executor_config", str(ti.executor_config)) + span.set_attribute("execution_date", str(ti.execution_date)) + span.set_attribute("hostname", ti.hostname) + span.set_attribute("log_url", ti.log_url) + span.set_attribute("operator", str(ti.operator)) + span.set_attribute("try_number", ti.try_number - 1) + span.set_attribute("executor_state", state) + span.set_attribute("job_id", ti.job_id) + span.set_attribute("pool", ti.pool) + span.set_attribute("queue", ti.queue) + span.set_attribute("priority_weight", ti.priority_weight) + span.set_attribute("queued_dttm", str(ti.queued_dttm)) + span.set_attribute("ququed_by_job_id", ti.queued_by_job_id) + span.set_attribute("pid", ti.pid) + if span.is_recording(): + span.add_event(name="queued", timestamp=datetime_to_nano(ti.queued_dttm)) + span.add_event(name="started", 
timestamp=datetime_to_nano(ti.start_date)) + span.add_event(name="ended", timestamp=datetime_to_nano(ti.end_date)) + if conf.has_option("traces", "otel_task_log_event") and conf.getboolean( + "traces", "otel_task_log_event" + ): + from airflow.utils.log.log_reader import TaskLogReader + + task_log_reader = TaskLogReader() + if task_log_reader.supports_read: + metadata: dict[str, Any] = {} + logs, metadata = task_log_reader.read_log_chunks(ti, ti.try_number, metadata) + if ti.hostname in dict(logs[0]): + message = str(dict(logs[0])[ti.hostname]).replace("\\n", "\n") + while metadata["end_of_log"] is False: + logs, metadata = task_log_reader.read_log_chunks( + ti, ti.try_number - 1, metadata + ) + if ti.hostname in dict(logs[0]): + message = message + str(dict(logs[0])[ti.hostname]).replace("\\n", "\n") + if span.is_recording(): + span.add_event( + name="task_log", + attributes={ + "message": message, + "metadata": str(metadata), + }, + ) + # There are two scenarios why the same TI with the same try_number is queued # after executor is finished with it: # 1) the TI was killed externally and it had no time to mark itself failed @@ -1044,13 +1101,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): ) for loop_count in itertools.count(start=1): - with Stats.timer("scheduler.scheduler_loop_duration") as timer: - if self.using_sqlite and self.processor_agent: - self.processor_agent.run_single_parsing_loop() - # For the sqlite case w/ 1 thread, wait until the processor - # is finished to avoid concurrent access to the DB. - self.log.debug("Waiting for processors to finish since we're using sqlite") - self.processor_agent.wait_until_finished() + with Trace.start_span(span_name="scheduler_job_loop", component="SchedulerJobRunner") as span: + span.set_attribute("category", "scheduler") + span.set_attribute("loop_count", loop_count) + with Stats.timer("scheduler.scheduler_loop_duration") as timer: + if self.using_sqlite and self.processor_agent: + self.processor_agent.run_single_parsing_loop() + # For the sqlite case w/ 1 thread, wait until the processor + # is finished to avoid concurrent access to the DB. + self.log.debug("Waiting for processors to finish since we're using sqlite") + self.processor_agent.wait_until_finished() with create_session() as session: # This will schedule for as many executors as possible. @@ -1079,16 +1139,23 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): if self.processor_agent: self.processor_agent.heartbeat() - # Heartbeat the scheduler periodically - perform_heartbeat( - job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True - ) + # Heartbeat the scheduler periodically + perform_heartbeat( + job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True + ) - # Run any pending timed events - next_event = timers.run(blocking=False) - self.log.debug("Next timed event is in %f", next_event) + # Run any pending timed events + next_event = timers.run(blocking=False) + self.log.debug("Next timed event is in %f", next_event) self.log.debug("Ran scheduling loop in %.2f seconds", timer.duration) + if span.is_recording(): + span.add_event( + name="Ran scheduling loop", + attributes={ + "duration in seconds": timer.duration, + }, + ) if not is_unit_test and not num_queued_tis and not num_finished_events: # If the scheduler is doing things, don't sleep. 
This means when there is work to do, the @@ -1102,6 +1169,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): self.num_runs, loop_count, ) + if span.is_recording(): + span.add_event("Exiting scheduler loop as requested number of runs has been reached") break if self.processor_agent and self.processor_agent.done: self.log.info( @@ -1110,6 +1179,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): self.num_times_parse_dags, loop_count, ) + if span.is_recording(): + span.add_event("Exiting scheduler loop as requested DAG parse count has been reached") break def _do_scheduling(self, session: Session) -> int: @@ -1229,6 +1300,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): guard.commit() # END: create dagruns + @span def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None: """Create a DAG run and update the dag_model to control if/when the next DAGRun should be created.""" # Bulk Fetch DagRuns with dag_id and execution_date same @@ -1436,6 +1508,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): return False return True + @span def _start_queued_dagruns(self, session: Session) -> None: """Find DagRuns in queued state and decide moving them to running state.""" # added all() to save runtime, otherwise query is executed more than once @@ -1445,7 +1518,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs), only_running=True, session=session), ) + @span def _update_state(dag: DAG, dag_run: DagRun): + __span = Trace.get_current_span() + __span.set_attribute("state", str(DagRunState.RUNNING)) + __span.set_attribute("run_id", dag_run.run_id) + __span.set_attribute("type", dag_run.run_type) + __span.set_attribute("dag_id", dag_run.dag_id) + dag_run.state = DagRunState.RUNNING dag_run.start_date = timezone.utcnow() if dag.timetable.periodic and not dag_run.external_trigger and dag_run.clear_number < 1: @@ -1465,12 +1545,18 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): schedule_delay, tags={"dag_id": dag.dag_id}, ) + if __span.is_recording(): + __span.add_event( + name="schedule_delay", + attributes={"dag_id": dag.dag_id, "schedule_delay": str(schedule_delay)}, + ) # cache saves time during scheduling of many dag_runs for same dag cached_get_dag: Callable[[str], DAG | None] = lru_cache()( partial(self.dagbag.get_dag, session=session) ) + _span = Trace.get_current_span() for dag_run in dag_runs: dag = dag_run.dag = cached_get_dag(dag_run.dag_id) @@ -1487,6 +1573,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): dag_run.execution_date, ) else: + if _span.is_recording(): + _span.add_event( + name="dag_run", + attributes={ + "run_id": dag_run.run_id, + "dag_id": dag_run.dag_id, + "conf": str(dag_run.conf), + }, + ) active_runs_of_dags[dag_run.dag_id] += 1 _update_state(dag, dag_run) dag_run.notify_dagrun_state_changed() @@ -1514,70 +1609,101 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): :param dag_run: The DagRun to schedule :return: Callback that needs to be executed """ - callback: DagCallbackRequest | None = None + trace_id = int(trace_utils.gen_trace_id(dag_run=dag_run, as_int=True)) + span_id = int(trace_utils.gen_dag_span_id(dag_run=dag_run, as_int=True)) + links = [{"trace_id": trace_id, "span_id": span_id}] + + with Trace.start_span( + span_name="_schedule_dag_run", component="SchedulerJobRunner", links=links + ) as span: + span.set_attribute("dag_id", dag_run.dag_id) + span.set_attribute("run_id", dag_run.run_id) + 
span.set_attribute("run_type", dag_run.run_type) + callback: DagCallbackRequest | None = None + + dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session) + dag_model = DM.get_dagmodel(dag_run.dag_id, session) + + if not dag or not dag_model: + self.log.error("Couldn't find DAG %s in DAG bag or database!", dag_run.dag_id) + return callback + + if ( + dag_run.start_date + and dag.dagrun_timeout + and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout + ): + dag_run.set_state(DagRunState.FAILED) + unfinished_task_instances = session.scalars( + select(TI) + .where(TI.dag_id == dag_run.dag_id) + .where(TI.run_id == dag_run.run_id) + .where(TI.state.in_(State.unfinished)) + ) + for task_instance in unfinished_task_instances: + task_instance.state = TaskInstanceState.SKIPPED + session.merge(task_instance) + session.flush() + self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id) + + if self._should_update_dag_next_dagruns( + dag, dag_model, last_dag_run=dag_run, session=session + ): + dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run)) + + callback_to_execute = DagCallbackRequest( + full_filepath=dag.fileloc, + dag_id=dag.dag_id, + run_id=dag_run.run_id, + is_failure_callback=True, + processor_subdir=dag_model.processor_subdir, + msg="timed_out", + ) - dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session) - dag_model = DM.get_dagmodel(dag_run.dag_id, session) + dag_run.notify_dagrun_state_changed() + duration = dag_run.end_date - dag_run.start_date + Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}", duration) + Stats.timing("dagrun.duration.failed", duration, tags={"dag_id": dag_run.dag_id}) + span.set_attribute("error", True) + if span.is_recording(): + span.add_event( + name="error", + attributes={ + "message": f"Run {dag_run.run_id} of {dag_run.dag_id} has timed-out", + "duration": str(duration), + }, + ) + return callback_to_execute - if not dag or not dag_model: - self.log.error("Couldn't find DAG %s in DAG bag or database!", dag_run.dag_id) - return callback + if dag_run.execution_date > timezone.utcnow() and not dag.allow_future_exec_dates: + self.log.error("Execution date is in future: %s", dag_run.execution_date) + return callback - if ( - dag_run.start_date - and dag.dagrun_timeout - and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout - ): - dag_run.set_state(DagRunState.FAILED) - unfinished_task_instances = session.scalars( - select(TI) - .where(TI.dag_id == dag_run.dag_id) - .where(TI.run_id == dag_run.run_id) - .where(TI.state.in_(State.unfinished)) - ) - for task_instance in unfinished_task_instances: - task_instance.state = TaskInstanceState.SKIPPED - session.merge(task_instance) - session.flush() - self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id) + if not self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session): + self.log.warning( + "The DAG disappeared before verifying integrity: %s. Skipping.", dag_run.dag_id + ) + return callback + # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else? + schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False) if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session): dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run)) + # This will do one query per dag run. 
We "could" build up a complex + # query to update all the TIs across all the execution dates and dag + # IDs in a single query, but it turns out that can be _very very slow_ + # see #11147/commit ee90807ac for more details + if span.is_recording(): + span.add_event( + name="schedule_tis", + attributes={ + "message": "dag_run scheduling its tis", + "schedulable_tis": [_ti.task_id for _ti in schedulable_tis], + }, + ) + dag_run.schedule_tis(schedulable_tis, session, max_tis_per_query=self.job.max_tis_per_query) - callback_to_execute = DagCallbackRequest( - full_filepath=dag.fileloc, - dag_id=dag.dag_id, - run_id=dag_run.run_id, - is_failure_callback=True, - processor_subdir=dag_model.processor_subdir, - msg="timed_out", - ) - - dag_run.notify_dagrun_state_changed() - duration = dag_run.end_date - dag_run.start_date - Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}", duration) - Stats.timing("dagrun.duration.failed", duration, tags={"dag_id": dag_run.dag_id}) - return callback_to_execute - - if dag_run.execution_date > timezone.utcnow() and not dag.allow_future_exec_dates: - self.log.error("Execution date is in future: %s", dag_run.execution_date) - return callback - - if not self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session): - self.log.warning("The DAG disappeared before verifying integrity: %s. Skipping.", dag_run.dag_id) - return callback - # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else? - schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False) - - if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session): - dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run)) - # This will do one query per dag run. 
We "could" build up a complex - # query to update all the TIs across all the execution dates and dag - # IDs in a single query, but it turns out that can be _very very slow_ - # see #11147/commit ee90807ac for more details - dag_run.schedule_tis(schedulable_tis, session, max_tis_per_query=self.job.max_tis_per_query) - - return callback_to_run + return callback_to_run def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) -> bool: """ @@ -1681,20 +1807,27 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None: from airflow.models.pool import Pool - pools = Pool.slots_stats(session=session) - for pool_name, slot_stats in pools.items(): - Stats.gauge(f"pool.open_slots.{pool_name}", slot_stats["open"]) - Stats.gauge(f"pool.queued_slots.{pool_name}", slot_stats["queued"]) - Stats.gauge(f"pool.running_slots.{pool_name}", slot_stats["running"]) - Stats.gauge(f"pool.deferred_slots.{pool_name}", slot_stats["deferred"]) - Stats.gauge(f"pool.scheduled_slots.{pool_name}", slot_stats["scheduled"]) - - # Same metrics with tagging - Stats.gauge("pool.open_slots", slot_stats["open"], tags={"pool_name": pool_name}) - Stats.gauge("pool.queued_slots", slot_stats["queued"], tags={"pool_name": pool_name}) - Stats.gauge("pool.running_slots", slot_stats["running"], tags={"pool_name": pool_name}) - Stats.gauge("pool.deferred_slots", slot_stats["deferred"], tags={"pool_name": pool_name}) - Stats.gauge("pool.scheduled_slots", slot_stats["scheduled"], tags={"pool_name": pool_name}) + with Trace.start_span(span_name="emit_pool_metrics", component="SchedulerJobRunner") as span: + pools = Pool.slots_stats(session=session) + for pool_name, slot_stats in pools.items(): + Stats.gauge(f"pool.open_slots.{pool_name}", slot_stats["open"]) + Stats.gauge(f"pool.queued_slots.{pool_name}", slot_stats["queued"]) + Stats.gauge(f"pool.running_slots.{pool_name}", slot_stats["running"]) + Stats.gauge(f"pool.deferred_slots.{pool_name}", slot_stats["deferred"]) + Stats.gauge(f"pool.scheduled_slots.{pool_name}", slot_stats["scheduled"]) + + # Same metrics with tagging + Stats.gauge("pool.open_slots", slot_stats["open"], tags={"pool_name": pool_name}) + Stats.gauge("pool.queued_slots", slot_stats["queued"], tags={"pool_name": pool_name}) + Stats.gauge("pool.running_slots", slot_stats["running"], tags={"pool_name": pool_name}) + Stats.gauge("pool.deferred_slots", slot_stats["deferred"], tags={"pool_name": pool_name}) + Stats.gauge("pool.scheduled_slots", slot_stats["scheduled"], tags={"pool_name": pool_name}) + + span.set_attribute("category", "scheduler") + span.set_attribute(f"pool.open_slots.{pool_name}", slot_stats["open"]) + span.set_attribute(f"pool.queued_slots.{pool_name}", slot_stats["queued"]) + span.set_attribute(f"pool.running_slots.{pool_name}", slot_stats["running"]) + span.set_attribute(f"pool.deferred_slots.{pool_name}", slot_stats["deferred"]) @provide_session def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int: diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index e1736ae8e5..080323a1d1 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -37,6 +37,7 @@ from airflow.jobs.base_job_runner import BaseJobRunner from airflow.jobs.job import perform_heartbeat from airflow.models.trigger import Trigger from airflow.stats import Stats +from airflow.traces.tracer import Trace, span from airflow.triggers.base import TriggerEvent 
from airflow.typing_compat import TypedDict from airflow.utils import timezone @@ -362,26 +363,43 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin): if not self.trigger_runner.is_alive(): self.log.error("Trigger runner thread has died! Exiting.") break - # Clean out unused triggers - Trigger.clean_unused() - # Load/delete triggers - self.load_triggers() - # Handle events - self.handle_events() - # Handle failed triggers - self.handle_failed_triggers() - perform_heartbeat(self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True) - # Collect stats - self.emit_metrics() + with Trace.start_span(span_name="triggerer_job_loop", component="TriggererJobRunner") as span: + # Clean out unused triggers + if span.is_recording(): + span.add_event(name="Trigger.clean_unused") + Trigger.clean_unused() + # Load/delete triggers + if span.is_recording(): + span.add_event(name="load_triggers") + self.load_triggers() + # Handle events + if span.is_recording(): + span.add_event(name="handle_events") + self.handle_events() + # Handle failed triggers + if span.is_recording(): + span.add_event(name="handle_failed_triggers") + self.handle_failed_triggers() + if span.is_recording(): + span.add_event(name="perform_heartbeat") + perform_heartbeat( + self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True + ) + # Collect stats + if span.is_recording(): + span.add_event(name="emit_metrics") + self.emit_metrics() # Idle sleep time.sleep(1) + @span def load_triggers(self): """Query the database for the triggers we're supposed to be running and update the runner.""" Trigger.assign_unassigned(self.job.id, self.capacity, self.health_check_threshold) ids = Trigger.ids_for_triggerer(self.job.id) self.trigger_runner.update_triggers(set(ids)) + @span def handle_events(self): """Dispatch outbound events to the Trigger model which pushes them to the relevant task instances.""" while self.trigger_runner.events: @@ -392,6 +410,7 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin): # Emit stat event Stats.incr("triggers.succeeded") + @span def handle_failed_triggers(self): """ Handle "failed" triggers. - ones that errored or exited before they sent an event. 
@@ -405,11 +424,15 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin): # Emit stat event Stats.incr("triggers.failed") + @span def emit_metrics(self): Stats.gauge(f"triggers.running.{self.job.hostname}", len(self.trigger_runner.triggers)) Stats.gauge( "triggers.running", len(self.trigger_runner.triggers), tags={"hostname": self.job.hostname} ) + span = Trace.get_current_span() + span.set_attribute("trigger host", self.job.hostname) + span.set_attribute("triggers running", len(self.trigger_runner.triggers)) class TriggerDetails(TypedDict): diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 6c3d0715b9..d4ef937e9d 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -62,7 +62,9 @@ from airflow.models.tasklog import LogTemplate from airflow.stats import Stats from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES +from airflow.traces.tracer import Trace from airflow.utils import timezone +from airflow.utils.dates import datetime_to_nano from airflow.utils.helpers import chunks, is_container, prune_dict from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import NEW_SESSION, provide_session @@ -919,6 +921,37 @@ class DagRun(Base, LoggingMixin): self.data_interval_end, self.dag_hash, ) + + with Trace.start_span_from_dagrun(dagrun=self) as span: + if self._state is DagRunState.FAILED: + span.set_attribute("error", True) + attributes = { + "category": "DAG runs", + "dag_id": str(self.dag_id), + "execution_date": str(self.execution_date), + "run_id": str(self.run_id), + "queued_at": str(self.queued_at), + "run_start_date": str(self.start_date), + "run_end_date": str(self.end_date), + "run_duration": str( + (self.end_date - self.start_date).total_seconds() + if self.start_date and self.end_date + else 0 + ), + "state": str(self._state), + "external_trigger": str(self.external_trigger), + "run_type": str(self.run_type), + "data_interval_start": str(self.data_interval_start), + "data_interval_end": str(self.data_interval_end), + "dag_hash": str(self.dag_hash), + "conf": str(self.conf), + } + if span.is_recording(): + span.add_event(name="queued", timestamp=datetime_to_nano(self.queued_at)) + span.add_event(name="started", timestamp=datetime_to_nano(self.start_date)) + span.add_event(name="ended", timestamp=datetime_to_nano(self.end_date)) + span.set_attributes(attributes) + session.flush() self._emit_true_scheduling_delay_stats_for_finished_state(finished_tis) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 1dacbe7525..27eb5c26c2 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -108,6 +108,7 @@ from airflow.stats import Stats from airflow.templates import SandboxedEnvironment from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS +from airflow.traces.tracer import Trace from airflow.utils import timezone from airflow.utils.context import ( ConnectionAccessor, @@ -1211,6 +1212,27 @@ def _handle_failure( if not test_mode: TaskInstance.save_to_db(failure_context["ti"], session) + with Trace.start_span_from_taskinstance(ti=task_instance) as span: + # ---- error info ---- + span.set_attribute("error", "true") + span.set_attribute("error_msg", str(error)) + span.set_attribute("context", context) + span.set_attribute("force_fail", force_fail) + # ---- common info ---- + span.set_attribute("category", "DAG runs") + 
span.set_attribute("task_id", task_instance.task_id) + span.set_attribute("dag_id", task_instance.dag_id) + span.set_attribute("state", task_instance.state) + span.set_attribute("start_date", str(task_instance.start_date)) + span.set_attribute("end_date", str(task_instance.end_date)) + span.set_attribute("duration", task_instance.duration) + span.set_attribute("executor_config", str(task_instance.executor_config)) + span.set_attribute("execution_date", str(task_instance.execution_date)) + span.set_attribute("hostname", task_instance.hostname) + if isinstance(task_instance, TaskInstance): + span.set_attribute("log_url", task_instance.log_url) + span.set_attribute("operator", str(task_instance.operator)) + def _refresh_from_task( *, task_instance: TaskInstance | TaskInstancePydantic, task: Operator, pool_override: str | None = None diff --git a/airflow/traces/__init__.py b/airflow/traces/__init__.py index abe55b5103..7b2f416872 100644 --- a/airflow/traces/__init__.py +++ b/airflow/traces/__init__.py @@ -18,3 +18,4 @@ from __future__ import annotations TRACEPARENT = "traceparent" TRACESTATE = "tracestate" +NO_TRACE_ID = 1 diff --git a/airflow/traces/tracer.py b/airflow/traces/tracer.py index 88999abe2b..1d58717287 100644 --- a/airflow/traces/tracer.py +++ b/airflow/traces/tracer.py @@ -96,6 +96,9 @@ class EmptySpan: """Set multiple attributes at once.""" pass + def is_recording(self): + return False + def add_event( self, name: str, diff --git a/airflow/traces/utils.py b/airflow/traces/utils.py index eaf3c1c065..afab2591d5 100644 --- a/airflow/traces/utils.py +++ b/airflow/traces/utils.py @@ -20,6 +20,7 @@ from __future__ import annotations import logging from typing import TYPE_CHECKING +from airflow.traces import NO_TRACE_ID from airflow.utils.hashlib_wrapper import md5 from airflow.utils.state import TaskInstanceState @@ -40,9 +41,12 @@ def _gen_id(seeds: list[str], as_int: bool = False, type: int = TRACE_ID) -> str def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int: + if dag_run.start_date is None: + return NO_TRACE_ID + """Generate trace id from DagRun.""" return _gen_id( - [dag_run.dag_id, dag_run.run_id, str(dag_run.start_date.timestamp())], + [dag_run.dag_id, str(dag_run.run_id), str(dag_run.start_date.timestamp())], as_int, ) @@ -50,7 +54,7 @@ def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int: def gen_span_id_from_ti_key(ti_key: TaskInstanceKey, as_int: bool = False) -> str | int: """Generate span id from TI key.""" return _gen_id( - [ti_key.dag_id, ti_key.run_id, ti_key.task_id, str(ti_key.try_number)], + [ti_key.dag_id, str(ti_key.run_id), ti_key.task_id, str(ti_key.try_number)], as_int, SPAN_ID, ) @@ -58,8 +62,11 @@ def gen_span_id_from_ti_key(ti_key: TaskInstanceKey, as_int: bool = False) -> st def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int: """Generate dag's root span id using dag_run.""" + if dag_run.start_date is None: + return NO_TRACE_ID + return _gen_id( - [dag_run.dag_id, dag_run.run_id, str(dag_run.start_date.timestamp())], + [dag_run.dag_id, str(dag_run.run_id), str(dag_run.start_date.timestamp())], as_int, SPAN_ID, ) diff --git a/scripts/ci/docker-compose/integration-otel.yml b/scripts/ci/docker-compose/integration-otel.yml index 6573709bc3..7a635c17c7 100644 --- a/scripts/ci/docker-compose/integration-otel.yml +++ b/scripts/ci/docker-compose/integration-otel.yml @@ -54,7 +54,7 @@ services: - ./grafana/volume/provisioning:/grafana/provisioning jaeger: - image: jaegertracing/all-in-one + image: 
jaegertracing/all-in-one:1.57
     container_name: "breeze-jaeger"
     environment:
       COLLECTOR_OTLP_ENABLED: true
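
The loop instrumentation in this change follows one pattern throughout: wrap each iteration in a span and guard add_event/set_attribute calls with span.is_recording(), so the calls cost nothing when no exporter is configured. A minimal sketch of that pattern using the plain OpenTelemetry API rather than Airflow's Trace facade (the tracer name and callback arguments here are illustrative, not taken from the patch):

from opentelemetry import trace

tracer = trace.get_tracer("DagFileProcessorManager")

def parsing_loop_iteration(heartbeat, collect_results):
    # Wrap one loop iteration in a span; the guards keep the hot path cheap
    # when no provider/exporter is configured (is_recording() is False).
    with tracer.start_as_current_span("dag_parsing_loop") as span:
        if span.is_recording():
            span.add_event(name="heartbeat")
        heartbeat()
        if span.is_recording():
            span.add_event(name="collect_results")
        collect_results()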
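gen_trace_id and gen_span_id_from_ti_key in airflow/traces/utils.py derive deterministic ids by md5-hashing DAG run and task-instance identity, which lets the scheduler, executor and task side refer to the same trace without coordinating. The slicing of the digest is done by _gen_id, which this diff does not touch, so the 128-bit trace id and 64-bit span id widths below are assumptions based on the OpenTelemetry spec, and the helper names are hypothetical:

from hashlib import md5

def demo_trace_id(dag_id: str, run_id: str, start_ts: float, as_int: bool = False):
    # Same inputs always hash to the same trace id.
    digest = md5(f"{dag_id}{run_id}{start_ts}".encode()).hexdigest()
    trace_hex = digest[:32]  # 128-bit trace id (assumed width)
    return int(trace_hex, 16) if as_int else trace_hex

def demo_span_id(dag_id: str, run_id: str, task_id: str, try_number: int, as_int: bool = False):
    digest = md5(f"{dag_id}{run_id}{task_id}{try_number}".encode()).hexdigest()
    span_hex = digest[:16]  # 64-bit span id (assumed width)
    return int(span_hex, 16) if as_int else span_hex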
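_collect_results_from_processor creates its "dag_processing" span after the work has finished, passing explicit start and end timestamps converted with datetime_to_nano. A rough standalone sketch of that retroactive-span idea, again on the plain OpenTelemetry API, with a local stand-in for datetime_to_nano and illustrative argument names:

from datetime import datetime
from opentelemetry import trace

def to_nano(dt: datetime) -> int:
    # Local stand-in for airflow.utils.dates.datetime_to_nano.
    return int(dt.timestamp() * 1_000_000_000)

def record_processing_span(file_path: str, started: datetime, finished: datetime, import_errors: int) -> None:
    tracer = trace.get_tracer("DagFileProcessorManager")
    # Open the span at the processor's real start time and close it at the real
    # finish time, so the span duration reflects work that already happened.
    span = tracer.start_span("dag_processing", start_time=to_nano(started))
    span.set_attribute("file_path", file_path)
    span.set_attribute("import_errors", import_errors)
    if import_errors > 0:
        span.set_attribute("error", True)
    span.end(end_time=to_nano(finished))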