This is an automated email from the ASF dual-hosted git repository. rahulvats pushed a commit to branch py-client-sync in repository https://gitbox.apache.org/repos/asf/airflow.git
commit cb263b9c9b810aa73e4bfdb28c82701ad339ac72 Author: Daniel Standish <[email protected]> AuthorDate: Mon Mar 23 18:56:13 2026 -0700 Introduce parent task spans and nest worker and trigger spans under them (#63839) This lets us tie together the worker and trigger phases of task execution. It also lets us see the delta between task queued time and task start time. --- .../execution_api/routes/task_instances.py | 46 ++++++++ .../src/airflow/executors/workloads/task.py | 1 - .../src/airflow/jobs/triggerer_job_runner.py | 88 +++++++++++---- airflow-core/src/airflow/models/dagrun.py | 24 ++++- airflow-core/src/airflow/models/taskinstance.py | 13 ++- airflow-core/src/airflow/models/taskmap.py | 16 +++ airflow-core/tests/integration/otel/test_otel.py | 5 +- .../versions/head/test_task_instances.py | 116 +++++++++++++++++++++ airflow-core/tests/unit/jobs/test_triggerer_job.py | 101 +++++++++++++++++- airflow-core/tests/unit/models/test_dagrun.py | 35 ++----- .../tests/unit/models/test_taskinstance.py | 104 ++++++++++++++++++ .../observability/traces/__init__.py | 12 +++ .../src/airflow/sdk/execution_time/task_runner.py | 4 +- .../task_sdk/execution_time/test_task_runner.py | 87 +++++++++++----- 14 files changed, 562 insertions(+), 90 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 96d8f3a6c86..5f5073c916b 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -29,6 +29,9 @@ import attrs import structlog from cadwyn import VersionedAPIRouter from fastapi import Body, HTTPException, Query, Security, status +from opentelemetry import trace +from opentelemetry.trace import StatusCode +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from pydantic import JsonValue from sqlalchemy import and_, func, or_, tuple_, update from sqlalchemy.engine import CursorResult @@ -37,6 +40,7 @@ from sqlalchemy.orm import joinedload from sqlalchemy.sql import select from structlog.contextvars import bind_contextvars +from airflow._shared.observability.traces import override_ids from airflow._shared.timezones import timezone from airflow.api_fastapi.common.dagbag import DagBagDep, get_latest_version_of_dag from airflow.api_fastapi.common.db.common import SessionDep @@ -87,6 +91,7 @@ ti_id_router = VersionedAPIRouter( log = structlog.get_logger(__name__) +tracer = trace.get_tracer(__name__) @ti_id_router.patch( @@ -431,6 +436,46 @@ def ti_update_state( ) +def _emit_task_span(ti, state): + # Defensive: skip span emission if the dag run or either context carrier is missing or malformed + if not ti.dag_run: + return + if not isinstance(ti.dag_run.context_carrier, dict): + return + if not isinstance(ti.context_carrier, dict): + return + dr_ctx = TraceContextTextMapPropagator().extract(ti.dag_run.context_carrier) + + ti_ctx = TraceContextTextMapPropagator().extract(ti.context_carrier) + ti_span = trace.get_current_span(context=ti_ctx) + span_context = ti_span.get_span_context() + start_time_candidates = (x for x in (ti.queued_dttm, ti.start_date, timezone.utcnow()) if x) + name = f"task_run.{ti.task_id}" + if ti.map_index >= 0: + name += f"[{ti.map_index}]" + with override_ids(span_context.trace_id, span_context.span_id): + span = tracer.start_span( + name=name, + start_time=int(min(start_time_candidates).timestamp() * 1e9), + context=dr_ctx, + ) + + span.set_attributes( + { + "airflow.dag_id": ti.dag_id,
+ "airflow.task_id": ti.task_id, + "airflow.dag_run.run_id": ti.run_id, + "airflow.task_instance.try_number": ti.try_number, + "airflow.task_instance.map_index": ti.map_index if ti.map_index is not None else -1, + "airflow.task_instance.state": state, + "airflow.task_instance.id": ti.id, + } + ) + status_code = StatusCode.OK if state == TaskInstanceState.SUCCESS else StatusCode.ERROR + span.set_status(status_code) + span.end() + + def _handle_fail_fast_for_dag(ti: TI, dag_id: str, session: SessionDep, dag_bag: DagBagDep) -> None: dr = ti.dag_run @@ -479,6 +524,7 @@ def _create_ti_state_update_query_and_update_state( ti_patch_payload.outlet_events, session, ) + _emit_task_span(ti, state=updated_state) elif isinstance(ti_patch_payload, TIDeferredStatePayload): # Calculate timeout if it was passed timeout = None diff --git a/airflow-core/src/airflow/executors/workloads/task.py b/airflow-core/src/airflow/executors/workloads/task.py index a5939cf4244..4ca8c310fb5 100644 --- a/airflow-core/src/airflow/executors/workloads/task.py +++ b/airflow-core/src/airflow/executors/workloads/task.py @@ -86,7 +86,6 @@ class ExecuteTask(BaseDagBundleWorkload): from airflow.utils.helpers import log_filename_template_renderer ser_ti = TaskInstanceDTO.model_validate(ti, from_attributes=True) - ser_ti.context_carrier = ti.dag_run.context_carrier if not bundle_info: bundle_info = BundleInfo( name=ti.dag_model.bundle_name, diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 1406283c05c..44c28a7a539 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -35,6 +35,9 @@ from typing import TYPE_CHECKING, Annotated, Any, BinaryIO, ClassVar, Literal, T import anyio import attrs import structlog +from opentelemetry import trace +from opentelemetry.trace import Status, StatusCode +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from pydantic import BaseModel, Field, TypeAdapter from sqlalchemy import func, select from structlog.contextvars import bind_contextvars as bind_log_contextvars @@ -87,6 +90,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import provide_session if TYPE_CHECKING: + from opentelemetry.util._decorator import _AgnosticContextManager from sqlalchemy.orm import Session from structlog.typing import FilteringBoundLogger, WrappedLogger @@ -96,6 +100,34 @@ if TYPE_CHECKING: from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI logger = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) + + +def _make_trigger_span( + ti: TaskInstanceDTO | None, trigger_id: int, name: str +) -> _AgnosticContextManager[trace.Span]: + parent_context = ( + TraceContextTextMapPropagator().extract(ti.context_carrier) if ti and ti.context_carrier else None + ) + attributes: dict[str, str | int] = { + "airflow.trigger.name": name, + } + if isinstance(ti, TaskInstanceDTO): + span_name = f"trigger.{ti.task_id}" + if ti.map_index >= 0: + span_name += f"_{ti.map_index}" + attributes = { + **attributes, + "airflow.dag_id": ti.dag_id, + "airflow.task_id": ti.task_id, + "airflow.dag_run.run_id": ti.run_id, + "airflow.task_instance.try_number": ti.try_number, + "airflow.task_instance.map_index": ti.map_index, + } + else: + span_name = f"trigger.{name}" + return tracer.start_as_current_span(span_name, attributes=attributes, context=parent_context) +
__all__ = [ "TriggerRunner", @@ -1179,30 +1211,38 @@ class TriggerRunner: name = self.triggers[trigger_id]["name"] self.log.info("trigger %s starting", name) - try: - async for event in trigger.run(): - await self.log.ainfo( - "Trigger fired event", name=self.triggers[trigger_id]["name"], result=event - ) - self.triggers[trigger_id]["events"] += 1 - self.events.append((trigger_id, event)) - except asyncio.CancelledError: - # We get cancelled by the scheduler changing the task state. But if we do lets give a nice error - # message about it - if timeout := timeout_after: - timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout - if timeout < timezone.utcnow(): - await self.log.aerror("Trigger cancelled due to timeout") - raise - finally: - # CancelledError will get injected when we're stopped - which is - # fine, the cleanup process will understand that, but we want to - # allow triggers a chance to cleanup, either in that case or if - # they exit cleanly. Exception from cleanup methods are ignored. - with suppress(Exception): - await trigger.cleanup() - - await self.log.ainfo("trigger completed", name=name) + with _make_trigger_span(ti=trigger.task_instance, trigger_id=trigger_id, name=name) as span: + try: + async for event in trigger.run(): + await self.log.ainfo( + "Trigger fired event", name=self.triggers[trigger_id]["name"], result=event + ) + self.triggers[trigger_id]["events"] += 1 + self.events.append((trigger_id, event)) + span.set_status(Status(StatusCode.OK)) + except asyncio.CancelledError as e: + # We get cancelled by the scheduler changing the task state. But if we do lets give a nice error + # message about it + if timeout := timeout_after: + timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout + if timeout < timezone.utcnow(): + await self.log.aerror("Trigger cancelled due to timeout") + span.set_status(Status(StatusCode.ERROR), description=str(e)) + raise + span.set_status(Status(StatusCode.OK), description=str(e)) + raise + except Exception as e: + span.set_status(Status(StatusCode.ERROR), description=str(e)) + raise + finally: + # CancelledError will get injected when we're stopped - which is + # fine, the cleanup process will understand that, but we want to + # allow triggers a chance to cleanup, either in that case or if + # they exit cleanly. Exception from cleanup methods are ignored. 
+ with suppress(Exception): + await trigger.cleanup() + + await self.log.ainfo("trigger completed", name=name) def get_trigger_by_classpath(self, classpath: str) -> type[BaseTrigger]: """ diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 73a5a3a875a..20ec118f33e 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -1027,21 +1027,33 @@ class DagRun(Base, LoggingMixin): return leaf_tis def _emit_dagrun_span(self, state: DagRunState): - ctx = TraceContextTextMapPropagator().extract(self.context_carrier or {}) + # just to be safe + if not isinstance(self.context_carrier, dict): + return + + ctx = TraceContextTextMapPropagator().extract(self.context_carrier) span = trace.get_current_span(context=ctx) span_context = span.get_span_context() with override_ids(span_context.trace_id, span_context.span_id): - attributes = { + attributes: dict[str, str] = { "airflow.dag_id": str(self.dag_id), "airflow.dag_run.run_id": self.run_id, } + if self.start_date: + attributes["airflow.dag_run.start_date"] = str(self.start_date) + if self.end_date: + attributes["airflow.dag_run.end_date"] = str(self.end_date) + if self.queued_at: + attributes["airflow.dag_run.queued_at"] = str(self.queued_at) + if self.created_at: + attributes["airflow.dag_run.created_at"] = str(self.created_at) if self.logical_date: attributes["airflow.dag_run.logical_date"] = str(self.logical_date) if self.partition_key: attributes["airflow.dag_run.partition_key"] = str(self.partition_key) span = tracer.start_span( name=f"dag_run.{self.dag_id}", - start_time=int((self.start_date or timezone.utcnow()).timestamp() * 1e9), + start_time=int((self.queued_at or self.start_date or timezone.utcnow()).timestamp() * 1e9), attributes=attributes, context=context.Context(), ) @@ -1771,7 +1783,11 @@ class DagRun(Base, LoggingMixin): created_counts[task.task_type] += 1 for map_index in indexes: yield TI.insert_mapping( - self.run_id, task, map_index=map_index, dag_version_id=dag_version_id + self.run_id, + task, + map_index=map_index, + dag_version_id=dag_version_id, + dag_run=self, ) creator = create_ti_mapping diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 443468161f8..e212ca68504 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -32,6 +32,7 @@ from uuid import UUID import attrs import dill import uuid6 +from opentelemetry import trace from sqlalchemy import ( JSON, Float, @@ -67,6 +68,7 @@ from sqlalchemy.orm.attributes import NO_VALUE, set_committed_value from airflow import settings from airflow._shared.observability.metrics.dual_stats_manager import DualStatsManager from airflow._shared.observability.metrics.stats import Stats +from airflow._shared.observability.traces import new_dagrun_trace_carrier, new_task_run_carrier from airflow._shared.timezones import timezone from airflow.assets.manager import asset_manager from airflow.configuration import conf @@ -102,7 +104,7 @@ from airflow.utils.state import DagRunState, State, TaskInstanceState TR = TaskReschedule log = logging.getLogger(__name__) - +tracer = trace.get_tracer(__name__) if TYPE_CHECKING: from datetime import datetime @@ -382,7 +384,7 @@ def clear_task_instances( for instance in tis: run_ids_by_dag_id[instance.dag_id].add(instance.run_id) - drs = session.scalars( + drs: Iterable[DagRun] = session.scalars( select(DagRun).where( or_( *( @@ -397,6 
+399,7 @@ def clear_task_instances( # Always update clear_number and queued_at when clearing tasks, regardless of state dr.clear_number += 1 dr.queued_at = timezone.utcnow() + dr.context_carrier = new_dagrun_trace_carrier() _recalculate_dagrun_queued_at_deadlines(dr, dr.queued_at, session) @@ -425,6 +428,8 @@ def clear_task_instances( if dag_run_state == DagRunState.QUEUED: dr.last_scheduling_decision = None dr.start_date = None + for ti in tis: + ti.context_carrier = new_task_run_carrier(ti.dag_run.context_carrier) session.flush() @@ -679,7 +684,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload): @staticmethod def insert_mapping( - run_id: str, task: Operator, map_index: int, dag_version_id: UUID | None + run_id: str, task: Operator, map_index: int, *, dag_version_id: UUID | None, dag_run: DagRun ) -> dict[str, Any]: """ Insert mapping. @@ -689,6 +694,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload): priority_weight = task.weight_rule.get_weight( TaskInstance(task=task, run_id=run_id, map_index=map_index, dag_version_id=dag_version_id) ) + context_carrier = new_task_run_carrier(dag_run.context_carrier) return { "dag_id": task.dag_id, @@ -710,6 +716,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload): "map_index": map_index, "_task_display_property_value": task.task_display_name, "dag_version_id": dag_version_id, + "context_carrier": context_carrier, } @reconstructor diff --git a/airflow-core/src/airflow/models/taskmap.py b/airflow-core/src/airflow/models/taskmap.py index 18d09d6aa56..60486b8ce86 100644 --- a/airflow-core/src/airflow/models/taskmap.py +++ b/airflow-core/src/airflow/models/taskmap.py @@ -24,9 +24,11 @@ import enum from collections.abc import Collection, Iterable, Sequence from typing import TYPE_CHECKING, Any +from opentelemetry import trace from sqlalchemy import CheckConstraint, ForeignKeyConstraint, Integer, String, func, or_, select from sqlalchemy.orm import Mapped, mapped_column +from airflow._shared.observability.traces import new_task_run_carrier from airflow.models.base import COLLATION_ARGS, ID_LEN, TaskInstanceDependencies from airflow.models.dag_version import DagVersion from airflow.utils.db import exists_query @@ -38,6 +40,7 @@ if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance from airflow.serialization.definitions.mappedoperator import Operator +tracer = trace.get_tracer(__name__) class TaskMapVariant(enum.Enum): @@ -242,6 +245,18 @@ class TaskMap(TaskInstanceDependencies): else: dag_version_id = None + if unmapped_ti: + dr = unmapped_ti.dag_run + else: + from airflow.models import DagRun + + dr = session.scalar( + select(DagRun).where( + DagRun.dag_id == task.dag_id, + DagRun.run_id == run_id, + ) + ) + for index in indexes_to_map: # TODO: Make more efficient with bulk_insert_mappings/bulk_save_mappings. ti = TaskInstance( @@ -254,6 +269,7 @@ class TaskMap(TaskInstanceDependencies): task.log.debug("Expanding TIs upserted %s", ti) task_instance_mutation_hook(ti) ti = session.merge(ti) + ti.context_carrier = new_task_run_carrier(dr.context_carrier) ti.refresh_from_task(task) # session.merge() loses task information. 
all_expanded_tis.append(ti) diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py index 0d40156c45e..6852b47af04 100644 --- a/airflow-core/tests/integration/otel/test_otel.py +++ b/airflow-core/tests/integration/otel/test_otel.py @@ -508,9 +508,10 @@ class TestOtelIntegration: nested = get_span_hierarchy() assert nested == { - "sub_span1": "task_run.task1", - "task_run.task1": "dag_run.otel_test_dag", "dag_run.otel_test_dag": None, + "sub_span1": "worker.task1", + "task_run.task1": "dag_run.otel_test_dag", + "worker.task1": "task_run.task1", } def start_scheduler(self, capture_output: bool = False): diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index 7cc7aa85594..7f766ede71e 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -24,13 +24,21 @@ from uuid import UUID, uuid4 import pytest import uuid6 +from opentelemetry import trace as otel_trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.trace import StatusCode +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from sqlalchemy import select, update from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import Session +from airflow._shared.observability.traces import OverrideableRandomIdGenerator from airflow._shared.timezones import timezone from airflow.api_fastapi.auth.tokens import JWTValidator from airflow.api_fastapi.execution_api.app import lifespan +from airflow.api_fastapi.execution_api.routes.task_instances import _emit_task_span from airflow.exceptions import AirflowSkipException from airflow.models import RenderedTaskInstanceFields, TaskReschedule, Trigger from airflow.models.asset import AssetActive, AssetAliasModel, AssetEvent, AssetModel @@ -3242,3 +3250,111 @@ class TestTokenTypeValidation: payload = {"state": "success", "end_date": "2024-10-31T13:00:00Z"} resp = client.patch(f"/execution/task-instances/{ti.id}/state", json=payload) assert resp.status_code in [200, 204] + + +class TestEmitTaskSpan: + """Tests for the _emit_task_span function in the execution API task-instance route.""" + + @pytest.fixture(autouse=True) + def sdk_tracer_provider(self): + self.exporter = InMemorySpanExporter() + provider = TracerProvider(id_generator=OverrideableRandomIdGenerator()) + provider.add_span_processor(SimpleSpanProcessor(self.exporter)) + test_tracer = provider.get_tracer("test") + with mock.patch("airflow.api_fastapi.execution_api.routes.task_instances.tracer", test_tracer): + yield + + def _make_carriers(self): + """Return a (dr_carrier, ti_carrier) pair built with a real SDK provider.""" + p = TracerProvider() + t = p.get_tracer("setup") + dr_span = t.start_span("dr") + dr_ctx = otel_trace.set_span_in_context(dr_span) + dr_carrier: dict = {} + TraceContextTextMapPropagator().inject(dr_carrier, context=dr_ctx) + ti_span = t.start_span("ti", context=dr_ctx) + ti_ctx = otel_trace.set_span_in_context(ti_span) + ti_carrier: dict = {} + TraceContextTextMapPropagator().inject(ti_carrier, context=ti_ctx) + return dr_carrier, ti_carrier + + def _make_ti(self, task_id="my_task", 
map_index=-1, queued_dttm=None, start_date=None): + dr_carrier, ti_carrier = self._make_carriers() + ti = mock.MagicMock() + ti.dag_id = "test_dag" + ti.task_id = task_id + ti.run_id = "test_run" + ti.try_number = 1 + ti.map_index = map_index + ti.queued_dttm = queued_dttm + ti.start_date = start_date or DEFAULT_START_DATE + ti.dag_run.context_carrier = dr_carrier + ti.context_carrier = ti_carrier + return ti + + def test_emit_task_span_success_sets_ok_status(self): + _emit_task_span(self._make_ti(), TaskInstanceState.SUCCESS) + + spans = self.exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code == StatusCode.OK + + def test_emit_task_span_failed_sets_error_status(self): + _emit_task_span(self._make_ti(), TaskInstanceState.FAILED) + + spans = self.exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code == StatusCode.ERROR + + def test_emit_task_span_sets_attributes(self): + ti = self._make_ti(task_id="my_task", map_index=2) + _emit_task_span(ti, TaskInstanceState.SUCCESS) + + attrs = self.exporter.get_finished_spans()[0].attributes + assert attrs["airflow.dag_id"] == "test_dag" + assert attrs["airflow.task_id"] == "my_task" + assert attrs["airflow.dag_run.run_id"] == "test_run" + assert attrs["airflow.task_instance.try_number"] == 1 + assert attrs["airflow.task_instance.map_index"] == 2 + assert attrs["airflow.task_instance.state"] == TaskInstanceState.SUCCESS + + def test_emit_task_span_name_unmapped(self): + _emit_task_span(self._make_ti(task_id="my_task", map_index=-1), TaskInstanceState.SUCCESS) + assert self.exporter.get_finished_spans()[0].name == "task_run.my_task" + + def test_emit_task_span_name_mapped(self): + _emit_task_span(self._make_ti(task_id="my_task", map_index=3), TaskInstanceState.SUCCESS) + assert self.exporter.get_finished_spans()[0].name == "task_run.my_task[3]" + + def test_emit_task_span_start_time_uses_queued_dttm(self): + queued_dttm = timezone.parse("2024-01-01T10:00:00Z") + start_date = timezone.parse("2024-01-01T10:05:00Z") + ti = self._make_ti(queued_dttm=queued_dttm, start_date=start_date) + _emit_task_span(ti, TaskInstanceState.SUCCESS) + + assert self.exporter.get_finished_spans()[0].start_time == int(queued_dttm.timestamp() * 1e9) + + def test_emit_task_span_start_time_falls_back_to_start_date(self): + start_date = timezone.parse("2024-01-01T10:05:00Z") + ti = self._make_ti(queued_dttm=None, start_date=start_date) + _emit_task_span(ti, TaskInstanceState.SUCCESS) + + assert self.exporter.get_finished_spans()[0].start_time == int(start_date.timestamp() * 1e9) + + def test_emit_task_span_skips_if_no_ti_carrier(self): + ti = mock.MagicMock() + ti.dag_run.context_carrier = { + "traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01" + } + ti.context_carrier = None + + _emit_task_span(ti, TaskInstanceState.SUCCESS) + assert len(self.exporter.get_finished_spans()) == 0 + + def test_emit_task_span_skips_if_no_dagrun_carrier(self): + ti = mock.MagicMock() + ti.dag_run.context_carrier = None + ti.context_carrier = {"traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"} + + _emit_task_span(ti, TaskInstanceState.SUCCESS) + assert len(self.exporter.get_finished_spans()) == 0 diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 802a34192e3..3761189bfeb 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -24,18 +24,26 @@ 
import os import selectors import time import typing +import uuid from collections.abc import AsyncIterator from socket import socket from typing import TYPE_CHECKING, Any +from unittest import mock from unittest.mock import ANY, AsyncMock, MagicMock, patch import pendulum import pytest from asgiref.sync import sync_to_async +from opentelemetry import trace as otel_trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from structlog.typing import FilteringBoundLogger from airflow._shared.timezones import timezone from airflow.executors import workloads +from airflow.executors.workloads.task import TaskInstanceDTO from airflow.jobs.job import Job from airflow.jobs.triggerer_job_runner import ( ToTriggerRunner, @@ -45,6 +53,7 @@ from airflow.jobs.triggerer_job_runner import ( TriggerLoggingFactory, TriggerRunner, TriggerRunnerSupervisor, + _make_trigger_span, messages, ) from airflow.models import Connection, DagModel, DagRun, Trigger, Variable @@ -318,7 +327,6 @@ def test_trigger_logger_close(): def test_trigger_logger_fd_closed_when_removed(session): - trigger = TimeDeltaTrigger(datetime.timedelta(seconds=0.5)) create_trigger_in_db(session, trigger) @@ -349,11 +357,12 @@ class TestTriggerRunner: mock_trigger = MagicMock(spec=BaseTrigger) mock_trigger.timeout_after = None mock_trigger.run.side_effect = asyncio.CancelledError() + mock_trigger.task_instance = MagicMock() + mock_trigger.task_instance.map_index = -1 with pytest.raises(asyncio.CancelledError): asyncio.run(trigger_runner.run_trigger(1, mock_trigger)) - # @pytest.mark.asyncio def test_run_inline_trigger_timeout(self, session, cap_structlog) -> None: trigger_runner = TriggerRunner() trigger_runner.triggers = { @@ -361,6 +370,8 @@ class TestTriggerRunner: } mock_trigger = MagicMock(spec=BaseTrigger) mock_trigger.run.side_effect = asyncio.CancelledError() + mock_trigger.task_instance = MagicMock() + mock_trigger.task_instance.map_index = -1 with pytest.raises(asyncio.CancelledError): asyncio.run( @@ -1358,3 +1369,89 @@ class TestTriggererMessageTypes: + "\n".join(f" - {t}" for t in sorted(task_diff)) + "\n\nEither handle these types in ToTriggerRunner or update in_task_but_not_in_trigger_runner list." 
) + + +class TestMakeTriggerSpan: + """Tests for the _make_trigger_span helper in the triggerer job runner.""" + + @pytest.fixture(autouse=True) + def sdk_tracer_provider(self): + self.exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(self.exporter)) + test_tracer = provider.get_tracer("test") + with mock.patch("airflow.jobs.triggerer_job_runner.tracer", test_tracer): + yield + + def _make_ti_dto(self, task_id="my_task", map_index=-1, context_carrier=None): + return TaskInstanceDTO( + id=uuid.uuid4(), + dag_version_id=uuid.uuid4(), + task_id=task_id, + dag_id="test_dag", + run_id="test_run", + try_number=1, + map_index=map_index, + pool_slots=1, + queue="default", + priority_weight=1, + context_carrier=context_carrier, + ) + + def test_make_trigger_span_name_with_task_instance(self): + ti = self._make_ti_dto(task_id="sensor_task", map_index=-1) + with _make_trigger_span(ti=ti, trigger_id=1, name="MySensor"): + pass + assert self.exporter.get_finished_spans()[0].name == "trigger.sensor_task" + + def test_make_trigger_span_name_with_mapped_task(self): + ti = self._make_ti_dto(task_id="sensor_task", map_index=2) + with _make_trigger_span(ti=ti, trigger_id=1, name="MySensor"): + pass + assert self.exporter.get_finished_spans()[0].name == "trigger.sensor_task_2" + + def test_make_trigger_span_name_without_task_instance(self): + with _make_trigger_span(ti=None, trigger_id=42, name="Some trigger name"): + pass + assert self.exporter.get_finished_spans()[0].name == "trigger.Some trigger name" + + def test_make_trigger_span_uses_task_context_carrier(self): + # Build a valid ti carrier from a separate provider so we have a known parent span. + setup_provider = TracerProvider() + setup_tracer = setup_provider.get_tracer("setup") + parent_span = setup_tracer.start_span("ti_parent") + parent_ctx = otel_trace.set_span_in_context(parent_span) + ti_carrier: dict = {} + TraceContextTextMapPropagator().inject(ti_carrier, context=parent_ctx) + expected_parent_span_id = parent_span.get_span_context().span_id + + ti = self._make_ti_dto(context_carrier=ti_carrier) + with _make_trigger_span(ti=ti, trigger_id=1, name="MySensor"): + pass + + spans = self.exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].parent is not None + assert spans[0].parent.span_id == expected_parent_span_id + + def test_make_trigger_span_sets_attributes_with_ti(self): + ti = self._make_ti_dto(task_id="my_task", map_index=1) + with _make_trigger_span(ti=ti, trigger_id=5, name="MyTrigger"): + pass + + attrs = self.exporter.get_finished_spans()[0].attributes + assert attrs["airflow.trigger.name"] == "MyTrigger" + assert attrs["airflow.dag_id"] == "test_dag" + assert attrs["airflow.task_id"] == "my_task" + assert attrs["airflow.dag_run.run_id"] == "test_run" + assert attrs["airflow.task_instance.try_number"] == 1 + assert attrs["airflow.task_instance.map_index"] == 1 + + def test_make_trigger_span_sets_only_trigger_name_without_ti(self): + with _make_trigger_span(ti=None, trigger_id=99, name="OnlyTrigger"): + pass + + attrs = self.exporter.get_finished_spans()[0].attributes + assert attrs["airflow.trigger.name"] == "OnlyTrigger" + assert "airflow.dag_id" not in attrs + assert "airflow.task_id" not in attrs diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index 7ad78292a1e..b446db9f2b9 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -27,7 
+27,12 @@ from unittest.mock import ANY, call import pendulum import pytest +from opentelemetry import trace as otel_trace from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.trace import StatusCode +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from sqlalchemy import ( func, select, @@ -37,6 +42,7 @@ from sqlalchemy.orm import joinedload from airflow import settings from airflow._shared.observability.metrics.stats import Stats +from airflow._shared.observability.traces import OverrideableRandomIdGenerator from airflow._shared.timezones import timezone from airflow.callbacks.callback_requests import DagCallbackRequest, DagRunContext from airflow.models.dag import DagModel, infer_automated_data_interval @@ -3441,13 +3447,6 @@ class TestDagRunTracing: def test_emit_dagrun_span_uses_context_carrier_ids(self, dag_maker, session): """The emitted span should inherit trace_id/span_id from the context_carrier.""" - from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.sdk.trace.export import SimpleSpanProcessor - from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter - from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator - - from airflow._shared.observability.traces import OverrideableRandomIdGenerator - in_mem_exporter = InMemorySpanExporter() provider = TracerProvider(id_generator=OverrideableRandomIdGenerator()) provider.add_span_processor(SimpleSpanProcessor(in_mem_exporter)) @@ -3471,8 +3470,6 @@ class TestDagRunTracing: # Decode the expected trace_id/span_id from the stored context_carrier ctx = TraceContextTextMapPropagator().extract(dr.context_carrier) - from opentelemetry import trace as otel_trace - stored_span = otel_trace.get_current_span(context=ctx) stored_ctx = stored_span.get_span_context() @@ -3482,13 +3479,6 @@ class TestDagRunTracing: @pytest.mark.parametrize("final_state", [DagRunState.SUCCESS, DagRunState.FAILED]) def test_emit_dagrun_span_attributes_and_status(self, dag_maker, session, final_state): """The emitted span should have the correct name, attributes, and status code.""" - from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.sdk.trace.export import SimpleSpanProcessor - from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter - from opentelemetry.trace import StatusCode - - from airflow._shared.observability.traces import OverrideableRandomIdGenerator - in_mem_exporter = InMemorySpanExporter() provider = TracerProvider(id_generator=OverrideableRandomIdGenerator()) provider.add_span_processor(SimpleSpanProcessor(in_mem_exporter)) @@ -3527,12 +3517,6 @@ class TestDagRunTracing: context_carrier was cleared/backfilled to NULL. Per OTel spec, a dict carrier with missing or invalid context yields a new root span, while a non-dict carrier is skipped entirely rather than crashing.
""" - from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.sdk.trace.export import SimpleSpanProcessor - from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter - - from airflow._shared.observability.traces import OverrideableRandomIdGenerator - in_mem_exporter = InMemorySpanExporter() provider = TracerProvider(id_generator=OverrideableRandomIdGenerator()) provider.add_span_processor(SimpleSpanProcessor(in_mem_exporter)) @@ -3555,5 +3539,8 @@ # A root span should still be emitted for dict carriers; non-dict carriers emit nothing spans = in_mem_exporter.get_finished_spans() - assert len(spans) == 1 - assert spans[0].name == f"dag_run.{dr.dag_id}" + if isinstance(carrier_value, dict): + assert len(spans) == 1 + assert spans[0].name == f"dag_run.{dr.dag_id}" + else: + assert len(spans) == 0 diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 82b9adc3162..bb058d1a737 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -30,11 +30,15 @@ import pendulum import pytest import time_machine import uuid6 +from opentelemetry import trace as otel_trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from sqlalchemy import delete, func, select from sqlalchemy.exc import IntegrityError from airflow import settings from airflow._shared.observability.metrics.stats import Stats +from airflow._shared.observability.traces import new_dagrun_trace_carrier, new_task_run_carrier from airflow._shared.timezones import timezone from airflow.exceptions import ( AirflowException, @@ -3298,3 +3302,103 @@ def test_get_dagrun_loaded_but_none_returns_dagrun(dag_maker, session): assert dr_from_ti is not None assert dr_from_ti == dr + + +class TestMakeTaskCarrier: + """Tests for the new_task_run_carrier helper.""" + + @pytest.fixture(autouse=True) + def sdk_tracer_provider(self): + provider = TracerProvider() + real_tracer = provider.get_tracer("airflow.models.taskinstance") + with ( + mock.patch("airflow.models.taskinstance.tracer", real_tracer), + mock.patch("airflow._shared.observability.traces.tracer", real_tracer), + ): + yield + + def test_make_task_carrier_returns_traceparent(self): + carrier = new_task_run_carrier(new_dagrun_trace_carrier()) + assert isinstance(carrier, dict) + assert "traceparent" in carrier + + def test_make_task_carrier_child_of_parent(self): + parent_carrier = new_dagrun_trace_carrier() + child_carrier = new_task_run_carrier(parent_carrier) + + propagator = TraceContextTextMapPropagator() + parent_trace_id = ( + otel_trace.get_current_span(context=propagator.extract(parent_carrier)) + .get_span_context() + .trace_id + ) + child_trace_id = ( + otel_trace.get_current_span(context=propagator.extract(child_carrier)).get_span_context().trace_id + ) + assert child_trace_id == parent_trace_id + assert child_trace_id != 0 + + def test_make_task_carrier_with_none_carrier(self): + carrier = new_task_run_carrier(None) + assert isinstance(carrier, dict) + assert "traceparent" in carrier + + [email protected]_test +def test_insert_mapping_includes_context_carrier(dag_maker, session): + """insert_mapping should include a context_carrier with a traceparent derived from the dag run.""" + provider = TracerProvider() + real_tracer = provider.get_tracer("airflow.models.taskinstance") + with ( + mock.patch("airflow.models.taskinstance.tracer", real_tracer),
+ mock.patch("airflow._shared.observability.traces.tracer", real_tracer), + ): + with dag_maker("test_insert_mapping_carrier"): + EmptyOperator(task_id="t1") + session.flush() + + # Get the scheduler-side operator (has a proper PriorityWeightStrategy, not the enum weight_rule). + op = create_scheduler_operator(dag_maker.dag.get_task("t1")) + + # Mock the DagRun to avoid inserting into the dag_run table (schema migrations may be pending). + dag_run = mock.MagicMock() + dag_run.context_carrier = new_dagrun_trace_carrier() + + mapping = TaskInstance.insert_mapping( + run_id="test_run", + task=op, + map_index=0, + dag_version_id=None, + dag_run=dag_run, + ) + + assert "context_carrier" in mapping + assert mapping["context_carrier"] is not None + assert "traceparent" in mapping["context_carrier"] + + [email protected]_test +def test_clear_task_instances_resets_context_carrier(dag_maker, session): + """clear_task_instances should assign fresh context carriers to both the TI and its dag run.""" + provider = TracerProvider() + real_tracer = provider.get_tracer("airflow.models.taskinstance") + with ( + mock.patch("airflow.models.taskinstance.tracer", real_tracer), + mock.patch("airflow._shared.observability.traces.tracer", real_tracer), + ): + with dag_maker("test_clear_carrier"): + EmptyOperator(task_id="t1") + dag_run = dag_maker.create_dagrun() + ti = dag_run.get_task_instance("t1", session=session) + ti.state = TaskInstanceState.SUCCESS + # Set an explicit carrier so we can verify it changes. + ti.context_carrier = {"traceparent": "00-aaaaaaaaaaaaaaaaaaaaaaaaaaaa0001-bbbbbbbbbbbbbbbb-01"} + session.flush() + + original_ti_traceparent = ti.context_carrier["traceparent"] + original_dr_traceparent = dag_run.context_carrier["traceparent"] + + clear_task_instances([ti], session) + + assert ti.context_carrier["traceparent"] != original_ti_traceparent + assert dag_run.context_carrier["traceparent"] != original_dr_traceparent diff --git a/shared/observability/src/airflow_shared/observability/traces/__init__.py b/shared/observability/src/airflow_shared/observability/traces/__init__.py index 53163e45b97..dc3532262d1 100644 --- a/shared/observability/src/airflow_shared/observability/traces/__init__.py +++ b/shared/observability/src/airflow_shared/observability/traces/__init__.py @@ -34,6 +34,7 @@ from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapProp if TYPE_CHECKING: from configparser import ConfigParser log = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) OVERRIDE_SPAN_ID_KEY = context.create_key("override_span_id") OVERRIDE_TRACE_ID_KEY = context.create_key("override_trace_id") @@ -70,6 +71,17 @@ def new_dagrun_trace_carrier() -> dict[str, str]: return carrier +def new_task_run_carrier(dag_run_context_carrier): + parent_context = ( + TraceContextTextMapPropagator().extract(dag_run_context_carrier) if dag_run_context_carrier else None + ) + span = tracer.start_span("notused", context=parent_context) # intentionally never closed + new_ctx = trace.set_span_in_context(span) + carrier: dict[str, str] = {} + TraceContextTextMapPropagator().inject(carrier, context=new_ctx) + return carrier + + @contextmanager def override_ids(trace_id, span_id, ctx=None): ctx = context.set_value(OVERRIDE_TRACE_ID_KEY, trace_id, context=ctx) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index aa5a5a08ad4..6e0b0766be3 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ 
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -148,9 +148,9 @@ def _make_task_span(msg: StartupDetails): TraceContextTextMapPropagator().extract(msg.ti.context_carrier) if msg.ti.context_carrier else None ) ti = msg.ti - span_name = f"task_run.{ti.task_id}" + span_name = f"worker.{ti.task_id}" if ti.map_index is not None and ti.map_index >= 0: - span_name += f"_{ti.map_index}" + span_name += f"[{ti.map_index}]" with tracer.start_as_current_span(span_name, context=parent_context) as span: span.set_attributes( { diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 0eab6a50afc..88424bfe53e 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -32,9 +32,16 @@ from unittest.mock import call, patch import pandas as pd import pytest +from opentelemetry import trace as otel_trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from task_sdk import FAKE_BUNDLE from uuid6 import uuid7 +from airflow._shared.observability.traces import OverrideableRandomIdGenerator, new_task_run_carrier +from airflow.api_fastapi.execution_api.routes.task_instances import _emit_task_span from airflow.listeners import hookimpl from airflow.providers.standard.operators.python import PythonOperator from airflow.sdk import ( @@ -415,24 +422,30 @@ def test_main_sends_reschedule_task_when_startup_reschedules( def test_task_span_is_child_of_dag_run_span(make_ti_context): - """Task span must be a child of the dag run span propagated via context_carrier.""" - from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.sdk.trace.export import SimpleSpanProcessor - from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter - from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator - - # Build a real SDK provider and exporter so we can inspect finished spans. + """Full trace hierarchy: dag_run → task_run.my_task (API server) → worker.my_task (task runner).""" + # Single provider shared by all spans so contexts are compatible. in_mem_exporter = InMemorySpanExporter() - provider = TracerProvider() + provider = TracerProvider(id_generator=OverrideableRandomIdGenerator()) provider.add_span_processor(SimpleSpanProcessor(in_mem_exporter)) - # Create a "dag run" span whose context we will propagate into the task. + # Step 1: create the dag run span and capture its carrier. dag_run_tracer = provider.get_tracer("dag_run") with dag_run_tracer.start_as_current_span("dag_run.test_dag") as dag_run_span: - carrier: dict[str, str] = {} - TraceContextTextMapPropagator().inject(carrier) + dag_run_carrier: dict[str, str] = {} + TraceContextTextMapPropagator().inject(dag_run_carrier) dag_run_span_ctx = dag_run_span.get_span_context() + # Step 2: derive the parent task span carrier (child of dag run), as the scheduler does. + ti_model_tracer = provider.get_tracer("airflow.models.taskinstance") + with mock.patch("airflow.models.taskinstance.tracer", ti_model_tracer): + ti_carrier = new_task_run_carrier(dag_run_carrier) + + # Extract the parent task span context (the stable span ID stored in ti_carrier). 
+ parent_task_span_ctx = otel_trace.get_current_span( + context=TraceContextTextMapPropagator().extract(ti_carrier) + ).get_span_context() + + # Step 3: build StartupDetails with ti.context_carrier = ti_carrier. what = StartupDetails( ti=TaskInstance( id=uuid7(), @@ -441,7 +454,7 @@ def test_task_span_is_child_of_dag_run_span(make_ti_context): run_id="test_run", try_number=1, dag_version_id=uuid7(), - context_carrier=carrier, + context_carrier=ti_carrier, ), dag_rel_path="", bundle_info=BundleInfo(name="my-bundle", version=None), @@ -450,27 +463,45 @@ def test_task_span_is_child_of_dag_run_span(make_ti_context): sentry_integration="", ) - task_tracer = provider.get_tracer("airflow.sdk.execution_time.task_runner") - with mock.patch("airflow.sdk.execution_time.task_runner.tracer", task_tracer): - with _make_task_span(what) as span: - task_span_ctx = span.get_span_context() + # Step 4: emit the worker span (task runner side). + task_runner_tracer = provider.get_tracer("airflow.sdk.execution_time.task_runner") + with mock.patch("airflow.sdk.execution_time.task_runner.tracer", task_runner_tracer): + with _make_task_span(what): + pass - # The task span must share the dag run's trace ID. - assert task_span_ctx.trace_id == dag_run_span_ctx.trace_id + # Step 5: emit the parent task span (API server side, as happens on task completion). + mock_ti = mock.MagicMock() + mock_ti.dag_id = "test_dag" + mock_ti.task_id = "my_task" + mock_ti.run_id = "test_run" + mock_ti.try_number = 1 + mock_ti.map_index = -1 + mock_ti.queued_dttm = None + mock_ti.start_date = timezone.utcnow() + mock_ti.dag_run.context_carrier = dag_run_carrier + mock_ti.context_carrier = ti_carrier + api_tracer = provider.get_tracer("airflow.api_fastapi.execution_api.routes.task_instances") + with mock.patch("airflow.api_fastapi.execution_api.routes.task_instances.tracer", api_tracer): + _emit_task_span(mock_ti, TaskInstanceState.SUCCESS) - # The task span's parent must be the dag run span. finished = in_mem_exporter.get_finished_spans() + + # task_run.my_task: emitted by API server, child of dag run, span_id == parent_task_span_ctx.span_id. task_spans = [s for s in finished if s.name == "task_run.my_task"] assert len(task_spans) == 1 - assert task_spans[0].parent is not None - assert task_spans[0].parent.span_id == dag_run_span_ctx.span_id - - # Span attributes are set correctly. - attrs = task_spans[0].attributes - assert attrs["airflow.dag_id"] == "test_dag" - assert attrs["airflow.task_id"] == "my_task" - assert attrs["airflow.dag_run.run_id"] == "test_run" - assert attrs["airflow.task_instance.try_number"] == 1 + task_span = task_spans[0] + assert task_span.parent is not None + assert task_span.parent.span_id == dag_run_span_ctx.span_id + assert task_span.context.span_id == parent_task_span_ctx.span_id + + # worker.my_task: created by task runner, child of the parent task span. + worker_spans = [s for s in finished if s.name == "worker.my_task"] + assert len(worker_spans) == 1 + assert worker_spans[0].parent is not None + assert worker_spans[0].parent.span_id == parent_task_span_ctx.span_id + + # All spans share the same trace ID. + assert {s.context.trace_id for s in finished} == {dag_run_span_ctx.trace_id} def test_task_span_no_parent_when_no_context_carrier(make_ti_context):

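Taken together, the changes implement a "reserve the span ids now, emit the span later" pattern: new_task_run_carrier starts a span purely to capture its ids in a carrier (and intentionally never ends it), the worker and triggerer parent their worker.* and trigger.* spans under those ids, and the API server finally emits the real task_run.* span reusing the same ids via override_ids and OverrideableRandomIdGenerator. Below is a minimal sketch of the pattern with the plain opentelemetry-sdk; names are illustrative and not part of this commit.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

exporter = InMemorySpanExporter()
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(exporter))
tracer = provider.get_tracer("sketch")

# "Scheduler": start a placeholder span purely to reserve ids; never end it,
# so only its ids (not a finished span) exist at this point.
placeholder = tracer.start_span("reserved")
carrier: dict[str, str] = {}
TraceContextTextMapPropagator().inject(carrier, context=trace.set_span_in_context(placeholder))

# "Worker" / "triggerer": parent their spans under the reserved ids via the carrier.
ctx = TraceContextTextMapPropagator().extract(carrier)
with tracer.start_as_current_span("worker.task1", context=ctx):
    pass
with tracer.start_as_current_span("trigger.task1", context=ctx):
    pass

# Both finished spans share the reserved trace id and point at the reserved span id;
# the real parent span is emitted later with those same ids (in Airflow, by the
# API server through override_ids + OverrideableRandomIdGenerator).
reserved = placeholder.get_span_context()
for span in exporter.get_finished_spans():
    assert span.parent is not None and span.parent.span_id == reserved.span_id
    assert span.context.trace_id == reserved.trace_id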