This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f4884c3ca [AIP-49] OpenTelemetry Traces for Apache Airflow Part 2 (#40802)
0f4884c3ca is described below

commit 0f4884c3ca26c39bc6bd21967c950e718589fcd6
Author: Howard Yoo <32691630+howard...@users.noreply.github.com>
AuthorDate: Sat Jul 20 02:11:40 2024 -0500

    [AIP-49] OpenTelemetry Traces for Apache Airflow Part 2 (#40802)
    
    
    
    ---------
    
    Co-authored-by: D. Ferruzzi <ferru...@amazon.com>
---
 airflow/dag_processing/manager.py              | 302 ++++++++++++++----------
 airflow/executors/base_executor.py             |  75 +++++-
 airflow/executors/local_executor.py            |  17 ++
 airflow/executors/sequential_executor.py       |  10 +
 airflow/jobs/job.py                            | 104 +++++----
 airflow/jobs/local_task_job_runner.py          |  79 ++++---
 airflow/jobs/scheduler_job_runner.py           | 303 ++++++++++++++++++-------
 airflow/jobs/triggerer_job_runner.py           |  45 +++-
 airflow/models/dagrun.py                       |  33 +++
 airflow/models/taskinstance.py                 |  22 ++
 airflow/traces/__init__.py                     |   1 +
 airflow/traces/tracer.py                       |   3 +
 airflow/traces/utils.py                        |  13 +-
 scripts/ci/docker-compose/integration-otel.yml |   2 +-
 14 files changed, 715 insertions(+), 294 deletions(-)
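
The hunks below repeat two instrumentation idioms. As a rough, illustrative sketch only (Trace and the span decorator are imported from airflow.traces.tracer exactly as in the hunks; the functions, attribute names and values here are made-up placeholders, not Airflow code):

    from airflow.traces.tracer import Trace, span

    # Idiom 1: wrap one unit of work in an explicit span, set attributes on it,
    # and guard event emission with is_recording() so nothing extra is built
    # when no exporter is configured.
    def parse_once(poll_time: float) -> None:
        with Trace.start_span(span_name="dag_parsing_loop", component="DagFileProcessorManager") as s:
            s.set_attribute("poll_time", poll_time)
            if s.is_recording():
                s.add_event(name="heartbeat")

    # Idiom 2: decorate a function so it runs inside an automatically created
    # span; the body can still fetch that span via Trace.get_current_span().
    @span
    def start_more_work() -> None:
        current = Trace.get_current_span()
        current.set_attribute("category", "processing")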

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index f358dd9ee1..57858fc427 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -53,7 +53,9 @@ from airflow.models.errors import ParseImportError
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.secrets.cache import SecretCache
 from airflow.stats import Stats
+from airflow.traces.tracer import Trace, span
 from airflow.utils import timezone
+from airflow.utils.dates import datetime_to_nano
 from airflow.utils.file import list_py_file_paths, might_contain_dag
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.mixins import MultiprocessingStartMethodMixin
@@ -228,7 +230,9 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
         # to kill all sub-process of this at the OS-level, rather than having
         # to iterate the child processes
         set_new_process_group()
-
+        span = Trace.get_current_span()
+        span.set_attribute("dag_directory", str(dag_directory))
+        span.set_attribute("dag_ids", str(dag_ids))
         setproctitle("airflow scheduler -- DagFileProcessorManager")
         reload_configuration_for_dag_processing()
         processor_manager = DagFileProcessorManager(
@@ -258,8 +262,10 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
         self._heartbeat_manager()
 
     def _process_message(self, message):
+        span = Trace.get_current_span()
         self.log.debug("Received message of type %s", type(message).__name__)
         if isinstance(message, DagParsingStat):
+            span.set_attribute("all_files_processed", 
str(message.all_files_processed))
             self._sync_metadata(message)
         else:
             raise RuntimeError(f"Unexpected message received of type 
{type(message).__name__}")
@@ -562,118 +568,144 @@ class DagFileProcessorManager(LoggingMixin):
             # in sync mode we need to be told to start a "loop"
             self.start_new_processes()
         while True:
-            loop_start_time = time.monotonic()
-            ready = multiprocessing.connection.wait(self.waitables.keys(), 
timeout=poll_time)
-            self.heartbeat()
-            if self._direct_scheduler_conn is not None and 
self._direct_scheduler_conn in ready:
-                agent_signal = self._direct_scheduler_conn.recv()
-
-                self.log.debug("Received %s signal from 
DagFileProcessorAgent", agent_signal)
-                if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
-                    self.terminate()
-                    break
-                elif agent_signal == DagParsingSignal.END_MANAGER:
-                    self.end()
-                    sys.exit(os.EX_OK)
-                elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE:
-                    # continue the loop to parse dags
-                    pass
-                elif isinstance(agent_signal, CallbackRequest):
-                    self._add_callback_to_queue(agent_signal)
-                else:
-                    raise ValueError(f"Invalid message {type(agent_signal)}")
-
-            if not ready and not self._async_mode:
-                # In "sync" mode we don't want to parse the DAGs until we
-                # are told to (as that would open another connection to the
-                # SQLite DB which isn't a good practice
-
-                # This shouldn't happen, as in sync mode poll should block for
-                # ever. Lets be defensive about that.
-                self.log.warning(
-                    "wait() unexpectedly returned nothing ready after infinite 
timeout (%r)!", poll_time
-                )
-
-                continue
-
-            for sentinel in ready:
-                if sentinel is not self._direct_scheduler_conn:
-                    processor = self.waitables.get(sentinel)
-                    if processor:
-                        self._collect_results_from_processor(processor)
-                        self.waitables.pop(sentinel)
-                        self._processors.pop(processor.file_path)
-
-            if self.standalone_dag_processor:
-                self._fetch_callbacks(max_callbacks_per_loop)
-            self._scan_stale_dags()
-            DagWarning.purge_inactive_dag_warnings()
-            refreshed_dag_dir = self._refresh_dag_dir()
-
-            self._kill_timed_out_processors()
-
-            # Generate more file paths to process if we processed all the 
files already. Note for this
-            # to clear down, we must have cleared all files found from 
scanning the dags dir _and_ have
-            # cleared all files added as a result of callbacks
-            if not self._file_path_queue:
-                self.emit_metrics()
-                self.prepare_file_path_queue()
-
-            # if new files found in dag dir, add them
-            elif refreshed_dag_dir:
-                self.add_new_file_path_to_queue()
-
-            self._refresh_requested_filelocs()
-            self.start_new_processes()
-
-            # Update number of loop iteration.
-            self._num_run += 1
-
-            if not self._async_mode:
-                self.log.debug("Waiting for processors to finish since we're 
using sqlite")
-                # Wait until the running DAG processors are finished before
-                # sending a DagParsingStat message back. This means the Agent
-                # can tell we've got to the end of this iteration when it sees
-                # this type of message
-                self.wait_until_finished()
-
-            # Collect anything else that has finished, but don't kick off any 
more processors
-            self.collect_results()
+            with Trace.start_span(span_name="dag_parsing_loop", 
component="DagFileProcessorManager") as span:
+                loop_start_time = time.monotonic()
+                ready = multiprocessing.connection.wait(self.waitables.keys(), 
timeout=poll_time)
+                if span.is_recording():
+                    span.add_event(name="heartbeat")
+                self.heartbeat()
+                if self._direct_scheduler_conn is not None and self._direct_scheduler_conn in ready:
+                    agent_signal = self._direct_scheduler_conn.recv()
+
+                    self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
+                    if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
+                        if span.is_recording():
+                            span.add_event(name="terminate")
+                        self.terminate()
+                        break
+                    elif agent_signal == DagParsingSignal.END_MANAGER:
+                        if span.is_recording():
+                            span.add_event(name="end")
+                        self.end()
+                        sys.exit(os.EX_OK)
+                    elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE:
+                        # continue the loop to parse dags
+                        pass
+                    elif isinstance(agent_signal, CallbackRequest):
+                        self._add_callback_to_queue(agent_signal)
+                    else:
+                        raise ValueError(f"Invalid message 
{type(agent_signal)}")
+
+                if not ready and not self._async_mode:
+                    # In "sync" mode we don't want to parse the DAGs until we
+                    # are told to (as that would open another connection to the
+                    # SQLite DB which isn't a good practice
+
+                    # This shouldn't happen, as in sync mode poll should block for
+                    # ever. Lets be defensive about that.
+                    self.log.warning(
+                        "wait() unexpectedly returned nothing ready after 
infinite timeout (%r)!", poll_time
+                    )
 
-            self._print_stat()
+                    continue
 
-            all_files_processed = all(self.get_last_finish_time(x) is not None 
for x in self.file_paths)
-            max_runs_reached = self.max_runs_reached()
+                for sentinel in ready:
+                    if sentinel is not self._direct_scheduler_conn:
+                        processor = self.waitables.get(sentinel)
+                        if processor:
+                            self._collect_results_from_processor(processor)
+                            self.waitables.pop(sentinel)
+                            self._processors.pop(processor.file_path)
+
+                if self.standalone_dag_processor:
+                    self._fetch_callbacks(max_callbacks_per_loop)
+                self._scan_stale_dags()
+                DagWarning.purge_inactive_dag_warnings()
+                refreshed_dag_dir = self._refresh_dag_dir()
+
+                if span.is_recording():
+                    span.add_event(name="_kill_timed_out_processors")
+                self._kill_timed_out_processors()
+
+                # Generate more file paths to process if we processed all the files already. Note for this
+                # to clear down, we must have cleared all files found from scanning the dags dir _and_ have
+                # cleared all files added as a result of callbacks
+                if not self._file_path_queue:
+                    self.emit_metrics()
+                    if span.is_recording():
+                        span.add_event(name="prepare_file_path_queue")
+                    self.prepare_file_path_queue()
+
+                # if new files found in dag dir, add them
+                elif refreshed_dag_dir:
+                    if span.is_recording():
+                        span.add_event(name="add_new_file_path_to_queue")
+                    self.add_new_file_path_to_queue()
+
+                self._refresh_requested_filelocs()
+                if span.is_recording():
+                    span.add_event(name="start_new_processes")
+                self.start_new_processes()
+
+                # Update number of loop iteration.
+                self._num_run += 1
+
+                if not self._async_mode:
+                    self.log.debug("Waiting for processors to finish since 
we're using sqlite")
+                    # Wait until the running DAG processors are finished before
+                    # sending a DagParsingStat message back. This means the 
Agent
+                    # can tell we've got to the end of this iteration when it 
sees
+                    # this type of message
+                    self.wait_until_finished()
+
+                # Collect anything else that has finished, but don't kick off any more processors
+                if span.is_recording():
+                    span.add_event(name="collect_results")
+                self.collect_results()
+
+                if span.is_recording():
+                    span.add_event(name="print_stat")
+                self._print_stat()
+
+                all_files_processed = all(self.get_last_finish_time(x) is not None for x in self.file_paths)
+                max_runs_reached = self.max_runs_reached()
 
-            try:
-                if self._direct_scheduler_conn:
-                    self._direct_scheduler_conn.send(
-                        DagParsingStat(
-                            max_runs_reached,
-                            all_files_processed,
+                try:
+                    if self._direct_scheduler_conn:
+                        self._direct_scheduler_conn.send(
+                            DagParsingStat(
+                                max_runs_reached,
+                                all_files_processed,
+                            )
                         )
+                except BlockingIOError:
+                    # Try again next time around the loop!
+
+                    # It is better to fail, than it is deadlock. This should
+                    # "almost never happen" since the DagParsingStat object is
+                    # small, and in async mode this stat is not actually _required_
+                    # for normal operation (It only drives "max runs")
+                    self.log.debug("BlockingIOError received trying to send DagParsingStat, ignoring")
+
+                if max_runs_reached:
+                    self.log.info(
+                        "Exiting dag parsing loop as all files have been 
processed %s times", self._max_runs
                     )
-            except BlockingIOError:
-                # Try again next time around the loop!
-
-                # It is better to fail, than it is deadlock. This should
-                # "almost never happen" since the DagParsingStat object is
-                # small, and in async mode this stat is not actually _required_
-                # for normal operation (It only drives "max runs")
-                self.log.debug("BlockingIOError received trying to send 
DagParsingStat, ignoring")
-
-            if max_runs_reached:
-                self.log.info(
-                    "Exiting dag parsing loop as all files have been processed 
%s times", self._max_runs
-                )
-                break
+                    if span.is_recording():
+                        span.add_event(
+                            name="info",
+                            attributes={
+                                "message": "Exiting dag parsing loop as all 
files have been processed {self._max_runs} times"
+                            },
+                        )
+                    break
 
-            if self._async_mode:
-                loop_duration = time.monotonic() - loop_start_time
-                if loop_duration < 1:
-                    poll_time = 1 - loop_duration
-                else:
-                    poll_time = 0.0
+                if self._async_mode:
+                    loop_duration = time.monotonic() - loop_start_time
+                    if loop_duration < 1:
+                        poll_time = 1 - loop_duration
+                    else:
+                        poll_time = 0.0
 
     @provide_session
     def _fetch_callbacks(self, max_callbacks: int, session: Session = 
NEW_SESSION):
@@ -1103,6 +1135,24 @@ class DagFileProcessorManager(LoggingMixin):
         )
         self._file_stats[processor.file_path] = stat
         file_name = Path(processor.file_path).stem
+        """crude exposure of instrumentation code which may need to be 
furnished"""
+        span = Trace.get_tracer("DagFileProcessorManager").start_span(
+            "dag_processing", start_time=datetime_to_nano(processor.start_time)
+        )
+        span.set_attribute("file_path", processor.file_path)
+        span.set_attribute("run_count", 
self.get_run_count(processor.file_path) + 1)
+
+        if processor.result is None:
+            span.set_attribute("error", True)
+            span.set_attribute("processor.exit_code", processor.exit_code)
+        else:
+            span.set_attribute("num_dags", num_dags)
+            span.set_attribute("import_errors", count_import_errors)
+            if count_import_errors > 0:
+                span.set_attribute("error", True)
+
+        span.end(end_time=datetime_to_nano(last_finish_time))
+
         Stats.timing(f"dag_processing.last_duration.{file_name}", 
last_duration)
         Stats.timing("dag_processing.last_duration", last_duration, 
tags={"file_name": file_name})
 
@@ -1134,6 +1184,7 @@ class DagFileProcessorManager(LoggingMixin):
             callback_requests=callback_requests,
         )
 
+    @span
     def start_new_processes(self):
         """Start more processors if we have enough slots and files to 
process."""
         # initialize cache to mutualize calls to Variable.get in DAGs
@@ -1157,14 +1208,21 @@ class DagFileProcessorManager(LoggingMixin):
 
             del self._callback_to_execute[file_path]
             Stats.incr("dag_processing.processes", tags={"file_path": 
file_path, "action": "start"})
-
+            span = Trace.get_current_span()
+            span.set_attribute("category", "processing")
             processor.start()
             self.log.debug("Started a process (PID: %s) to generate tasks for 
%s", processor.pid, file_path)
+            if span.is_recording():
+                span.add_event(
+                    name="dag_processing processor started",
+                    attributes={"file_path": file_path, "pid": processor.pid},
+                )
             self._processors[file_path] = processor
             self.waitables[processor.waitable_handle] = processor
 
             Stats.gauge("dag_processing.file_path_queue_size", 
len(self._file_path_queue))
 
+    @span
     def add_new_file_path_to_queue(self):
         for file_path in self.file_paths:
             if file_path not in self._file_stats:
@@ -1172,6 +1230,11 @@ class DagFileProcessorManager(LoggingMixin):
                 self.log.info("Adding new file %s to parsing queue", file_path)
                 self._file_stats[file_path] = 
DagFileProcessorManager.DEFAULT_FILE_STAT
                 self._file_path_queue.appendleft(file_path)
+                span = Trace.get_current_span()
+                if span.is_recording():
+                    span.add_event(
+                        name="adding new file to parsing queue", 
attributes={"file_path": file_path}
+                    )
 
     def prepare_file_path_queue(self):
         """
@@ -1285,6 +1348,13 @@ class DagFileProcessorManager(LoggingMixin):
                 # Deprecated; may be removed in a future Airflow release.
                 Stats.incr("dag_file_processor_timeouts")
                 processor.kill()
+                span = Trace.get_current_span()
+                span.set_attribute("category", "processing")
+                if span.is_recording():
+                    span.add_event(
+                        name="dag processing killed processor",
+                        attributes={"file_path": file_path, "action": 
"timeout"},
+                    )
 
                 # Clean up processor references
                 self.waitables.pop(processor.waitable_handle)
@@ -1345,12 +1415,16 @@ class DagFileProcessorManager(LoggingMixin):
         This is called once every time around the parsing "loop" - i.e. after
         all files have been parsed.
         """
-        parse_time = time.perf_counter() - self._parsing_start_time
-        Stats.gauge("dag_processing.total_parse_time", parse_time)
-        Stats.gauge("dagbag_size", sum(stat.num_dags for stat in 
self._file_stats.values()))
-        Stats.gauge(
-            "dag_processing.import_errors", sum(stat.import_errors for stat in 
self._file_stats.values())
-        )
+        with Trace.start_span(span_name="emit_metrics", 
component="DagFileProcessorManager") as span:
+            parse_time = time.perf_counter() - self._parsing_start_time
+            Stats.gauge("dag_processing.total_parse_time", parse_time)
+            Stats.gauge("dagbag_size", sum(stat.num_dags for stat in 
self._file_stats.values()))
+            Stats.gauge(
+                "dag_processing.import_errors", sum(stat.import_errors for 
stat in self._file_stats.values())
+            )
+            span.set_attribute("total_parse_time", parse_time)
+            span.set_attribute("dag_bag_size", sum(stat.num_dags for stat in 
self._file_stats.values()))
+            span.set_attribute("import_errors", sum(stat.import_errors for 
stat in self._file_stats.values()))
 
     @property
     def file_paths(self):
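
Note on _collect_results_from_processor above: the per-file "dag_processing" span is created retroactively, backdated to the processor's start time and closed with an explicit end time, both converted with datetime_to_nano. A minimal sketch of that pattern, with placeholder timestamps and file path (only the names taken from the hunk are real):

    from airflow.traces.tracer import Trace
    from airflow.utils import timezone
    from airflow.utils.dates import datetime_to_nano

    start = timezone.utcnow()
    # ... the file would be parsed between these two timestamps ...
    end = timezone.utcnow()

    # Open the span after the fact and backdate it to the real start/end times.
    s = Trace.get_tracer("DagFileProcessorManager").start_span(
        "dag_processing", start_time=datetime_to_nano(start)
    )
    s.set_attribute("file_path", "/dags/example.py")  # placeholder path
    s.end(end_time=datetime_to_nano(end))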
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 0f94701174..098bb93215 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -32,6 +32,9 @@ from airflow.configuration import conf
 from airflow.exceptions import RemovedInAirflow3Warning
 from airflow.models import Log
 from airflow.stats import Stats
+from airflow.traces import NO_TRACE_ID
+from airflow.traces.tracer import Trace, gen_context, span
+from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import TaskInstanceState
 
@@ -224,6 +227,7 @@ class BaseExecutor(LoggingMixin):
         Executors should override this to perform gather statuses.
         """
 
+    @span
     def heartbeat(self) -> None:
         """Heartbeat sent to trigger new jobs."""
         if not self.parallelism:
@@ -241,6 +245,17 @@ class BaseExecutor(LoggingMixin):
         else:
             self.log.debug("%s open slots", open_slots)
 
+        span = Trace.get_current_span()
+        if span.is_recording():
+            span.add_event(
+                name="executor",
+                attributes={
+                    "executor.open_slots": open_slots,
+                    "executor.queued_tasks": num_queued_tasks,
+                    "executor.running_tasks": num_running_tasks,
+                },
+            )
+
         Stats.gauge(
             "executor.open_slots", value=open_slots, tags={"status": "open", 
"name": self.__class__.__name__}
         )
@@ -273,12 +288,14 @@ class BaseExecutor(LoggingMixin):
             reverse=True,
         )
 
+    @span
     def trigger_tasks(self, open_slots: int) -> None:
         """
         Initiate async execution of the queued tasks, up to the number of 
available slots.
 
         :param open_slots: Number of open slots
         """
+        span = Trace.get_current_span()
         sorted_queue = self.order_queued_tasks_by_priority()
         task_tuples = []
 
@@ -322,15 +339,40 @@ class BaseExecutor(LoggingMixin):
                 if key in self.attempts:
                     del self.attempts[key]
                 task_tuples.append((key, command, queue, ti.executor_config))
+                if span.is_recording():
+                    span.add_event(
+                        name="task to trigger",
+                        attributes={"command": str(command), "conf": 
str(ti.executor_config)},
+                    )
 
         if task_tuples:
             self._process_tasks(task_tuples)
 
+    @span
     def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
         for key, command, queue, executor_config in task_tuples:
-            del self.queued_tasks[key]
-            self.execute_async(key=key, command=command, queue=queue, 
executor_config=executor_config)
-            self.running.add(key)
+            task_instance = self.queued_tasks[key][3]  # TaskInstance in fourth element
+            trace_id = int(gen_trace_id(task_instance.dag_run, as_int=True))
+            span_id = int(gen_span_id_from_ti_key(key, as_int=True))
+            links = [{"trace_id": trace_id, "span_id": span_id}]
+
+            # assuming that the span_id will very likely be unique inside the trace
+            with Trace.start_span(
+                span_name=f"{key.dag_id}.{key.task_id}",
+                component="BaseExecutor",
+                span_id=span_id,
+                links=links,
+            ) as span:
+                span.set_attribute("dag_id", key.dag_id)
+                span.set_attribute("run_id", key.run_id)
+                span.set_attribute("task_id", key.task_id)
+                span.set_attribute("try_number", key.try_number)
+                span.set_attribute("command", str(command))
+                span.set_attribute("queue", str(queue))
+                span.set_attribute("executor_config", str(executor_config))
+                del self.queued_tasks[key]
+                self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config)
+                self.running.add(key)
 
     def change_state(
         self, key: TaskInstanceKey, state: TaskInstanceState, info=None, 
remove_running=True
@@ -358,6 +400,20 @@ class BaseExecutor(LoggingMixin):
         :param info: Executor information for the task instance
         :param key: Unique key for the task instance
         """
+        trace_id = Trace.get_current_span().get_span_context().trace_id
+        if trace_id != NO_TRACE_ID:
+            span_id = int(gen_span_id_from_ti_key(key, as_int=True))
+            with Trace.start_span(
+                span_name="fail",
+                component="BaseExecutor",
+                parent_sc=gen_context(trace_id=trace_id, span_id=span_id),
+            ) as span:
+                span.set_attribute("dag_id", key.dag_id)
+                span.set_attribute("run_id", key.run_id)
+                span.set_attribute("task_id", key.task_id)
+                span.set_attribute("try_number", key.try_number)
+                span.set_attribute("error", True)
+
         self.change_state(key, TaskInstanceState.FAILED, info)
 
     def success(self, key: TaskInstanceKey, info=None) -> None:
@@ -367,6 +423,19 @@ class BaseExecutor(LoggingMixin):
         :param info: Executor information for the task instance
         :param key: Unique key for the task instance
         """
+        trace_id = Trace.get_current_span().get_span_context().trace_id
+        if trace_id != NO_TRACE_ID:
+            span_id = int(gen_span_id_from_ti_key(key, as_int=True))
+            with Trace.start_span(
+                span_name="success",
+                component="BaseExecutor",
+                parent_sc=gen_context(trace_id=trace_id, span_id=span_id),
+            ) as span:
+                span.set_attribute("dag_id", key.dag_id)
+                span.set_attribute("run_id", key.run_id)
+                span.set_attribute("task_id", key.task_id)
+                span.set_attribute("try_number", key.try_number - 1)
+
         self.change_state(key, TaskInstanceState.SUCCESS, info)
 
     def queued(self, key: TaskInstanceKey, info=None) -> None:
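
The executor hunks above correlate executor-side spans with the scheduler's task-instance spans by deriving deterministic ids from the dag run and the TaskInstanceKey, then linking to or re-parenting under that context. A rough sketch of the success/fail reporting path, using only helpers that appear in this diff (gen_trace_id, gen_span_id_from_ti_key, gen_context); the key and dag_run arguments are placeholders:

    from airflow.traces.tracer import Trace, gen_context
    from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id

    def report_success(key, dag_run) -> None:
        # Derive ids deterministically so scheduler and executor spans line up.
        trace_id = int(gen_trace_id(dag_run, as_int=True))
        span_id = int(gen_span_id_from_ti_key(key, as_int=True))

        # Re-parent the reporting span under the task instance's span context.
        with Trace.start_span(
            span_name="success",
            component="BaseExecutor",
            parent_sc=gen_context(trace_id=trace_id, span_id=span_id),
        ) as s:
            s.set_attribute("dag_id", key.dag_id)
            s.set_attribute("try_number", key.try_number)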
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 3b2670e755..90cedf7dbd 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -39,6 +39,7 @@ from setproctitle import getproctitle, setproctitle
 from airflow import settings
 from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import PARALLELISM, BaseExecutor
+from airflow.traces.tracer import Trace, span
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import TaskInstanceState
 
@@ -77,6 +78,7 @@ class LocalWorkerBase(Process, LoggingMixin):
         setproctitle("airflow worker -- LocalExecutor")
         return super().run()
 
+    @span
     def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
         """
         Execute command received and stores result state in queue.
@@ -98,6 +100,7 @@ class LocalWorkerBase(Process, LoggingMixin):
         # Remove the command since the worker is done executing the task
         setproctitle("airflow worker -- LocalExecutor")
 
+    @span
     def _execute_work_in_subprocess(self, command: CommandType) -> 
TaskInstanceState:
         try:
             subprocess.check_call(command, close_fds=True)
@@ -106,6 +109,7 @@ class LocalWorkerBase(Process, LoggingMixin):
             self.log.error("Failed to execute task %s.", e)
             return TaskInstanceState.FAILED
 
+    @span
     def _execute_work_in_fork(self, command: CommandType) -> TaskInstanceState:
         pid = os.fork()
         if pid:
@@ -165,6 +169,7 @@ class LocalWorker(LocalWorkerBase):
         self.key: TaskInstanceKey = key
         self.command: CommandType = command
 
+    @span
     def do_work(self) -> None:
         self.execute_work(key=self.key, command=self.command)
 
@@ -184,6 +189,7 @@ class QueuedLocalWorker(LocalWorkerBase):
         super().__init__(result_queue=result_queue)
         self.task_queue = task_queue
 
+    @span
     def do_work(self) -> None:
         while True:
             try:
@@ -244,6 +250,7 @@ class LocalExecutor(BaseExecutor):
             self.executor.workers_used = 0
             self.executor.workers_active = 0
 
+        @span
         def execute_async(
             self,
             key: TaskInstanceKey,
@@ -262,6 +269,14 @@ class LocalExecutor(BaseExecutor):
             if TYPE_CHECKING:
                 assert self.executor.result_queue
 
+            span = Trace.get_current_span()
+            if span.is_recording():
+                span.set_attribute("dag_id", key.dag_id)
+                span.set_attribute("run_id", key.run_id)
+                span.set_attribute("task_id", key.task_id)
+                span.set_attribute("try_number", key.try_number - 1)
+                span.set_attribute("commands_to_run", str(command))
+
             local_worker = LocalWorker(self.executor.result_queue, key=key, 
command=command)
             self.executor.workers_used += 1
             self.executor.workers_active += 1
@@ -311,6 +326,7 @@ class LocalExecutor(BaseExecutor):
             for worker in self.executor.workers:
                 worker.start()
 
+        @span
         def execute_async(
             self,
             key: TaskInstanceKey,
@@ -372,6 +388,7 @@ class LocalExecutor(BaseExecutor):
 
         self.impl.start()
 
+    @span
     def execute_async(
         self,
         key: TaskInstanceKey,
diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py
index 41b8ae9ddc..1b145892eb 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -29,6 +29,7 @@ import subprocess
 from typing import TYPE_CHECKING, Any
 
 from airflow.executors.base_executor import BaseExecutor
+from airflow.traces.tracer import Trace, span
 
 if TYPE_CHECKING:
     from airflow.executors.base_executor import CommandType
@@ -59,6 +60,7 @@ class SequentialExecutor(BaseExecutor):
         super().__init__()
         self.commands_to_run = []
 
+    @span
     def execute_async(
         self,
         key: TaskInstanceKey,
@@ -69,6 +71,14 @@ class SequentialExecutor(BaseExecutor):
         self.validate_airflow_tasks_run_command(command)
         self.commands_to_run.append((key, command))
 
+        span = Trace.get_current_span()
+        if span.is_recording():
+            span.set_attribute("dag_id", key.dag_id)
+            span.set_attribute("run_id", key.run_id)
+            span.set_attribute("task_id", key.task_id)
+            span.set_attribute("try_number", key.try_number - 1)
+            span.set_attribute("commands_to_run", str(self.commands_to_run))
+
     def sync(self) -> None:
         for key, command in self.commands_to_run:
             self.log.info("Executing command: %s", command)
diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index 4273f1d334..9384821807 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -34,6 +34,7 @@ from airflow.listeners.listener import get_listener_manager
 from airflow.models.base import ID_LEN, Base
 from airflow.serialization.pydantic.job import JobPydantic
 from airflow.stats import Stats
+from airflow.traces.tracer import Trace, span
 from airflow.utils import timezone
 from airflow.utils.helpers import convert_camel_to_snake
 from airflow.utils.log.logging_mixin import LoggingMixin
@@ -199,52 +200,62 @@ class Job(Base, LoggingMixin):
         :param session to use for saving the job
         """
         previous_heartbeat = self.latest_heartbeat
-
-        try:
-            # This will cause it to load from the db
-            self._merge_from(Job._fetch_from_db(self, session))
-            previous_heartbeat = self.latest_heartbeat
-
-            if self.state == JobState.RESTARTING:
-                self.kill()
-
-            # Figure out how long to sleep for
-            sleep_for = 0
-            if self.latest_heartbeat:
-                seconds_remaining = (
-                    self.heartrate - (timezone.utcnow() - 
self.latest_heartbeat).total_seconds()
-                )
-                sleep_for = max(0, seconds_remaining)
-            sleep(sleep_for)
-
-            job = Job._update_heartbeat(job=self, session=session)
-            self._merge_from(job)
-            time_since_last_heartbeat = (timezone.utcnow() - 
previous_heartbeat).total_seconds()
-            health_check_threshold_value = 
health_check_threshold(self.job_type, self.heartrate)
-            if time_since_last_heartbeat > health_check_threshold_value:
-                self.log.info("Heartbeat recovered after %.2f seconds", 
time_since_last_heartbeat)
-            # At this point, the DB has updated.
-            previous_heartbeat = self.latest_heartbeat
-
-            heartbeat_callback(session)
-            self.log.debug("[heartbeat]")
-            self.heartbeat_failed = False
-        except OperationalError:
-            Stats.incr(convert_camel_to_snake(self.__class__.__name__) + 
"_heartbeat_failure", 1, 1)
-            if not self.heartbeat_failed:
-                self.log.exception("%s heartbeat failed with error", 
self.__class__.__name__)
-                self.heartbeat_failed = True
-            if self.is_alive():
-                self.log.error(
-                    "%s heartbeat failed with error. Scheduler may go into 
unhealthy state",
-                    self.__class__.__name__,
-                )
-            else:
-                self.log.error(
-                    "%s heartbeat failed with error. Scheduler is in unhealthy 
state", self.__class__.__name__
-                )
-            # We didn't manage to heartbeat, so make sure that the timestamp 
isn't updated
-            self.latest_heartbeat = previous_heartbeat
+        with Trace.start_span(span_name="heartbeat", component="Job") as span:
+            try:
+                span.set_attribute("heartbeat", str(self.latest_heartbeat))
+                # This will cause it to load from the db
+                self._merge_from(Job._fetch_from_db(self, session))
+                previous_heartbeat = self.latest_heartbeat
+
+                if self.state == JobState.RESTARTING:
+                    self.kill()
+
+                # Figure out how long to sleep for
+                sleep_for = 0
+                if self.latest_heartbeat:
+                    seconds_remaining = (
+                        self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
+                    )
+                    sleep_for = max(0, seconds_remaining)
+                if span.is_recording():
+                    span.add_event(name="sleep", attributes={"sleep_for": sleep_for})
+                sleep(sleep_for)
+
+                job = Job._update_heartbeat(job=self, session=session)
+                self._merge_from(job)
+                time_since_last_heartbeat = (timezone.utcnow() - previous_heartbeat).total_seconds()
+                health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate)
+                if time_since_last_heartbeat > health_check_threshold_value:
+                    self.log.info("Heartbeat recovered after %.2f seconds", time_since_last_heartbeat)
+                # At this point, the DB has updated.
+                previous_heartbeat = self.latest_heartbeat
+
+                heartbeat_callback(session)
+                self.log.debug("[heartbeat]")
+                self.heartbeat_failed = False
+            except OperationalError:
+                Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
+                if not self.heartbeat_failed:
+                    self.log.exception("%s heartbeat failed with error", self.__class__.__name__)
+                    self.heartbeat_failed = True
+                    msg = f"{self.__class__.__name__} heartbeat got an exception"
+                    if span.is_recording():
+                        span.add_event(name="error", attributes={"message": msg})
+                if self.is_alive():
+                    self.log.error(
+                        "%s heartbeat failed with error. Scheduler may go into unhealthy state",
+                        self.__class__.__name__,
+                    )
+                    msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler may go into unhealthy state"
+                    if span.is_recording():
+                        span.add_event(name="error", attributes={"message": msg})
+                else:
+                    msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler is in unhealthy state"
+                    self.log.error(msg)
+                    if span.is_recording():
+                        span.add_event(name="error", attributes={"message": msg})
+                # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
+                self.latest_heartbeat = previous_heartbeat
 
     @provide_session
     def prepare_for_execution(self, session: Session = NEW_SESSION):
@@ -448,6 +459,7 @@ def execute_job(job: Job, execute_callable: Callable[[], int | None]) -> int | N
     return ret
 
 
+@span
 def perform_heartbeat(
     job: Job, heartbeat_callback: Callable[[Session], None], 
only_if_necessary: bool
 ) -> None:
diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py
index 96e36bcfe7..a6a1f0ac8f 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -28,6 +28,7 @@ from airflow.jobs.base_job_runner import BaseJobRunner
 from airflow.jobs.job import perform_heartbeat
 from airflow.models.taskinstance import TaskReturnCode
 from airflow.stats import Stats
+from airflow.traces.tracer import Trace
 from airflow.utils import timezone
 from airflow.utils.log.file_task_handler import _set_task_deferred_context_var
 from airflow.utils.log.logging_mixin import LoggingMixin
@@ -184,39 +185,55 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
             # If LocalTaskJob receives SIGTERM, LocalTaskJob passes SIGTERM to 
_run_raw_task
             # If the state of task_instance is changed, LocalTaskJob sends 
SIGTERM to _run_raw_task
             while not self.terminating:
-                # Monitor the task to see if it's done. Wait in a syscall
-                # (`os.wait`) for as long as possible so we notice the
-                # subprocess finishing as quick as we can
-                max_wait_time = max(
-                    0,  # Make sure this value is never negative,
-                    min(
-                        (
-                            heartbeat_time_limit
-                            - (timezone.utcnow() - 
self.job.latest_heartbeat).total_seconds() * 0.75
+                with Trace.start_span(
+                    span_name="local_task_job_loop", 
component="LocalTaskJobRunner"
+                ) as span:
+                    # Monitor the task to see if it's done. Wait in a syscall
+                    # (`os.wait`) for as long as possible so we notice the
+                    # subprocess finishing as quick as we can
+                    max_wait_time = max(
+                        0,  # Make sure this value is never negative,
+                        min(
+                            (
+                                heartbeat_time_limit
+                                - (timezone.utcnow() - self.job.latest_heartbeat).total_seconds() * 0.75
+                            ),
+                            self.job.heartrate if self.job.heartrate is not None else heartbeat_time_limit,
                         ),
-                        self.job.heartrate if self.job.heartrate is not None 
else heartbeat_time_limit,
-                    ),
-                )
-                return_code = 
self.task_runner.return_code(timeout=max_wait_time)
-                if return_code is not None:
-                    self.handle_task_exit(return_code)
-                    return return_code
-
-                perform_heartbeat(
-                    job=self.job, heartbeat_callback=self.heartbeat_callback, 
only_if_necessary=False
-                )
-
-                # If it's been too long since we've heartbeat, then it's 
possible that
-                # the scheduler rescheduled this task, so kill launched 
processes.
-                # This can only really happen if the worker can't read the DB 
for a long time
-                time_since_last_heartbeat = (timezone.utcnow() - 
self.job.latest_heartbeat).total_seconds()
-                if time_since_last_heartbeat > heartbeat_time_limit:
-                    Stats.incr("local_task_job_prolonged_heartbeat_failure", 
1, 1)
-                    self.log.error("Heartbeat time limit exceeded!")
-                    raise AirflowException(
-                        f"Time since last 
heartbeat({time_since_last_heartbeat:.2f}s) exceeded limit "
-                        f"({heartbeat_time_limit}s)."
                     )
+                    return_code = self.task_runner.return_code(timeout=max_wait_time)
+                    if return_code is not None:
+                        self.handle_task_exit(return_code)
+                        return return_code
+
+                    if span.is_recording():
+                        span.add_event(name="perform_heartbeat")
+                    perform_heartbeat(
+                        job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=False
+                    )
+
+                    # If it's been too long since we've heartbeat, then it's possible that
+                    # the scheduler rescheduled this task, so kill launched processes.
+                    # This can only really happen if the worker can't read the DB for a long time
+                    time_since_last_heartbeat = (
+                        timezone.utcnow() - self.job.latest_heartbeat
+                    ).total_seconds()
+                    if time_since_last_heartbeat > heartbeat_time_limit:
+                        Stats.incr("local_task_job_prolonged_heartbeat_failure", 1, 1)
+                        self.log.error("Heartbeat time limit exceeded!")
+                        if span.is_recording():
+                            span.add_event(
+                                name="error",
+                                attributes={
+                                    "message": "Heartbeat time limit exceeded",
+                                    "heartbeat_time_limit(s)": 
heartbeat_time_limit,
+                                    "time_since_last_heartbeat(s)": 
time_since_last_heartbeat,
+                                },
+                            )
+                        raise AirflowException(
+                            f"Time since last 
heartbeat({time_since_last_heartbeat:.2f}s) exceeded limit "
+                            f"({heartbeat_time_limit}s)."
+                        )
             return return_code
         finally:
             # Print a marker for log grouping of details before task execution
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index f657cbd5f6..883cd05d2b 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -60,7 +60,10 @@ from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
 from airflow.stats import Stats
 from airflow.ti_deps.dependencies_states import EXECUTION_STATES
 from airflow.timetables.simple import DatasetTriggeredTimetable
+from airflow.traces import utils as trace_utils
+from airflow.traces.tracer import Trace, span
 from airflow.utils import timezone
+from airflow.utils.dates import datetime_to_nano
 from airflow.utils.event_scheduler import EventScheduler
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.retries import MAX_DB_RETRIES, retry_db_transaction, 
run_with_db_retries
@@ -815,6 +818,60 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 ti.pid,
             )
 
+            with Trace.start_span_from_taskinstance(ti=ti) as span:
+                span.set_attribute("category", "scheduler")
+                span.set_attribute("task_id", ti.task_id)
+                span.set_attribute("dag_id", ti.dag_id)
+                span.set_attribute("state", ti.state)
+                if ti.state == TaskInstanceState.FAILED:
+                    span.set_attribute("error", True)
+                span.set_attribute("start_date", str(ti.start_date))
+                span.set_attribute("end_date", str(ti.end_date))
+                span.set_attribute("duration", ti.duration)
+                span.set_attribute("executor_config", str(ti.executor_config))
+                span.set_attribute("execution_date", str(ti.execution_date))
+                span.set_attribute("hostname", ti.hostname)
+                span.set_attribute("log_url", ti.log_url)
+                span.set_attribute("operator", str(ti.operator))
+                span.set_attribute("try_number", ti.try_number - 1)
+                span.set_attribute("executor_state", state)
+                span.set_attribute("job_id", ti.job_id)
+                span.set_attribute("pool", ti.pool)
+                span.set_attribute("queue", ti.queue)
+                span.set_attribute("priority_weight", ti.priority_weight)
+                span.set_attribute("queued_dttm", str(ti.queued_dttm))
+                span.set_attribute("ququed_by_job_id", ti.queued_by_job_id)
+                span.set_attribute("pid", ti.pid)
+                if span.is_recording():
+                    span.add_event(name="queued", 
timestamp=datetime_to_nano(ti.queued_dttm))
+                    span.add_event(name="started", 
timestamp=datetime_to_nano(ti.start_date))
+                    span.add_event(name="ended", 
timestamp=datetime_to_nano(ti.end_date))
+                if conf.has_option("traces", "otel_task_log_event") and 
conf.getboolean(
+                    "traces", "otel_task_log_event"
+                ):
+                    from airflow.utils.log.log_reader import TaskLogReader
+
+                    task_log_reader = TaskLogReader()
+                    if task_log_reader.supports_read:
+                        metadata: dict[str, Any] = {}
+                        logs, metadata = task_log_reader.read_log_chunks(ti, ti.try_number, metadata)
+                        if ti.hostname in dict(logs[0]):
+                            message = str(dict(logs[0])[ti.hostname]).replace("\\n", "\n")
+                            while metadata["end_of_log"] is False:
+                                logs, metadata = task_log_reader.read_log_chunks(
+                                    ti, ti.try_number - 1, metadata
+                                )
+                                if ti.hostname in dict(logs[0]):
+                                    message = message + str(dict(logs[0])[ti.hostname]).replace("\\n", "\n")
+                            if span.is_recording():
+                                span.add_event(
+                                    name="task_log",
+                                    attributes={
+                                        "message": message,
+                                        "metadata": str(metadata),
+                                    },
+                                )
+
             # There are two scenarios why the same TI with the same try_number 
is queued
             # after executor is finished with it:
             # 1) the TI was killed externally and it had no time to mark 
itself failed
@@ -1044,13 +1101,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             )
 
         for loop_count in itertools.count(start=1):
-            with Stats.timer("scheduler.scheduler_loop_duration") as timer:
-                if self.using_sqlite and self.processor_agent:
-                    self.processor_agent.run_single_parsing_loop()
-                    # For the sqlite case w/ 1 thread, wait until the processor
-                    # is finished to avoid concurrent access to the DB.
-                    self.log.debug("Waiting for processors to finish since 
we're using sqlite")
-                    self.processor_agent.wait_until_finished()
+            with Trace.start_span(span_name="scheduler_job_loop", 
component="SchedulerJobRunner") as span:
+                span.set_attribute("category", "scheduler")
+                span.set_attribute("loop_count", loop_count)
+                with Stats.timer("scheduler.scheduler_loop_duration") as timer:
+                    if self.using_sqlite and self.processor_agent:
+                        self.processor_agent.run_single_parsing_loop()
+                        # For the sqlite case w/ 1 thread, wait until the processor
+                        # is finished to avoid concurrent access to the DB.
+                        self.log.debug("Waiting for processors to finish since we're using sqlite")
+                        self.processor_agent.wait_until_finished()
 
                 with create_session() as session:
                     # This will schedule for as many executors as possible.
@@ -1079,16 +1139,23 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 if self.processor_agent:
                     self.processor_agent.heartbeat()
 
-                # Heartbeat the scheduler periodically
-                perform_heartbeat(
-                    job=self.job, heartbeat_callback=self.heartbeat_callback, 
only_if_necessary=True
-                )
+                    # Heartbeat the scheduler periodically
+                    perform_heartbeat(
+                        job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True
+                    )
 
-                # Run any pending timed events
-                next_event = timers.run(blocking=False)
-                self.log.debug("Next timed event is in %f", next_event)
+                    # Run any pending timed events
+                    next_event = timers.run(blocking=False)
+                    self.log.debug("Next timed event is in %f", next_event)
 
             self.log.debug("Ran scheduling loop in %.2f seconds", 
timer.duration)
+            if span.is_recording():
+                span.add_event(
+                    name="Ran scheduling loop",
+                    attributes={
+                        "duration in seconds": timer.duration,
+                    },
+                )
 
             if not is_unit_test and not num_queued_tis and not 
num_finished_events:
                 # If the scheduler is doing things, don't sleep. This means 
when there is work to do, the
@@ -1102,6 +1169,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     self.num_runs,
                     loop_count,
                 )
+                if span.is_recording():
+                    span.add_event("Exiting scheduler loop as requested number 
of runs has been reached")
                 break
             if self.processor_agent and self.processor_agent.done:
                 self.log.info(
@@ -1110,6 +1179,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     self.num_times_parse_dags,
                     loop_count,
                 )
+                if span.is_recording():
+                    span.add_event("Exiting scheduler loop as requested DAG 
parse count has been reached")
                 break
 
     def _do_scheduling(self, session: Session) -> int:
@@ -1229,6 +1300,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         guard.commit()
         # END: create dagruns
 
+    @span
     def _create_dag_runs(self, dag_models: Collection[DagModel], session: 
Session) -> None:
         """Create a DAG run and update the dag_model to control if/when the 
next DAGRun should be created."""
         # Bulk Fetch DagRuns with dag_id and execution_date same
@@ -1436,6 +1508,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             return False
         return True
 
+    @span
     def _start_queued_dagruns(self, session: Session) -> None:
         """Find DagRuns in queued state and decide moving them to running 
state."""
         # added all() to save runtime, otherwise query is executed more than 
once
@@ -1445,7 +1518,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs), 
only_running=True, session=session),
         )
 
+        @span
         def _update_state(dag: DAG, dag_run: DagRun):
+            __span = Trace.get_current_span()
+            __span.set_attribute("state", str(DagRunState.RUNNING))
+            __span.set_attribute("run_id", dag_run.run_id)
+            __span.set_attribute("type", dag_run.run_type)
+            __span.set_attribute("dag_id", dag_run.dag_id)
+
             dag_run.state = DagRunState.RUNNING
             dag_run.start_date = timezone.utcnow()
             if dag.timetable.periodic and not dag_run.external_trigger and 
dag_run.clear_number < 1:
@@ -1465,12 +1545,18 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     schedule_delay,
                     tags={"dag_id": dag.dag_id},
                 )
+                if __span.is_recording():
+                    __span.add_event(
+                        name="schedule_delay",
+                        attributes={"dag_id": dag.dag_id, "schedule_delay": 
str(schedule_delay)},
+                    )
 
         # cache saves time during scheduling of many dag_runs for same dag
         cached_get_dag: Callable[[str], DAG | None] = lru_cache()(
             partial(self.dagbag.get_dag, session=session)
         )
 
+        _span = Trace.get_current_span()
         for dag_run in dag_runs:
             dag = dag_run.dag = cached_get_dag(dag_run.dag_id)
 
@@ -1487,6 +1573,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     dag_run.execution_date,
                 )
             else:
+                if _span.is_recording():
+                    _span.add_event(
+                        name="dag_run",
+                        attributes={
+                            "run_id": dag_run.run_id,
+                            "dag_id": dag_run.dag_id,
+                            "conf": str(dag_run.conf),
+                        },
+                    )
                 active_runs_of_dags[dag_run.dag_id] += 1
                 _update_state(dag, dag_run)
                 dag_run.notify_dagrun_state_changed()
@@ -1514,70 +1609,101 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         :param dag_run: The DagRun to schedule
         :return: Callback that needs to be executed
         """
-        callback: DagCallbackRequest | None = None
+        trace_id = int(trace_utils.gen_trace_id(dag_run=dag_run, as_int=True))
+        span_id = int(trace_utils.gen_dag_span_id(dag_run=dag_run, as_int=True))
+        links = [{"trace_id": trace_id, "span_id": span_id}]
+
+        with Trace.start_span(
+            span_name="_schedule_dag_run", component="SchedulerJobRunner", 
links=links
+        ) as span:
+            span.set_attribute("dag_id", dag_run.dag_id)
+            span.set_attribute("run_id", dag_run.run_id)
+            span.set_attribute("run_type", dag_run.run_type)
+            callback: DagCallbackRequest | None = None
+
+            dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
+            dag_model = DM.get_dagmodel(dag_run.dag_id, session)
+
+            if not dag or not dag_model:
+                self.log.error("Couldn't find DAG %s in DAG bag or database!", dag_run.dag_id)
+                return callback
+
+            if (
+                dag_run.start_date
+                and dag.dagrun_timeout
+                and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
+            ):
+                dag_run.set_state(DagRunState.FAILED)
+                unfinished_task_instances = session.scalars(
+                    select(TI)
+                    .where(TI.dag_id == dag_run.dag_id)
+                    .where(TI.run_id == dag_run.run_id)
+                    .where(TI.state.in_(State.unfinished))
+                )
+                for task_instance in unfinished_task_instances:
+                    task_instance.state = TaskInstanceState.SKIPPED
+                    session.merge(task_instance)
+                session.flush()
+                self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
+
+                if self._should_update_dag_next_dagruns(
+                    dag, dag_model, last_dag_run=dag_run, session=session
+                ):
+                    dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
+
+                callback_to_execute = DagCallbackRequest(
+                    full_filepath=dag.fileloc,
+                    dag_id=dag.dag_id,
+                    run_id=dag_run.run_id,
+                    is_failure_callback=True,
+                    processor_subdir=dag_model.processor_subdir,
+                    msg="timed_out",
+                )
 
-        dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
-        dag_model = DM.get_dagmodel(dag_run.dag_id, session)
+                dag_run.notify_dagrun_state_changed()
+                duration = dag_run.end_date - dag_run.start_date
+                Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}", duration)
+                Stats.timing("dagrun.duration.failed", duration, tags={"dag_id": dag_run.dag_id})
+                span.set_attribute("error", True)
+                if span.is_recording():
+                    span.add_event(
+                        name="error",
+                        attributes={
+                            "message": f"Run {dag_run.run_id} of {dag_run.dag_id} has timed-out",
+                            "duration": str(duration),
+                        },
+                    )
+                return callback_to_execute
 
-        if not dag or not dag_model:
-            self.log.error("Couldn't find DAG %s in DAG bag or database!", dag_run.dag_id)
-            return callback
+            if dag_run.execution_date > timezone.utcnow() and not dag.allow_future_exec_dates:
+                self.log.error("Execution date is in future: %s", dag_run.execution_date)
+                return callback
 
-        if (
-            dag_run.start_date
-            and dag.dagrun_timeout
-            and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
-        ):
-            dag_run.set_state(DagRunState.FAILED)
-            unfinished_task_instances = session.scalars(
-                select(TI)
-                .where(TI.dag_id == dag_run.dag_id)
-                .where(TI.run_id == dag_run.run_id)
-                .where(TI.state.in_(State.unfinished))
-            )
-            for task_instance in unfinished_task_instances:
-                task_instance.state = TaskInstanceState.SKIPPED
-                session.merge(task_instance)
-            session.flush()
-            self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
+            if not self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session):
+                self.log.warning(
+                    "The DAG disappeared before verifying integrity: %s. Skipping.", dag_run.dag_id
+                )
+                return callback
+            # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
+            schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
 
             if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session):
                 dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
+            # This will do one query per dag run. We "could" build up a complex
+            # query to update all the TIs across all the execution dates and dag
+            # IDs in a single query, but it turns out that can be _very very slow_
+            # see #11147/commit ee90807ac for more details
+            if span.is_recording():
+                span.add_event(
+                    name="schedule_tis",
+                    attributes={
+                        "message": "dag_run scheduling its tis",
+                        "schedulable_tis": [_ti.task_id for _ti in schedulable_tis],
+                    },
+                )
+            dag_run.schedule_tis(schedulable_tis, session, max_tis_per_query=self.job.max_tis_per_query)
 
-            callback_to_execute = DagCallbackRequest(
-                full_filepath=dag.fileloc,
-                dag_id=dag.dag_id,
-                run_id=dag_run.run_id,
-                is_failure_callback=True,
-                processor_subdir=dag_model.processor_subdir,
-                msg="timed_out",
-            )
-
-            dag_run.notify_dagrun_state_changed()
-            duration = dag_run.end_date - dag_run.start_date
-            Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}", duration)
-            Stats.timing("dagrun.duration.failed", duration, tags={"dag_id": dag_run.dag_id})
-            return callback_to_execute
-
-        if dag_run.execution_date > timezone.utcnow() and not dag.allow_future_exec_dates:
-            self.log.error("Execution date is in future: %s", dag_run.execution_date)
-            return callback
-
-        if not self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session):
-            self.log.warning("The DAG disappeared before verifying integrity: %s. Skipping.", dag_run.dag_id)
-            return callback
-        # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
-        schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
-
-        if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session):
-            dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
-        # This will do one query per dag run. We "could" build up a complex
-        # query to update all the TIs across all the execution dates and dag
-        # IDs in a single query, but it turns out that can be _very very slow_
-        # see #11147/commit ee90807ac for more details
-        dag_run.schedule_tis(schedulable_tis, session, max_tis_per_query=self.job.max_tis_per_query)
-
-        return callback_to_run
+            return callback_to_run
 
     def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) -> bool:
         """
@@ -1681,20 +1807,27 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
     def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
         from airflow.models.pool import Pool
 
-        pools = Pool.slots_stats(session=session)
-        for pool_name, slot_stats in pools.items():
-            Stats.gauge(f"pool.open_slots.{pool_name}", slot_stats["open"])
-            Stats.gauge(f"pool.queued_slots.{pool_name}", slot_stats["queued"])
-            Stats.gauge(f"pool.running_slots.{pool_name}", slot_stats["running"])
-            Stats.gauge(f"pool.deferred_slots.{pool_name}", slot_stats["deferred"])
-            Stats.gauge(f"pool.scheduled_slots.{pool_name}", slot_stats["scheduled"])
-
-            # Same metrics with tagging
-            Stats.gauge("pool.open_slots", slot_stats["open"], tags={"pool_name": pool_name})
-            Stats.gauge("pool.queued_slots", slot_stats["queued"], tags={"pool_name": pool_name})
-            Stats.gauge("pool.running_slots", slot_stats["running"], tags={"pool_name": pool_name})
-            Stats.gauge("pool.deferred_slots", slot_stats["deferred"], tags={"pool_name": pool_name})
-            Stats.gauge("pool.scheduled_slots", slot_stats["scheduled"], tags={"pool_name": pool_name})
+        with Trace.start_span(span_name="emit_pool_metrics", component="SchedulerJobRunner") as span:
+            pools = Pool.slots_stats(session=session)
+            for pool_name, slot_stats in pools.items():
+                Stats.gauge(f"pool.open_slots.{pool_name}", slot_stats["open"])
+                Stats.gauge(f"pool.queued_slots.{pool_name}", slot_stats["queued"])
+                Stats.gauge(f"pool.running_slots.{pool_name}", slot_stats["running"])
+                Stats.gauge(f"pool.deferred_slots.{pool_name}", slot_stats["deferred"])
+                Stats.gauge(f"pool.scheduled_slots.{pool_name}", slot_stats["scheduled"])
+
+                # Same metrics with tagging
+                Stats.gauge("pool.open_slots", slot_stats["open"], tags={"pool_name": pool_name})
+                Stats.gauge("pool.queued_slots", slot_stats["queued"], tags={"pool_name": pool_name})
+                Stats.gauge("pool.running_slots", slot_stats["running"], tags={"pool_name": pool_name})
+                Stats.gauge("pool.deferred_slots", slot_stats["deferred"], tags={"pool_name": pool_name})
+                Stats.gauge("pool.scheduled_slots", slot_stats["scheduled"], tags={"pool_name": pool_name})
+
+                span.set_attribute("category", "scheduler")
+                span.set_attribute(f"pool.open_slots.{pool_name}", slot_stats["open"])
+                span.set_attribute(f"pool.queued_slots.{pool_name}", slot_stats["queued"])
+                span.set_attribute(f"pool.running_slots.{pool_name}", slot_stats["running"])
+                span.set_attribute(f"pool.deferred_slots.{pool_name}", slot_stats["deferred"])
 
     @provide_session
     def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int:
diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index e1736ae8e5..080323a1d1 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -37,6 +37,7 @@ from airflow.jobs.base_job_runner import BaseJobRunner
 from airflow.jobs.job import perform_heartbeat
 from airflow.models.trigger import Trigger
 from airflow.stats import Stats
+from airflow.traces.tracer import Trace, span
 from airflow.triggers.base import TriggerEvent
 from airflow.typing_compat import TypedDict
 from airflow.utils import timezone
@@ -362,26 +363,43 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
             if not self.trigger_runner.is_alive():
                 self.log.error("Trigger runner thread has died! Exiting.")
                 break
-            # Clean out unused triggers
-            Trigger.clean_unused()
-            # Load/delete triggers
-            self.load_triggers()
-            # Handle events
-            self.handle_events()
-            # Handle failed triggers
-            self.handle_failed_triggers()
-            perform_heartbeat(self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True)
-            # Collect stats
-            self.emit_metrics()
+            with Trace.start_span(span_name="triggerer_job_loop", component="TriggererJobRunner") as span:
+                # Clean out unused triggers
+                if span.is_recording():
+                    span.add_event(name="Trigger.clean_unused")
+                Trigger.clean_unused()
+                # Load/delete triggers
+                if span.is_recording():
+                    span.add_event(name="load_triggers")
+                self.load_triggers()
+                # Handle events
+                if span.is_recording():
+                    span.add_event(name="handle_events")
+                self.handle_events()
+                # Handle failed triggers
+                if span.is_recording():
+                    span.add_event(name="handle_failed_triggers")
+                self.handle_failed_triggers()
+                if span.is_recording():
+                    span.add_event(name="perform_heartbeat")
+                perform_heartbeat(
+                    self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True
+                )
+                # Collect stats
+                if span.is_recording():
+                    span.add_event(name="emit_metrics")
+                self.emit_metrics()
             # Idle sleep
             time.sleep(1)
 
+    @span
     def load_triggers(self):
         """Query the database for the triggers we're supposed to be running 
and update the runner."""
         Trigger.assign_unassigned(self.job.id, self.capacity, 
self.health_check_threshold)
         ids = Trigger.ids_for_triggerer(self.job.id)
         self.trigger_runner.update_triggers(set(ids))
 
+    @span
     def handle_events(self):
         """Dispatch outbound events to the Trigger model which pushes them to 
the relevant task instances."""
         while self.trigger_runner.events:
@@ -392,6 +410,7 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
             # Emit stat event
             Stats.incr("triggers.succeeded")
 
+    @span
     def handle_failed_triggers(self):
         """
         Handle "failed" triggers. - ones that errored or exited before they 
sent an event.
@@ -405,11 +424,15 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
             # Emit stat event
             Stats.incr("triggers.failed")
 
+    @span
     def emit_metrics(self):
         Stats.gauge(f"triggers.running.{self.job.hostname}", 
len(self.trigger_runner.triggers))
         Stats.gauge(
             "triggers.running", len(self.trigger_runner.triggers), 
tags={"hostname": self.job.hostname}
         )
+        span = Trace.get_current_span()
+        span.set_attribute("trigger host", self.job.hostname)
+        span.set_attribute("triggers running", len(self.trigger_runner.triggers))
 
 
 class TriggerDetails(TypedDict):
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 6c3d0715b9..d4ef937e9d 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -62,7 +62,9 @@ from airflow.models.tasklog import LogTemplate
 from airflow.stats import Stats
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES
+from airflow.traces.tracer import Trace
 from airflow.utils import timezone
+from airflow.utils.dates import datetime_to_nano
 from airflow.utils.helpers import chunks, is_container, prune_dict
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -919,6 +921,37 @@ class DagRun(Base, LoggingMixin):
                 self.data_interval_end,
                 self.dag_hash,
             )
+
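+            # Publish a span for the DAG run, attaching its queued/start/end times
+            # as events with nanosecond timestamps.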
+            with Trace.start_span_from_dagrun(dagrun=self) as span:
+                if self._state is DagRunState.FAILED:
+                    span.set_attribute("error", True)
+                attributes = {
+                    "category": "DAG runs",
+                    "dag_id": str(self.dag_id),
+                    "execution_date": str(self.execution_date),
+                    "run_id": str(self.run_id),
+                    "queued_at": str(self.queued_at),
+                    "run_start_date": str(self.start_date),
+                    "run_end_date": str(self.end_date),
+                    "run_duration": str(
+                        (self.end_date - self.start_date).total_seconds()
+                        if self.start_date and self.end_date
+                        else 0
+                    ),
+                    "state": str(self._state),
+                    "external_trigger": str(self.external_trigger),
+                    "run_type": str(self.run_type),
+                    "data_interval_start": str(self.data_interval_start),
+                    "data_interval_end": str(self.data_interval_end),
+                    "dag_hash": str(self.dag_hash),
+                    "conf": str(self.conf),
+                }
+                if span.is_recording():
+                    span.add_event(name="queued", timestamp=datetime_to_nano(self.queued_at))
+                    span.add_event(name="started", timestamp=datetime_to_nano(self.start_date))
+                    span.add_event(name="ended", timestamp=datetime_to_nano(self.end_date))
+                span.set_attributes(attributes)
+
             session.flush()
 
         self._emit_true_scheduling_delay_stats_for_finished_state(finished_tis)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 1dacbe7525..27eb5c26c2 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -108,6 +108,7 @@ from airflow.stats import Stats
 from airflow.templates import SandboxedEnvironment
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS
+from airflow.traces.tracer import Trace
 from airflow.utils import timezone
 from airflow.utils.context import (
     ConnectionAccessor,
@@ -1211,6 +1212,27 @@ def _handle_failure(
     if not test_mode:
         TaskInstance.save_to_db(failure_context["ti"], session)
 
+    with Trace.start_span_from_taskinstance(ti=task_instance) as span:
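+        # Record the failure on a span built from this task instance so the error
+        # and its surrounding context are visible in traces.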
+        # ---- error info ----
+        span.set_attribute("error", "true")
+        span.set_attribute("error_msg", str(error))
+        span.set_attribute("context", context)
+        span.set_attribute("force_fail", force_fail)
+        # ---- common info ----
+        span.set_attribute("category", "DAG runs")
+        span.set_attribute("task_id", task_instance.task_id)
+        span.set_attribute("dag_id", task_instance.dag_id)
+        span.set_attribute("state", task_instance.state)
+        span.set_attribute("start_date", str(task_instance.start_date))
+        span.set_attribute("end_date", str(task_instance.end_date))
+        span.set_attribute("duration", task_instance.duration)
+        span.set_attribute("executor_config", str(task_instance.executor_config))
+        span.set_attribute("execution_date", str(task_instance.execution_date))
+        span.set_attribute("hostname", task_instance.hostname)
+        if isinstance(task_instance, TaskInstance):
+            span.set_attribute("log_url", task_instance.log_url)
+        span.set_attribute("operator", str(task_instance.operator))
+
 
 def _refresh_from_task(
     *, task_instance: TaskInstance | TaskInstancePydantic, task: Operator, pool_override: str | None = None
diff --git a/airflow/traces/__init__.py b/airflow/traces/__init__.py
index abe55b5103..7b2f416872 100644
--- a/airflow/traces/__init__.py
+++ b/airflow/traces/__init__.py
@@ -18,3 +18,4 @@ from __future__ import annotations
 
 TRACEPARENT = "traceparent"
 TRACESTATE = "tracestate"
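+# Sentinel id used when no real trace/span id can be derived yet (e.g. a DagRun without a start_date).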
+NO_TRACE_ID = 1
diff --git a/airflow/traces/tracer.py b/airflow/traces/tracer.py
index 88999abe2b..1d58717287 100644
--- a/airflow/traces/tracer.py
+++ b/airflow/traces/tracer.py
@@ -96,6 +96,9 @@ class EmptySpan:
         """Set multiple attributes at once."""
         pass
 
+    def is_recording(self):
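+        # The no-op span never records, so callers can skip building event payloads.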
+        return False
+
     def add_event(
         self,
         name: str,
diff --git a/airflow/traces/utils.py b/airflow/traces/utils.py
index eaf3c1c065..afab2591d5 100644
--- a/airflow/traces/utils.py
+++ b/airflow/traces/utils.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 import logging
 from typing import TYPE_CHECKING
 
+from airflow.traces import NO_TRACE_ID
 from airflow.utils.hashlib_wrapper import md5
 from airflow.utils.state import TaskInstanceState
 
@@ -40,9 +41,12 @@ def _gen_id(seeds: list[str], as_int: bool = False, type: int = TRACE_ID) -> str
 
 
 def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
     """Generate trace id from DagRun."""
+    if dag_run.start_date is None:
+        return NO_TRACE_ID
+
     return _gen_id(
-        [dag_run.dag_id, dag_run.run_id, str(dag_run.start_date.timestamp())],
+        [dag_run.dag_id, str(dag_run.run_id), str(dag_run.start_date.timestamp())],
         as_int,
     )
 
@@ -50,7 +54,7 @@ def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
 def gen_span_id_from_ti_key(ti_key: TaskInstanceKey, as_int: bool = False) -> str | int:
     """Generate span id from TI key."""
     return _gen_id(
-        [ti_key.dag_id, ti_key.run_id, ti_key.task_id, str(ti_key.try_number)],
+        [ti_key.dag_id, str(ti_key.run_id), ti_key.task_id, str(ti_key.try_number)],
         as_int,
         SPAN_ID,
     )
@@ -58,8 +62,11 @@ def gen_span_id_from_ti_key(ti_key: TaskInstanceKey, as_int: bool = False) -> st
 
 def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int:
     """Generate dag's root span id using dag_run."""
+    if dag_run.start_date is None:
+        return NO_TRACE_ID
+
     return _gen_id(
-        [dag_run.dag_id, dag_run.run_id, str(dag_run.start_date.timestamp())],
+        [dag_run.dag_id, str(dag_run.run_id), str(dag_run.start_date.timestamp())],
         as_int,
         SPAN_ID,
     )
diff --git a/scripts/ci/docker-compose/integration-otel.yml b/scripts/ci/docker-compose/integration-otel.yml
index 6573709bc3..7a635c17c7 100644
--- a/scripts/ci/docker-compose/integration-otel.yml
+++ b/scripts/ci/docker-compose/integration-otel.yml
@@ -54,7 +54,7 @@ services:
       - ./grafana/volume/provisioning:/grafana/provisioning
 
   jaeger:
-    image: jaegertracing/all-in-one
+    image: jaegertracing/all-in-one:1.57
     container_name: "breeze-jaeger"
     environment:
       COLLECTOR_OTLP_ENABLED: true
