andreahlert commented on code in PR #56150:
URL: https://github.com/apache/airflow/pull/56150#discussion_r2774273109


##########
shared/observability/src/airflow_shared/observability/traces/otel_tracer.py:
##########
@@ -339,18 +343,70 @@ def get_otel_tracer(
     otel_service: str | None = None,
     debug: bool = False,
 ) -> OtelTrace:
-    """Get OTEL tracer from airflow configuration."""
+    """Get OTEL tracer from the regular OTel env variables or the airflow 
configuration."""
+    otel_env_config = load_traces_env_config()
+
     tag_string = cls.get_constant_tags()
 
     protocol = "https" if ssl_active else "http"
-    # Allow transparent support for standard OpenTelemetry SDK environment 
variables.
-    # 
https://opentelemetry.io/docs/specs/otel/protocol/exporter/#configuration-options
-    endpoint = os.environ.get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", 
f"{protocol}://{host}:{port}/v1/traces")
+    # According to the OpenTelemetry Spec, specific config options like 
'OTEL_EXPORTER_OTLP_TRACES_ENDPOINT'
+    # take precedence over generic ones like 'OTEL_EXPORTER_OTLP_ENDPOINT'.
+    env_exporter_protocol = (
+        otel_env_config.type_specific_exporter_protocol or 
otel_env_config.exporter_protocol
+    )
+    env_endpoint = otel_env_config.type_specific_endpoint or 
otel_env_config.base_endpoint
+
+    # If the protocol env var isn't set, then it will be None,
+    # and it will default to an http/protobuf exporter.
+    if env_endpoint and env_exporter_protocol == "grpc":
+        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import 
OTLPSpanExporter
+    else:
+        from opentelemetry.exporter.otlp.proto.http.trace_exporter import 
OTLPSpanExporter
+
+    if env_endpoint:
+        if host is not None and port is not None:
+            log.warning(
+                "Both the standard OpenTelemetry environment variables and "
+                "the Airflow OpenTelemetry configs have been provided. "
+                "Using the OpenTelemetry environment variables. "
+                "The Airflow configs have been deprecated and will be removed 
in the future."
+            )
+
+        endpoint = env_endpoint
+        # The SDK will pick up all the values from the environment.
+        exporter = OTLPSpanExporter()
+    else:
+        if host is not None and port is not None:
+            # Since the configs have been deprecated, host and port could be 
None.
+            # Log a warning to steer the user towards configuring the 
environment variables
+            # and deliberately let it fail here without providing fallbacks.
+            log.warning(
+                "OpenTelemetry has unset config settings. "
+                "The Airflow configs have been deprecated and will be removed 
in the future. "
+                "Configure the standard OpenTelemetry environment variables 
instead. "
+                "For more info, check the docs."
+            )
+        else:
+            log.warning(
+                "The Airflow OpenTelemetry configs have been deprecated and 
will be removed in the future. "
+                "OpenTelemetry is advised to be configured using the standard 
environment variables. "
+                "For more info, check the docs."
+            )
+        # If the environment endpoint isn't set, then assume that the airflow 
config is used
+        # where protocol isn't specified, and it's always http/protobuf.
+        # In that case it should default to the full 'url_path' and set it 
directly.
+        endpoint = f"{protocol}://{host}:{port}/v1/traces"

Review Comment:
   After #61289 removes the defaults for deprecated configs, if someone has 
`otel_on=True` but no env vars set, `host` and `port` will both be `None` here. 
This ends up building `http://None:None/v1/traces`, which gives a pretty cryptic 
connection error.
   
   Probably worth raising an explicit error before the f-string:
   
   ```python
   if host is None or port is None:
       raise AirflowConfigException(
           "OTel tracing is enabled but no endpoint is configured. "
           "Set OTEL_EXPORTER_OTLP_ENDPOINT or OTEL_EXPORTER_OTLP_TRACES_ENDPOINT."
       )
   ```
   
   Same issue in `get_metric_exporter`.



##########
airflow-core/tests/unit/observability/traces/test_otel_tracer.py:
##########
@@ -82,121 +84,185 @@ def test_debug_trace_metaclass(self):
         assert isinstance(DebugTrace.factory(), EmptyTrace)
 
     @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
