This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cd633b7c495 add a shutdown for tests tracer (#63287)
cd633b7c495 is described below
commit cd633b7c495c56f79c492bddb8c05166b5678a69
Author: Christos Bisias <[email protected]>
AuthorDate: Wed Mar 11 00:44:04 2026 +0200
add a shutdown for tests tracer (#63287)
---
.../unit/observability/traces/test_otel_tracer.py | 120 +++++++++++----------
.../observability/traces/otel_tracer.py | 3 +
2 files changed, 68 insertions(+), 55 deletions(-)
diff --git a/airflow-core/tests/unit/observability/traces/test_otel_tracer.py b/airflow-core/tests/unit/observability/traces/test_otel_tracer.py
index 3ca8e07f5d5..228aee2284b 100644
--- a/airflow-core/tests/unit/observability/traces/test_otel_tracer.py
+++ b/airflow-core/tests/unit/observability/traces/test_otel_tracer.py
@@ -81,22 +81,25 @@ class TestOtelTrace:
exporter.return_value = in_mem_exporter
tracer = otel_tracer.get_otel_tracer(Trace)
- assert otel_env_conf.called
- otel_env_conf.assert_called_once()
- with tracer.start_span(span_name="span1") as s1:
- with tracer.start_span(span_name="span2") as s2:
- s2.set_attribute("attr2", "val2")
- span2 = json.loads(s2.to_json())
- span1 = json.loads(s1.to_json())
- # assert the two span data
- assert span1["name"] == "span1"
- assert span2["name"] == "span2"
- trace_id = span1["context"]["trace_id"]
- s1_span_id = span1["context"]["span_id"]
- assert span2["context"]["trace_id"] == trace_id
- assert span2["parent_id"] == s1_span_id
- assert span2["attributes"]["attr2"] == "val2"
- assert span2["resource"]["attributes"]["service.name"] == "my_test_service"
+ try:
+ assert otel_env_conf.called
+ otel_env_conf.assert_called_once()
+ with tracer.start_span(span_name="span1") as s1:
+ with tracer.start_span(span_name="span2") as s2:
+ s2.set_attribute("attr2", "val2")
+ span2 = json.loads(s2.to_json())
+ span1 = json.loads(s1.to_json())
+ # assert the two span data
+ assert span1["name"] == "span1"
+ assert span2["name"] == "span2"
+ trace_id = span1["context"]["trace_id"]
+ s1_span_id = span1["context"]["span_id"]
+ assert span2["context"]["trace_id"] == trace_id
+ assert span2["parent_id"] == s1_span_id
+ assert span2["attributes"]["attr2"] == "val2"
+ assert span2["resource"]["attributes"]["service.name"] == "my_test_service"
+ finally:
+ tracer.shutdown()
@patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
@env_vars(
@@ -117,19 +120,22 @@ class TestOtelTrace:
now = datetime.now()
tracer = otel_tracer.get_otel_tracer(Trace)
- with tracer.start_root_span(span_name="span1", start_time=now) as s1:
- with tracer.start_span(span_name="span2") as s2:
- s2.set_attribute("attr2", "val2")
- span2 = json.loads(s2.to_json())
- span1 = json.loads(s1.to_json())
-
- # The otel sdk, accepts an int for the start_time, and converts it to an iso string,
- # using `util.ns_to_iso_str()`.
- nano_time = datetime_to_nano(now)
- assert span1["start_time"] == util.ns_to_iso_str(nano_time)
- # Same trace_id
- assert span1["context"]["trace_id"] == span2["context"]["trace_id"]
- assert span1["context"]["span_id"] == span2["parent_id"]
+ try:
+ with tracer.start_root_span(span_name="span1", start_time=now) as s1:
+ with tracer.start_span(span_name="span2") as s2:
+ s2.set_attribute("attr2", "val2")
+ span2 = json.loads(s2.to_json())
+ span1 = json.loads(s1.to_json())
+
+ # The otel sdk, accepts an int for the start_time, and converts it to an iso string,
+ # using `util.ns_to_iso_str()`.
+ nano_time = datetime_to_nano(now)
+ assert span1["start_time"] == util.ns_to_iso_str(nano_time)
+ # Same trace_id
+ assert span1["context"]["trace_id"] == span2["context"]["trace_id"]
+ assert span1["context"]["span_id"] == span2["parent_id"]
+ finally:
+ tracer.shutdown()
@patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
@env_vars(
@@ -161,25 +167,27 @@ class TestOtelTrace:
return json_span
tracer = otel_tracer.get_otel_tracer(Trace)
-
- root_span = tracer.start_root_span(span_name="root_span", start_as_current=False)
- # The context is available, it can be injected into the carrier.
- context_carrier = tracer.inject()
-
- # Some function that uses the carrier to create a new span.
- json_span2 = _task_func(otel_tr=tracer, carrier=context_carrier)
-
- json_span1 = json.loads(root_span.to_json())
- # Manually end the span.
- root_span.end()
-
- # Verify that span1 is a root span.
- assert json_span1["parent_id"] is None
- # Check span2 parent_id to verify that it's a child of span1.
- assert json_span2["parent_id"] == json_span1["context"]["span_id"]
- # The trace_id and the span_id are randomly generated by the otel sdk.
- # Both spans should belong to the same trace.
- assert json_span1["context"]["trace_id"] == json_span2["context"]["trace_id"]
+ try:
+ root_span = tracer.start_root_span(span_name="root_span", start_as_current=False)
+ # The context is available, it can be injected into the carrier.
+ context_carrier = tracer.inject()
+
+ # Some function that uses the carrier to create a new span.
+ json_span2 = _task_func(otel_tr=tracer, carrier=context_carrier)
+
+ json_span1 = json.loads(root_span.to_json())
+ # Manually end the span.
+ root_span.end()
+
+ # Verify that span1 is a root span.
+ assert json_span1["parent_id"] is None
+ # Check span2 parent_id to verify that it's a child of span1.
+ assert json_span2["parent_id"] == json_span1["context"]["span_id"]
+ # The trace_id and the span_id are randomly generated by the otel sdk.
+ # Both spans should belong to the same trace.
+ assert json_span1["context"]["trace_id"] == json_span2["context"]["trace_id"]
+ finally:
+ tracer.shutdown()
@pytest.mark.parametrize(
("provided_env_vars", "expected_endpoint", "expected_exporter_module"),
@@ -250,10 +258,12 @@ class TestOtelTrace:
def test_config_priorities(self, provided_env_vars, expected_endpoint, expected_exporter_module):
with env_vars(provided_env_vars):
tracer = otel_tracer.get_otel_tracer(Trace)
-
- assert tracer.span_exporter._endpoint == expected_endpoint
-
- assert (
- tracer.span_exporter.__class__.__module__
- == f"opentelemetry.exporter.otlp.proto.{expected_exporter_module}.trace_exporter"
- )
+ try:
+ assert tracer.span_exporter._endpoint == expected_endpoint
+
+ assert (
+ tracer.span_exporter.__class__.__module__
+ == f"opentelemetry.exporter.otlp.proto.{expected_exporter_module}.trace_exporter"
+ )
+ finally:
+ tracer.shutdown()
diff --git a/shared/observability/src/airflow_shared/observability/traces/otel_tracer.py b/shared/observability/src/airflow_shared/observability/traces/otel_tracer.py
index 1e9533fdbe8..40b068a9781 100644
--- a/shared/observability/src/airflow_shared/observability/traces/otel_tracer.py
+++ b/shared/observability/src/airflow_shared/observability/traces/otel_tracer.py
@@ -86,6 +86,9 @@ class OtelTrace:
self.resource = Resource.create(attributes={SERVICE_NAME: self.otel_service})
self.debug = debug
+ def shutdown(self):
+ self.span_processor.shutdown()
+
def get_otel_tracer_provider(
self,
trace_id: int | None = None,