-    @patch("airflow.observability.traces.otel_tracer.conf")
-    def test_tracer(self, conf_a, exporter):
-        # necessary to speed up the span to be emitted
-        with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
-            log = logging.getLogger("TestOtelTrace.test_tracer")
-            log.setLevel(logging.DEBUG)
-            # hijacking airflow conf with pre-defined
-            # values
-            conf_a.get.return_value = "abc"
-            conf_a.getint.return_value = 123
-            # this will enable debug to set - which outputs the result to 
console
-            conf_a.getboolean.return_value = True
-
-            # mocking console exporter with in mem exporter for better 
assertion
-            in_mem_exporter = InMemorySpanExporter()
-            exporter.return_value = in_mem_exporter
+    @patch("airflow._shared.observability.otel_env_config.OtelEnvConfig")
+    @env_vars(
+        {
+            "OTEL_SERVICE_NAME": "my_test_service",
+            # necessary to speed up the span to be emitted
+            "OTEL_BSP_SCHEDULE_DELAY": "1",
+        }
+    )
+    def test_tracer(self, otel_env_conf, exporter):
+        log = logging.getLogger("TestOtelTrace.test_tracer")
+        log.setLevel(logging.DEBUG)
 
-            tracer = otel_tracer.get_otel_tracer(Trace)
-            assert conf_a.get.called
-            assert conf_a.getint.called
-            assert conf_a.getboolean.called
-            with tracer.start_span(span_name="span1") as s1:
-                with tracer.start_span(span_name="span2") as s2:
-                    s2.set_attribute("attr2", "val2")
-                    span2 = json.loads(s2.to_json())
-                span1 = json.loads(s1.to_json())
-            # assert the two span data
-            assert span1["name"] == "span1"
-            assert span2["name"] == "span2"
-            trace_id = span1["context"]["trace_id"]
-            s1_span_id = span1["context"]["span_id"]
-            assert span2["context"]["trace_id"] == trace_id
-            assert span2["parent_id"] == s1_span_id
-            assert span2["attributes"]["attr2"] == "val2"
-            assert span2["resource"]["attributes"]["service.name"] == "abc"
+        # mocking console exporter with in mem exporter for better assertion
+        in_mem_exporter = InMemorySpanExporter()
+        exporter.return_value = in_mem_exporter
+
+        tracer = otel_tracer.get_otel_tracer(Trace)
+        assert otel_env_conf.called
+        otel_env_conf.assert_called_once()
+        with tracer.start_span(span_name="span1") as s1:
+            with tracer.start_span(span_name="span2") as s2:
+                s2.set_attribute("attr2", "val2")
+                span2 = json.loads(s2.to_json())
+            span1 = json.loads(s1.to_json())
+        # assert the two span data
+        assert span1["name"] == "span1"
+        assert span2["name"] == "span2"
+        trace_id = span1["context"]["trace_id"]
+        s1_span_id = span1["context"]["span_id"]
+        assert span2["context"]["trace_id"] == trace_id
+        assert span2["parent_id"] == s1_span_id
+        assert span2["attributes"]["attr2"] == "val2"
+        assert span2["resource"]["attributes"]["service.name"] == 
"my_test_service"
 
     @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
-    @patch("airflow.observability.traces.otel_tracer.conf")
-    def test_dag_tracer(self, conf_a, exporter):
-        # necessary to speed up the span to be emitted
-        with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
-            log = logging.getLogger("TestOtelTrace.test_dag_tracer")
-            log.setLevel(logging.DEBUG)
-            conf_a.get.return_value = "abc"
-            conf_a.getint.return_value = 123
-            # this will enable debug to set - which outputs the result to 
console
-            conf_a.getboolean.return_value = True
-
-            # mocking console exporter with in mem exporter for better 
assertion
-            in_mem_exporter = InMemorySpanExporter()
-            exporter.return_value = in_mem_exporter
-
-            now = datetime.now()
+    @env_vars(
+        {
+            "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318",
+            # necessary to speed up the span to be emitted
+            "OTEL_BSP_SCHEDULE_DELAY": "1",
+        }
+    )
+    def test_dag_tracer(self, exporter):
+        log = logging.getLogger("TestOtelTrace.test_dag_tracer")
+        log.setLevel(logging.DEBUG)
 
-            tracer = otel_tracer.get_otel_tracer(Trace)
-            with tracer.start_root_span(span_name="span1", start_time=now) as 
s1:
-                with tracer.start_span(span_name="span2") as s2:
-                    s2.set_attribute("attr2", "val2")
-                    span2 = json.loads(s2.to_json())
-                span1 = json.loads(s1.to_json())
-
-            # The otel sdk, accepts an int for the start_time, and converts it 
to an iso string,
-            # using `util.ns_to_iso_str()`.
-            nano_time = datetime_to_nano(now)
-            assert span1["start_time"] == util.ns_to_iso_str(nano_time)
-            # Same trace_id
-            assert span1["context"]["trace_id"] == span2["context"]["trace_id"]
-            assert span1["context"]["span_id"] == span2["parent_id"]
+        # mocking console exporter with in mem exporter for better assertion
+        in_mem_exporter = InMemorySpanExporter()
+        exporter.return_value = in_mem_exporter
 
-    @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
-    @patch("airflow.observability.traces.otel_tracer.conf")
-    def test_context_propagation(self, conf_a, exporter):
-        # necessary to speed up the span to be emitted
-        with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
-            log = logging.getLogger("TestOtelTrace.test_context_propagation")
-            log.setLevel(logging.DEBUG)
-            conf_a.get.return_value = "abc"
-            conf_a.getint.return_value = 123
-            # this will enable debug to set - which outputs the result to 
console
-            conf_a.getboolean.return_value = True
-
-            # mocking console exporter with in mem exporter for better 
assertion
-            in_mem_exporter = InMemorySpanExporter()
-            exporter.return_value = in_mem_exporter
-
-            # Method that represents another service which is
-            #  - getting the carrier
-            #  - extracting the context
-            #  - using the context to create a new span
-            # The new span should be associated with the span from the 
injected context carrier.
-            def _task_func(otel_tr, carrier):
-                parent_context = otel_tr.extract(carrier)
-
-                with otel_tr.start_child_span(span_name="sub_span", 
parent_context=parent_context) as span:
-                    span.set_attribute("attr2", "val2")
-                    json_span = json.loads(span.to_json())
-                return json_span
+        now = datetime.now()
 
-            tracer = otel_tracer.get_otel_tracer(Trace)
+        tracer = otel_tracer.get_otel_tracer(Trace)
+        with tracer.start_root_span(span_name="span1", start_time=now) as s1:
+            with tracer.start_span(span_name="span2") as s2:
+                s2.set_attribute("attr2", "val2")
+                span2 = json.loads(s2.to_json())
+            span1 = json.loads(s1.to_json())
+
+        # The otel sdk, accepts an int for the start_time, and converts it to 
an iso string,
+        # using `util.ns_to_iso_str()`.
+        nano_time = datetime_to_nano(now)
+        assert span1["start_time"] == util.ns_to_iso_str(nano_time)
+        # Same trace_id
+        assert span1["context"]["trace_id"] == span2["context"]["trace_id"]
+        assert span1["context"]["span_id"] == span2["parent_id"]
+
+    @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
+    @env_vars(
+        {
+            "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318",
+            # necessary to speed up the span to be emitted
+            "OTEL_BSP_SCHEDULE_DELAY": "1",
+        }
+    )
+    def test_context_propagation(self, exporter):
+        log = logging.getLogger("TestOtelTrace.test_context_propagation")
+        log.setLevel(logging.DEBUG)
+
+        # mocking console exporter with in mem exporter for better assertion
+        in_mem_exporter = InMemorySpanExporter()
+        exporter.return_value = in_mem_exporter
+
+        # Method that represents another service which is
+        #  - getting the carrier
+        #  - extracting the context
+        #  - using the context to create a new span
+        # The new span should be associated with the span from the injected 
context carrier.
+        def _task_func(otel_tr, carrier):
+            parent_context = otel_tr.extract(carrier)
+
+            with otel_tr.start_child_span(span_name="sub_span", 
parent_context=parent_context) as span:
+                span.set_attribute("attr2", "val2")
+                json_span = json.loads(span.to_json())
+            return json_span
 
-            root_span = tracer.start_root_span(span_name="root_span", 
start_as_current=False)
-            # The context is available, it can be injected into the carrier.
-            context_carrier = tracer.inject()
+        tracer = otel_tracer.get_otel_tracer(Trace)
 
-            # Some function that uses the carrier to create a new span.
-            json_span2 = _task_func(otel_tr=tracer, carrier=context_carrier)
+        root_span = tracer.start_root_span(span_name="root_span", 
start_as_current=False)
+        # The context is available, it can be injected into the carrier.
+        context_carrier = tracer.inject()
+
+        # Some function that uses the carrier to create a new span.
+        json_span2 = _task_func(otel_tr=tracer, carrier=context_carrier)
+
+        json_span1 = json.loads(root_span.to_json())
+        # Manually end the span.
+        root_span.end()
+
+        # Verify that span1 is a root span.
+        assert json_span1["parent_id"] is None
+        # Check span2 parent_id to verify that it's a child of span1.
+        assert json_span2["parent_id"] == json_span1["context"]["span_id"]
+        # The trace_id and the span_id are randomly generated by the otel sdk.
+        # Both spans should belong to the same trace.
+        assert json_span1["context"]["trace_id"] == 
json_span2["context"]["trace_id"]
+
+    @pytest.mark.parametrize(

Review Comment:
   None of the test cases cover `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL` overriding 
the generic `OTEL_EXPORTER_OTLP_PROTOCOL`. Might be worth adding something like:
   
   ```python
   pytest.param(
       {
           "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:1234",
           "OTEL_EXPORTER_OTLP_PROTOCOL": "http/protobuf",
           "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL": "grpc",
       },
       "localhost:1234",
       "grpc",
       id="type_specific_protocol_takes_precedence",
   ),
   ```
   
   Same for the metrics side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to