This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 63ea296047d Use the regular OTel environment variables for configuring
traces and metrics (#56150)
63ea296047d is described below
commit 63ea296047d7e31b5c929043b11929c1484163d6
Author: Christos Bisias <[email protected]>
AuthorDate: Tue Feb 17 03:48:37 2026 +0200
Use the regular OTel environment variables for configuring traces and
metrics (#56150)
* support regular OTel env variables
* fix spellcheck errors in the docs
* trigger the CI again
* change formatting for deprecated list
* update docs
* fix traces + add warning when both config methods are used
* fix metrics + add warning when both config methods are used
* fix integration test_otel
* fix mypy-airflow-core check error
* fix traces config typo
* remove quotes from integration-otel config
* add deprecation warning for the airflow settings
* refactor exporter init logic to a common function
* add more test cases + fix metrics exporter issue
* fix import in test_otel_logger
* make missing host/port warning more explicit
* remove conf import from shared + use correct interval conversion
* fix imports in test_otel_logger
* add a fallback for the service_name
---
.../logging-monitoring/metrics.rst | 16 +-
.../logging-monitoring/traces.rst | 14 +-
.../src/airflow/config_templates/config.yml | 110 ++++++++
.../airflow/observability/metrics/otel_logger.py | 27 +-
.../airflow/observability/traces/otel_tracer.py | 17 +-
airflow-core/tests/integration/otel/test_otel.py | 13 +-
.../unit/observability/traces/test_otel_tracer.py | 307 +++++++++++++--------
scripts/ci/docker-compose/integration-otel.yml | 13 +-
.../src/airflow_shared/observability/common.py | 109 ++++++++
.../observability/metrics/otel_logger.py | 39 +--
.../observability/otel_env_config.py | 114 ++++++++
.../observability/traces/otel_tracer.py | 45 +--
.../observability/metrics/test_otel_logger.py | 103 +++++++
.../sdk/observability/metrics/otel_logger.py | 27 +-
.../sdk/observability/traces/otel_tracer.py | 17 +-
15 files changed, 780 insertions(+), 191 deletions(-)
diff --git
a/airflow-core/docs/administration-and-deployment/logging-monitoring/metrics.rst
b/airflow-core/docs/administration-and-deployment/logging-monitoring/metrics.rst
index 3322f200498..49f37ab1820 100644
---
a/airflow-core/docs/administration-and-deployment/logging-monitoring/metrics.rst
+++
b/airflow-core/docs/administration-and-deployment/logging-monitoring/metrics.rst
@@ -79,10 +79,20 @@ Add the Collector details to your configuration file e.g.
``airflow.cfg``
.. note::
- To support the OpenTelemetry exporter standard, the ``metrics``
configurations are transparently overridden by use of standard OpenTelemetry
SDK environment variables.
+ **The following config keys have been deprecated and will be removed in
the future**
- - ``OTEL_EXPORTER_OTLP_ENDPOINT`` and
``OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`` supersede ``otel_host``, ``otel_port``
and ``otel_ssl_active``
- - ``OTEL_METRIC_EXPORT_INTERVAL`` supersedes ``otel_interval_milliseconds``
+ .. code-block:: ini
+
+ [metrics]
+ otel_host = localhost
+ otel_port = 8889
+ otel_interval_milliseconds = 30000
+ otel_debugging_on = False
+ otel_service = Airflow
+ otel_ssl_active = False
+
+ The OpenTelemetry SDK should be configured using standard OpenTelemetry
environment variables
+ such as ``OTEL_EXPORTER_OTLP_ENDPOINT``, ``OTEL_EXPORTER_OTLP_PROTOCOL``,
etc.
See the OpenTelemetry `exporter protocol specification
<https://opentelemetry.io/docs/specs/otel/protocol/exporter/#configuration-options>`_
and
`SDK environment variable documentation
<https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#periodic-exporting-metricreader>`_
for more information.
diff --git
a/airflow-core/docs/administration-and-deployment/logging-monitoring/traces.rst
b/airflow-core/docs/administration-and-deployment/logging-monitoring/traces.rst
index c0b3ec61345..a437ea90061 100644
---
a/airflow-core/docs/administration-and-deployment/logging-monitoring/traces.rst
+++
b/airflow-core/docs/administration-and-deployment/logging-monitoring/traces.rst
@@ -45,9 +45,19 @@ Add the following lines to your configuration file e.g.
``airflow.cfg``
.. note::
- To support the OpenTelemetry exporter standard, the ``traces``
configurations are transparently superseded by use of standard OpenTelemetry
SDK environment variables.
+ **The following config keys have been deprecated and will be removed in
the future**
- - ``OTEL_EXPORTER_OTLP_ENDPOINT`` and
``OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`` overridden ``otel_host``, ``otel_port``
and ``otel_ssl_active``
+ .. code-block:: ini
+
+ [traces]
+ otel_host = localhost
+ otel_port = 8889
+ otel_debugging_on = False
+ otel_service = Airflow
+ otel_ssl_active = False
+
+ The OpenTelemetry SDK should be configured using standard OpenTelemetry
environment variables
+ such as ``OTEL_EXPORTER_OTLP_ENDPOINT``, ``OTEL_EXPORTER_OTLP_PROTOCOL``,
etc.
See the OpenTelemetry `exporter protocol specification
<https://opentelemetry.io/docs/specs/otel/protocol/exporter/#configuration-options>`_
and
`SDK environment variable documentation
<https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#periodic-exporting-metricreader>`_
for more information.
diff --git a/airflow-core/src/airflow/config_templates/config.yml
b/airflow-core/src/airflow/config_templates/config.yml
index e754ba77751..3daed0edbaf 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1197,6 +1197,16 @@ metrics:
Specifies the hostname or IP address of the OpenTelemetry Collector to
which Airflow sends
metrics and traces.
version_added: 2.6.0
+ version_deprecated: 3.2.0
+ deprecation_reason: |
+ According to the OpenTelemetry specification, configuration is
expected to happen through
+ the standard OpenTelemetry environment variables rather than
project-specific settings.
+
+ This option has been deprecated to ensure consistent behavior across
different environments
+ and deployments that use OpenTelemetry.
+
+ OpenTelemetry should be configured exclusively using the standard
OpenTelemetry environment variables
+ such as 'OTEL_EXPORTER_OTLP_ENDPOINT', 'OTEL_EXPORTER_OTLP_PROTOCOL',
'OTEL_SERVICE_NAME', etc.
type: string
example: ~
default: "localhost"
@@ -1204,6 +1214,16 @@ metrics:
description: |
Specifies the port of the OpenTelemetry Collector that is listening to.
version_added: 2.6.0
+ version_deprecated: 3.2.0
+ deprecation_reason: |
+ According to the OpenTelemetry specification, configuration is
expected to happen through
+ the standard OpenTelemetry environment variables rather than
project-specific settings.
+
+ This option has been deprecated to ensure consistent behavior across
different environments
+ and deployments that use OpenTelemetry.
+
+ OpenTelemetry should be configured exclusively using the standard
OpenTelemetry environment variables
+ such as 'OTEL_EXPORTER_OTLP_ENDPOINT', 'OTEL_EXPORTER_OTLP_PROTOCOL',
'OTEL_SERVICE_NAME', etc.
type: integer
example: ~
default: "8889"
@@ -1219,6 +1239,16 @@ metrics:
Defines the interval, in milliseconds, at which Airflow sends batches
of metrics and traces
to the configured OpenTelemetry Collector.
version_added: 2.6.0
+ version_deprecated: 3.2.0
+ deprecation_reason: |
+ According to the OpenTelemetry specification, configuration is
expected to happen through
+ the standard OpenTelemetry environment variables rather than
project-specific settings.
+
+ This option has been deprecated to ensure consistent behavior across
different environments
+ and deployments that use OpenTelemetry.
+
+ OpenTelemetry should be configured exclusively using the standard
OpenTelemetry environment variables
+ such as 'OTEL_EXPORTER_OTLP_ENDPOINT', 'OTEL_EXPORTER_OTLP_PROTOCOL',
'OTEL_SERVICE_NAME', etc.
type: integer
example: ~
default: "60000"
@@ -1226,6 +1256,16 @@ metrics:
description: |
If ``True``, all metrics are also emitted to the console. Defaults to
``False``.
version_added: 2.7.0
+ version_deprecated: 3.2.0
+ deprecation_reason: |
+ According to the OpenTelemetry specification, configuration is
expected to happen through
+ the standard OpenTelemetry environment variables rather than
project-specific settings.
+
+ This option has been deprecated to ensure consistent behavior across
different environments
+ and deployments that use OpenTelemetry.
+
+ OpenTelemetry should be configured exclusively using the standard
OpenTelemetry environment variables
+ such as 'OTEL_EXPORTER_OTLP_ENDPOINT', 'OTEL_EXPORTER_OTLP_PROTOCOL',
'OTEL_SERVICE_NAME', etc.
type: boolean
example: ~
default: "False"
@@ -1233,6 +1273,16 @@ metrics:
description: |
The default service name of traces.
version_added: 2.10.3
+ version_deprecated: 3.2.0
+ deprecation_reason: |
+ According to the OpenTelemetry specification, configuration is
expected to happen through
+ the standard OpenTelemetry environment variables rather than
project-specific settings.
+
+ This option has been deprecated to ensure consistent behavior across
different environments
+ and deployments that use OpenTelemetry.
+
+ OpenTelemetry should be configured exclusively using the standard
OpenTelemetry environment variables
+ such as 'OTEL_EXPORTER_OTLP_ENDPOINT', 'OTEL_EXPORTER_OTLP_PROTOCOL',
'OTEL_SERVICE_NAME', etc.
type: string
example: ~
default: "Airflow"
@@ -1243,6 +1293,16 @@ metrics:
you need to configure the SSL certificate and key within the
OpenTelemetry collector's
``config.yml`` file.
version_added: 2.7.0
+ version_deprecated: 3.2.0
+ deprecation_reason: |
+ According to the OpenTelemetry specification, configuration is
expected to happen through
+ the standard OpenTelemetry environment variables rather than
project-specific settings.
+
+ This option has been deprecated to ensure consistent behavior across
different environments
+ and deployments that use OpenTelemetry.
+
+ OpenTelemetry should be configured exclusively using the standard
OpenTelemetry environment variables
+ such as 'OTEL_EXPORTER_OTLP_ENDPOINT', 'OTEL_EXPORTER_OTLP_PROTOCOL',
'OTEL_SERVICE_NAME', etc.
type: boolean
example: ~
default: "False"
@@ -1262,6 +1322,16 @@ traces:
Specifies the hostname or IP address of the OpenTelemetry Collector to
which Airflow sends
traces.
version_added: 2.10.0
+ version_deprecated: 3.2.0
+ deprecation_reason: |
+ According to the OpenTelemetry specification, configuration is
expected to happen through
+ the standard OpenTelemetry environment variables rather than
project-specific settings.
+
+ This option has been deprecated to ensure consistent behavior across
different environments
+ and deployments that use OpenTelemetry.
+
+ OpenTelemetry should be configured exclusively using the standard
OpenTelemetry environment variables
+ such as 'OTEL_EXPORTER_OTLP_ENDPOINT', 'OTEL_EXPORTER_OTLP_PROTOCOL',
'OTEL_SERVICE_NAME', etc.
type: string
example: ~
default: "localhost"
@@ -1269,6 +1339,16 @@ traces:
description: |
Specifies the port of the OpenTelemetry Collector that is listening to.
version_added: 2.10.0
+ version_deprecated: 3.2.0
+ deprecation_reason: |
+ According to the OpenTelemetry specification, configuration is
expected to happen through
+ the standard OpenTelemetry environment variables rather than
project-specific settings.
+
+ This option has been deprecated to ensure consistent behavior across
different environments
+ and deployments that use OpenTelemetry.
+
+ OpenTelemetry should be configured exclusively using the standard
OpenTelemetry environment variables
+ such as 'OTEL_EXPORTER_OTLP_ENDPOINT', 'OTEL_EXPORTER_OTLP_PROTOCOL',
'OTEL_SERVICE_NAME', etc.
type: integer
example: ~
default: "8889"
@@ -1276,6 +1356,16 @@ traces:
description: |
The default service name of traces.
version_added: 2.10.0
+ version_deprecated: 3.2.0
+ deprecation_reason: |
+ According to the OpenTelemetry specification, configuration is
expected to happen through
+ the standard OpenTelemetry environment variables rather than
project-specific settings.
+
+ This option has been deprecated to ensure consistent behavior across
different environments
+ and deployments that use OpenTelemetry.
+
+ OpenTelemetry should be configured exclusively using the standard
OpenTelemetry environment variables
+ such as 'OTEL_EXPORTER_OTLP_ENDPOINT', 'OTEL_EXPORTER_OTLP_PROTOCOL',
'OTEL_SERVICE_NAME', etc.
type: string
example: ~
default: "Airflow"
@@ -1283,6 +1373,16 @@ traces:
description: |
If True, all traces are also emitted to the console. Defaults to False.
version_added: 2.10.0
+ version_deprecated: 3.2.0
+ deprecation_reason: |
+ According to the OpenTelemetry specification, configuration is
expected to happen through
+ the standard OpenTelemetry environment variables rather than
project-specific settings.
+
+ This option has been deprecated to ensure consistent behavior across
different environments
+ and deployments that use OpenTelemetry.
+
+ OpenTelemetry should be configured exclusively using the standard
OpenTelemetry environment variables
+ such as 'OTEL_EXPORTER_OTLP_ENDPOINT', 'OTEL_EXPORTER_OTLP_PROTOCOL',
'OTEL_SERVICE_NAME', etc.
type: boolean
example: ~
default: "False"
@@ -1293,6 +1393,16 @@ traces:
you need to configure the SSL certificate and key within the
OpenTelemetry collector's
config.yml file.
version_added: 2.10.0
+ version_deprecated: 3.2.0
+ deprecation_reason: |
+ According to the OpenTelemetry specification, configuration is
expected to happen through
+ the standard OpenTelemetry environment variables rather than
project-specific settings.
+
+ This option has been deprecated to ensure consistent behavior across
different environments
+ and deployments that use OpenTelemetry.
+
+ OpenTelemetry should be configured exclusively using the standard
OpenTelemetry environment variables
+ such as 'OTEL_EXPORTER_OTLP_ENDPOINT', 'OTEL_EXPORTER_OTLP_PROTOCOL',
'OTEL_SERVICE_NAME', etc.
type: boolean
example: ~
default: "False"
diff --git a/airflow-core/src/airflow/observability/metrics/otel_logger.py
b/airflow-core/src/airflow/observability/metrics/otel_logger.py
index 1fb73423cba..4e953df468d 100644
--- a/airflow-core/src/airflow/observability/metrics/otel_logger.py
+++ b/airflow-core/src/airflow/observability/metrics/otel_logger.py
@@ -26,17 +26,28 @@ if TYPE_CHECKING:
def get_otel_logger() -> SafeOtelLogger:
+ # The config values have been deprecated and therefore,
+ # if the user hasn't added them to the config, the default values won't be
used.
+ # A fallback is needed to avoid an exception.
+ port = None
+ if conf.has_option("metrics", "otel_port"):
+ port = conf.getint("metrics", "otel_port")
+
+ conf_interval = None
+ if conf.has_option("metrics", "otel_interval_milliseconds"):
+ conf_interval = conf.getfloat("metrics", "otel_interval_milliseconds")
+
return otel_logger.get_otel_logger(
- host=conf.get("metrics", "otel_host"), # ex: "breeze-otel-collector"
- port=conf.getint("metrics", "otel_port"), # ex: 4318
- prefix=conf.get("metrics", "otel_prefix"), # ex: "airflow"
- ssl_active=conf.getboolean("metrics", "otel_ssl_active"),
+ host=conf.get("metrics", "otel_host", fallback=None), # ex:
"breeze-otel-collector"
+ port=port, # ex: 4318
+ prefix=conf.get("metrics", "otel_prefix", fallback=None), # ex:
"airflow"
+ ssl_active=conf.getboolean("metrics", "otel_ssl_active",
fallback=False),
# PeriodicExportingMetricReader will default to an interval of 60000
millis.
- conf_interval=conf.getfloat("metrics", "otel_interval_milliseconds",
fallback=None), # ex: 30000
- debug=conf.getboolean("metrics", "otel_debugging_on"),
- service_name=conf.get("metrics", "otel_service"),
+ conf_interval=conf_interval, # ex: 30000
+ debug=conf.getboolean("metrics", "otel_debugging_on", fallback=False),
+ service_name=conf.get("metrics", "otel_service", fallback=None),
metrics_allow_list=conf.get("metrics", "metrics_allow_list",
fallback=None),
metrics_block_list=conf.get("metrics", "metrics_block_list",
fallback=None),
- stat_name_handler=conf.getimport("metrics", "stat_name_handler"),
+ stat_name_handler=conf.getimport("metrics", "stat_name_handler",
fallback=None),
statsd_influxdb_enabled=conf.getboolean("metrics",
"statsd_influxdb_enabled", fallback=False),
)
diff --git a/airflow-core/src/airflow/observability/traces/otel_tracer.py
b/airflow-core/src/airflow/observability/traces/otel_tracer.py
index 73934c3da32..8e19f4c6983 100644
--- a/airflow-core/src/airflow/observability/traces/otel_tracer.py
+++ b/airflow-core/src/airflow/observability/traces/otel_tracer.py
@@ -27,14 +27,21 @@ if TYPE_CHECKING:
def get_otel_tracer(cls, use_simple_processor: bool = False) -> OtelTrace:
+ # The config values have been deprecated and therefore,
+ # if the user hasn't added them to the config, the default values won't be
used.
+ # A fallback is needed to avoid an exception.
+ port = None
+ if conf.has_option("traces", "otel_port"):
+ port = conf.getint("traces", "otel_port")
+
return otel_tracer.get_otel_tracer(
cls,
use_simple_processor,
- host=conf.get("traces", "otel_host"),
- port=conf.getint("traces", "otel_port"),
- ssl_active=conf.getboolean("traces", "otel_ssl_active"),
- otel_service=conf.get("traces", "otel_service"),
- debug=conf.getboolean("traces", "otel_debugging_on"),
+ host=conf.get("traces", "otel_host", fallback=None),
+ port=port,
+ ssl_active=conf.getboolean("traces", "otel_ssl_active",
fallback=False),
+ otel_service=conf.get("traces", "otel_service", fallback=None),
+ debug=conf.getboolean("traces", "otel_debugging_on", fallback=False),
)
diff --git a/airflow-core/tests/integration/otel/test_otel.py
b/airflow-core/tests/integration/otel/test_otel.py
index fa3c5b358e3..e5ad8181a17 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -704,10 +704,10 @@ class TestOtelIntegration:
wait_for_otel_collector(otel_host, otel_port)
os.environ["AIRFLOW__TRACES__OTEL_ON"] = "True"
- os.environ["AIRFLOW__TRACES__OTEL_HOST"] = otel_host
- os.environ["AIRFLOW__TRACES__OTEL_PORT"] = str(otel_port)
+ os.environ["OTEL_EXPORTER_OTLP_PROTOCOL"] = "http/protobuf"
+ os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] =
"http://breeze-otel-collector:4318/v1/traces"
if cls.use_otel != "true":
- os.environ["AIRFLOW__TRACES__OTEL_DEBUGGING_ON"] = "True"
+ os.environ["OTEL_TRACES_EXPORTER"] = "console"
os.environ["AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR"] = "False"
os.environ["AIRFLOW__SCHEDULER__PROCESSOR_POLL_INTERVAL"] = "2"
@@ -824,12 +824,11 @@ class TestOtelIntegration:
def dag_execution_for_testing_metrics(self, capfd):
# Metrics.
os.environ["AIRFLOW__METRICS__OTEL_ON"] = "True"
- os.environ["AIRFLOW__METRICS__OTEL_HOST"] = "breeze-otel-collector"
- os.environ["AIRFLOW__METRICS__OTEL_PORT"] = "4318"
- os.environ["AIRFLOW__METRICS__OTEL_INTERVAL_MILLISECONDS"] = "5000"
+ os.environ["OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"] =
"http://breeze-otel-collector:4318/v1/metrics"
+ os.environ["OTEL_METRIC_EXPORT_INTERVAL"] = "5000"
if self.use_otel != "true":
- os.environ["AIRFLOW__METRICS__OTEL_DEBUGGING_ON"] = "True"
+ os.environ["OTEL_METRICS_EXPORTER"] = "console"
celery_worker_process = None
scheduler_process = None
diff --git a/airflow-core/tests/unit/observability/traces/test_otel_tracer.py
b/airflow-core/tests/unit/observability/traces/test_otel_tracer.py
index ec124e1a24e..3b6207aefe7 100644
--- a/airflow-core/tests/unit/observability/traces/test_otel_tracer.py
+++ b/airflow-core/tests/unit/observability/traces/test_otel_tracer.py
@@ -31,7 +31,7 @@ from airflow._shared.observability.traces.utils import
datetime_to_nano
from airflow.observability.trace import DebugTrace, Trace
from airflow.observability.traces import otel_tracer
-from tests_common.test_utils.config import conf_vars, env_vars
+from tests_common.test_utils.config import env_vars
@pytest.fixture
@@ -40,10 +40,11 @@ def name():
class TestOtelTrace:
- @conf_vars(
+ @env_vars(
{
- ("traces", "otel_on"): "True",
- ("traces", "otel_debugging_on"): "True",
+ "AIRFLOW__TRACES__OTEL_ON": "True",
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318",
+ "OTEL_TRACES_EXPORTER": "console",
}
)
def test_get_otel_tracer_from_trace_metaclass(self):
@@ -62,10 +63,11 @@ class TestOtelTrace:
task_tracer.get_otel_tracer_provider()
assert task_tracer.use_simple_processor is True
- @conf_vars(
+ @env_vars(
{
- ("traces", "otel_on"): "True",
- ("traces", "otel_debug_traces_on"): "False",
+ "AIRFLOW__TRACES__OTEL_ON": "True",
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318",
+ "OTEL_TRACES_EXPORTER": "otlp",
}
)
def test_debug_trace_metaclass(self):
@@ -82,121 +84,196 @@ class TestOtelTrace:
assert isinstance(DebugTrace.factory(), EmptyTrace)
@patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
- @patch("airflow.observability.traces.otel_tracer.conf")
- def test_tracer(self, conf_a, exporter):
- # necessary to speed up the span to be emitted
- with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
- log = logging.getLogger("TestOtelTrace.test_tracer")
- log.setLevel(logging.DEBUG)
- # hijacking airflow conf with pre-defined
- # values
- conf_a.get.return_value = "abc"
- conf_a.getint.return_value = 123
- # this will enable debug to set - which outputs the result to
console
- conf_a.getboolean.return_value = True
-
- # mocking console exporter with in mem exporter for better
assertion
- in_mem_exporter = InMemorySpanExporter()
- exporter.return_value = in_mem_exporter
+ @patch("airflow._shared.observability.otel_env_config.OtelEnvConfig")
+ @env_vars(
+ {
+ "OTEL_SERVICE_NAME": "my_test_service",
+ # necessary to speed up the span to be emitted
+ "OTEL_BSP_SCHEDULE_DELAY": "1",
+ }
+ )
+ def test_tracer(self, otel_env_conf, exporter):
+ log = logging.getLogger("TestOtelTrace.test_tracer")
+ log.setLevel(logging.DEBUG)
- tracer = otel_tracer.get_otel_tracer(Trace)
- assert conf_a.get.called
- assert conf_a.getint.called
- assert conf_a.getboolean.called
- with tracer.start_span(span_name="span1") as s1:
- with tracer.start_span(span_name="span2") as s2:
- s2.set_attribute("attr2", "val2")
- span2 = json.loads(s2.to_json())
- span1 = json.loads(s1.to_json())
- # assert the two span data
- assert span1["name"] == "span1"
- assert span2["name"] == "span2"
- trace_id = span1["context"]["trace_id"]
- s1_span_id = span1["context"]["span_id"]
- assert span2["context"]["trace_id"] == trace_id
- assert span2["parent_id"] == s1_span_id
- assert span2["attributes"]["attr2"] == "val2"
- assert span2["resource"]["attributes"]["service.name"] == "abc"
+ # mocking console exporter with in mem exporter for better assertion
+ in_mem_exporter = InMemorySpanExporter()
+ exporter.return_value = in_mem_exporter
+
+ tracer = otel_tracer.get_otel_tracer(Trace)
+ assert otel_env_conf.called
+ otel_env_conf.assert_called_once()
+ with tracer.start_span(span_name="span1") as s1:
+ with tracer.start_span(span_name="span2") as s2:
+ s2.set_attribute("attr2", "val2")
+ span2 = json.loads(s2.to_json())
+ span1 = json.loads(s1.to_json())
+ # assert the two span data
+ assert span1["name"] == "span1"
+ assert span2["name"] == "span2"
+ trace_id = span1["context"]["trace_id"]
+ s1_span_id = span1["context"]["span_id"]
+ assert span2["context"]["trace_id"] == trace_id
+ assert span2["parent_id"] == s1_span_id
+ assert span2["attributes"]["attr2"] == "val2"
+ assert span2["resource"]["attributes"]["service.name"] ==
"my_test_service"
@patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
- @patch("airflow.observability.traces.otel_tracer.conf")
- def test_dag_tracer(self, conf_a, exporter):
- # necessary to speed up the span to be emitted
- with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
- log = logging.getLogger("TestOtelTrace.test_dag_tracer")
- log.setLevel(logging.DEBUG)
- conf_a.get.return_value = "abc"
- conf_a.getint.return_value = 123
- # this will enable debug to set - which outputs the result to
console
- conf_a.getboolean.return_value = True
-
- # mocking console exporter with in mem exporter for better
assertion
- in_mem_exporter = InMemorySpanExporter()
- exporter.return_value = in_mem_exporter
-
- now = datetime.now()
+ @env_vars(
+ {
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318",
+ # necessary to speed up the span to be emitted
+ "OTEL_BSP_SCHEDULE_DELAY": "1",
+ }
+ )
+ def test_dag_tracer(self, exporter):
+ log = logging.getLogger("TestOtelTrace.test_dag_tracer")
+ log.setLevel(logging.DEBUG)
- tracer = otel_tracer.get_otel_tracer(Trace)
- with tracer.start_root_span(span_name="span1", start_time=now) as
s1:
- with tracer.start_span(span_name="span2") as s2:
- s2.set_attribute("attr2", "val2")
- span2 = json.loads(s2.to_json())
- span1 = json.loads(s1.to_json())
-
- # The otel sdk, accepts an int for the start_time, and converts it
to an iso string,
- # using `util.ns_to_iso_str()`.
- nano_time = datetime_to_nano(now)
- assert span1["start_time"] == util.ns_to_iso_str(nano_time)
- # Same trace_id
- assert span1["context"]["trace_id"] == span2["context"]["trace_id"]
- assert span1["context"]["span_id"] == span2["parent_id"]
+ # mocking console exporter with in mem exporter for better assertion
+ in_mem_exporter = InMemorySpanExporter()
+ exporter.return_value = in_mem_exporter
- @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
- @patch("airflow.observability.traces.otel_tracer.conf")
- def test_context_propagation(self, conf_a, exporter):
- # necessary to speed up the span to be emitted
- with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
- log = logging.getLogger("TestOtelTrace.test_context_propagation")
- log.setLevel(logging.DEBUG)
- conf_a.get.return_value = "abc"
- conf_a.getint.return_value = 123
- # this will enable debug to set - which outputs the result to
console
- conf_a.getboolean.return_value = True
-
- # mocking console exporter with in mem exporter for better
assertion
- in_mem_exporter = InMemorySpanExporter()
- exporter.return_value = in_mem_exporter
-
- # Method that represents another service which is
- # - getting the carrier
- # - extracting the context
- # - using the context to create a new span
- # The new span should be associated with the span from the
injected context carrier.
- def _task_func(otel_tr, carrier):
- parent_context = otel_tr.extract(carrier)
-
- with otel_tr.start_child_span(span_name="sub_span",
parent_context=parent_context) as span:
- span.set_attribute("attr2", "val2")
- json_span = json.loads(span.to_json())
- return json_span
+ now = datetime.now()
- tracer = otel_tracer.get_otel_tracer(Trace)
+ tracer = otel_tracer.get_otel_tracer(Trace)
+ with tracer.start_root_span(span_name="span1", start_time=now) as s1:
+ with tracer.start_span(span_name="span2") as s2:
+ s2.set_attribute("attr2", "val2")
+ span2 = json.loads(s2.to_json())
+ span1 = json.loads(s1.to_json())
+
+ # The otel sdk, accepts an int for the start_time, and converts it to
an iso string,
+ # using `util.ns_to_iso_str()`.
+ nano_time = datetime_to_nano(now)
+ assert span1["start_time"] == util.ns_to_iso_str(nano_time)
+ # Same trace_id
+ assert span1["context"]["trace_id"] == span2["context"]["trace_id"]
+ assert span1["context"]["span_id"] == span2["parent_id"]
+
+ @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
+ @env_vars(
+ {
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318",
+ # necessary to speed up the span to be emitted
+ "OTEL_BSP_SCHEDULE_DELAY": "1",
+ }
+ )
+ def test_context_propagation(self, exporter):
+ log = logging.getLogger("TestOtelTrace.test_context_propagation")
+ log.setLevel(logging.DEBUG)
+
+ # mocking console exporter with in mem exporter for better assertion
+ in_mem_exporter = InMemorySpanExporter()
+ exporter.return_value = in_mem_exporter
+
+ # Method that represents another service which is
+ # - getting the carrier
+ # - extracting the context
+ # - using the context to create a new span
+ # The new span should be associated with the span from the injected
context carrier.
+ def _task_func(otel_tr, carrier):
+ parent_context = otel_tr.extract(carrier)
+
+ with otel_tr.start_child_span(span_name="sub_span",
parent_context=parent_context) as span:
+ span.set_attribute("attr2", "val2")
+ json_span = json.loads(span.to_json())
+ return json_span
- root_span = tracer.start_root_span(span_name="root_span",
start_as_current=False)
- # The context is available, it can be injected into the carrier.
- context_carrier = tracer.inject()
+ tracer = otel_tracer.get_otel_tracer(Trace)
- # Some function that uses the carrier to create a new span.
- json_span2 = _task_func(otel_tr=tracer, carrier=context_carrier)
+ root_span = tracer.start_root_span(span_name="root_span",
start_as_current=False)
+ # The context is available, it can be injected into the carrier.
+ context_carrier = tracer.inject()
+
+ # Some function that uses the carrier to create a new span.
+ json_span2 = _task_func(otel_tr=tracer, carrier=context_carrier)
+
+ json_span1 = json.loads(root_span.to_json())
+ # Manually end the span.
+ root_span.end()
+
+ # Verify that span1 is a root span.
+ assert json_span1["parent_id"] is None
+ # Check span2 parent_id to verify that it's a child of span1.
+ assert json_span2["parent_id"] == json_span1["context"]["span_id"]
+ # The trace_id and the span_id are randomly generated by the otel sdk.
+ # Both spans should belong to the same trace.
+ assert json_span1["context"]["trace_id"] ==
json_span2["context"]["trace_id"]
+
+ @pytest.mark.parametrize(
+ ("provided_env_vars", "expected_endpoint", "expected_exporter_module"),
+ [
+ pytest.param(
+ {
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:1234",
+ "OTEL_EXPORTER_OTLP_PROTOCOL": "grpc",
+ "AIRFLOW__TRACES__OTEL_HOST": "breeze-otel-collector",
+ "AIRFLOW__TRACES__OTEL_PORT": "4318",
+ },
+ "localhost:1234",
+ "grpc",
+ id="env_vars_with_grpc",
+ ),
+ pytest.param(
+ {
+ "OTEL_EXPORTER_OTLP_PROTOCOL": "grpc",
+ "AIRFLOW__TRACES__OTEL_HOST": "breeze-otel-collector",
+ "AIRFLOW__TRACES__OTEL_PORT": "4318",
+ },
+ "http://breeze-otel-collector:4318/v1/traces",
+ "http",
+ id="protocol_is_ignored_if_no_env_endpoint",
+ ),
+ pytest.param(
+ {
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:1234",
+ "OTEL_EXPORTER_OTLP_PROTOCOL": "http/protobuf",
+ "AIRFLOW__TRACES__OTEL_HOST": "breeze-otel-collector",
+ "AIRFLOW__TRACES__OTEL_PORT": "4318",
+ },
+ "http://localhost:1234/v1/traces",
+ "http",
+ id="for_http_with_env_vars_otel_builds_full_url",
+ ),
+ pytest.param(
+ {
+ "AIRFLOW__TRACES__OTEL_HOST": "breeze-otel-collector",
+ "AIRFLOW__TRACES__OTEL_PORT": "4318",
+ },
+ "http://breeze-otel-collector:4318/v1/traces",
+ "http",
+ id="use_airflow_config",
+ ),
+ pytest.param(
+ {
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:1234",
+ "OTEL_EXPORTER_OTLP_PROTOCOL": "http/protobuf",
+ },
+ "http://localhost:1234/v1/traces",
+ "http",
+ id="only_env_vars",
+ ),
+ pytest.param(
+ {
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:1234",
+ "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT":
"http://localhost:2222",
+ "OTEL_EXPORTER_OTLP_PROTOCOL": "http/protobuf",
+ "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL": "grpc",
+ },
+ "localhost:2222",
+ "grpc",
+ id="type_specific_vars_take_precedence",
+ ),
+ ],
+ )
+ def test_config_priorities(self, provided_env_vars, expected_endpoint,
expected_exporter_module):
+ with env_vars(provided_env_vars):
+ tracer = otel_tracer.get_otel_tracer(Trace)
- json_span1 = json.loads(root_span.to_json())
- # Manually end the span.
- root_span.end()
+ assert tracer.span_exporter._endpoint == expected_endpoint
- # Verify that span1 is a root span.
- assert json_span1["parent_id"] is None
- # Check span2 parent_id to verify that it's a child of span1.
- assert json_span2["parent_id"] == json_span1["context"]["span_id"]
- # The trace_id and the span_id are randomly generated by the otel
sdk.
- # Both spans should belong to the same trace.
- assert json_span1["context"]["trace_id"] ==
json_span2["context"]["trace_id"]
+ assert (
+ tracer.span_exporter.__class__.__module__
+ ==
f"opentelemetry.exporter.otlp.proto.{expected_exporter_module}.trace_exporter"
+ )
diff --git a/scripts/ci/docker-compose/integration-otel.yml
b/scripts/ci/docker-compose/integration-otel.yml
index d2cd759ad7f..9d5c6c8117d 100644
--- a/scripts/ci/docker-compose/integration-otel.yml
+++ b/scripts/ci/docker-compose/integration-otel.yml
@@ -68,14 +68,15 @@ services:
airflow:
environment:
- INTEGRATION_OTEL=true
+ - OTEL_SERVICE_NAME=test
+ - OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
+ - OTEL_TRACES_EXPORTER=otlp
+ - OTEL_METRICS_EXPORTER=otlp
+ -
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://breeze-otel-collector:4318/v1/traces
+ -
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT=http://breeze-otel-collector:4318/v1/metrics
+ - OTEL_METRIC_EXPORT_INTERVAL=30000
- AIRFLOW__METRICS__OTEL_ON=True
- - AIRFLOW__METRICS__OTEL_HOST=breeze-otel-collector
- - AIRFLOW__METRICS__OTEL_PORT=4318
- - AIRFLOW__METRICS__OTEL_INTERVAL_MILLISECONDS=30000
- AIRFLOW__TRACES__OTEL_ON=True
- - AIRFLOW__TRACES__OTEL_HOST=breeze-otel-collector
- - AIRFLOW__TRACES__OTEL_PORT=4318
- - AIRFLOW__TRACES__OTEL_DEBUGGING_ON=False
- AIRFLOW__TRACES__OTEL_TASK_LOG_EVENT=True
depends_on:
diff --git a/shared/observability/src/airflow_shared/observability/common.py
b/shared/observability/src/airflow_shared/observability/common.py
new file mode 100644
index 00000000000..a92a26543d3
--- /dev/null
+++ b/shared/observability/src/airflow_shared/observability/common.py
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import structlog
+
+from .otel_env_config import OtelDataType, OtelEnvConfig
+
+if TYPE_CHECKING:
+ from opentelemetry.sdk.metrics._internal.export import MetricExporter
+ from opentelemetry.sdk.trace.export import SpanExporter
+
+log = structlog.getLogger(__name__)
+
+
+def get_otel_data_exporter(
+ *,
+ otel_env_config: OtelEnvConfig,
+ host: str | None = None,
+ port: int | None = None,
+ ssl_active: bool = False,
+) -> SpanExporter | MetricExporter:
+ protocol = "https" if ssl_active else "http"
+
+ # According to the OpenTelemetry Spec, specific config options like
'OTEL_EXPORTER_OTLP_TRACES_ENDPOINT'
+ # take precedence over generic ones like 'OTEL_EXPORTER_OTLP_ENDPOINT'.
+ env_exporter_protocol = (
+ otel_env_config.type_specific_exporter_protocol or
otel_env_config.exporter_protocol
+ )
+ env_endpoint = otel_env_config.type_specific_endpoint or
otel_env_config.base_endpoint
+
+ # If the protocol env var isn't set, then it will be None,
+ # and it will default to an http/protobuf exporter.
+ if env_endpoint and env_exporter_protocol == "grpc":
+ from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import
OTLPMetricExporter
+ from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import
OTLPSpanExporter
+ else:
+ from opentelemetry.exporter.otlp.proto.http.metric_exporter import
OTLPMetricExporter
+ from opentelemetry.exporter.otlp.proto.http.trace_exporter import
OTLPSpanExporter
+
+ if env_endpoint:
+ if host is not None and port is not None:
+ log.warning(
+ "Both the standard OpenTelemetry environment variables and "
+ "the Airflow OpenTelemetry configs have been provided. "
+ "Using the OpenTelemetry environment variables. "
+ "The Airflow configs have been deprecated and will be removed
in the future."
+ )
+
+ endpoint_str = env_endpoint
+ # The SDK will pick up all the values from the environment.
+ if otel_env_config.data_type == OtelDataType.TRACES:
+ exporter = OTLPSpanExporter()
+ else:
+ exporter = OTLPMetricExporter()
+ else:
+ if host is None or port is None:
+ # Since the configs have been deprecated, host and port could be
None.
+ # Log a warning to steer the user towards configuring the
environment variables
+ # and deliberately let it fail here without providing fallbacks.
+ log.warning(
+ "OpenTelemetry %s have been enabled but the endpoint settings
haven't been configured. "
+ "The Airflow configs have been deprecated and will be removed
in the future. "
+ "Configure the standard OpenTelemetry environment variables
instead. "
+ "For more info, check the docs.",
+ otel_env_config.data_type.value,
+ )
+ else:
+ log.warning(
+ "The Airflow OpenTelemetry configs have been deprecated and
will be removed in the future. "
+ "OpenTelemetry is advised to be configured using the standard
environment variables. "
+ "For more info, check the docs."
+ )
+ # If the environment endpoint isn't set, then assume that the airflow
config is used
+ # where protocol isn't specified, and it's always http/protobuf.
+ # In that case it should default to the full 'url_path' and set it
directly.
+
+ endpoint_suffix = "traces" if otel_env_config.data_type ==
OtelDataType.TRACES else "metrics"
+
+ endpoint_str = f"{protocol}://{host}:{port}/v1/{endpoint_suffix}"
+ if otel_env_config.data_type == OtelDataType.TRACES:
+ exporter = OTLPSpanExporter(endpoint=endpoint_str)
+ else:
+ exporter = OTLPMetricExporter(endpoint=endpoint_str)
+
+ exporter_name = (
+ "OTLPSpanExporter" if otel_env_config.data_type == OtelDataType.TRACES
else "OTLPMetricExporter"
+ )
+
+ log.info("[%s] Connecting to OpenTelemetry Collector at %s",
exporter_name, endpoint_str)
+
+ return exporter
diff --git
a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
index 2db61862db9..4b5174a3bf3 100644
---
a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
+++
b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
@@ -19,18 +19,21 @@ from __future__ import annotations
import atexit
import datetime
import logging
-import os
import random
import warnings
from collections.abc import Callable
from typing import TYPE_CHECKING
from opentelemetry import metrics
-from opentelemetry.exporter.otlp.proto.http.metric_exporter import
OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
-from opentelemetry.sdk.metrics._internal.export import ConsoleMetricExporter,
PeriodicExportingMetricReader
+from opentelemetry.sdk.metrics._internal.export import (
+ ConsoleMetricExporter,
+ PeriodicExportingMetricReader,
+)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
+from ..common import get_otel_data_exporter
+from ..otel_env_config import load_metrics_env_config
from .protocols import Timer
from .validators import (
OTEL_NAME_MAX_LENGTH,
@@ -397,28 +400,32 @@ def get_otel_logger(
stat_name_handler: Callable[[str], str] | None = None,
statsd_influxdb_enabled: bool = False,
) -> SafeOtelLogger:
- effective_service_name: str = service_name or "airflow"
+ otel_env_config = load_metrics_env_config()
+
+ effective_service_name: str = otel_env_config.service_name or service_name
or "airflow"
effective_prefix: str = prefix or DEFAULT_METRIC_NAME_PREFIX
resource = Resource.create(attributes={SERVICE_NAME:
effective_service_name})
- protocol = "https" if ssl_active else "http"
- # Allow transparent support for standard OpenTelemetry SDK environment
variables.
- #
https://opentelemetry.io/docs/specs/otel/protocol/exporter/#configuration-options
- endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT",
f"{protocol}://{host}:{port}")
- metrics_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT",
f"{endpoint}/v1/metrics")
+
#
https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#periodic-exporting-metricreader
- if interval := os.environ.get("OTEL_METRIC_EXPORT_INTERVAL",
conf_interval):
- interval = float(interval)
- else:
- # If the env variable is an empty string.
- interval = None
- log.info("[Metric Exporter] Connecting to OpenTelemetry Collector at %s",
endpoint)
+ interval = otel_env_config.interval_ms or conf_interval
+
+ metric_exporter = get_otel_data_exporter(
+ otel_env_config=otel_env_config,
+ host=host,
+ port=port,
+ ssl_active=ssl_active,
+ )
+
readers = [
PeriodicExportingMetricReader(
- OTLPMetricExporter(endpoint=metrics_endpoint),
+ exporter=metric_exporter,
export_interval_millis=interval, # type: ignore[arg-type]
)
]
+ if otel_env_config.exporter:
+ debug = otel_env_config.exporter == "console"
+
if debug:
export_to_console = PeriodicExportingMetricReader(
ConsoleMetricExporter(),
diff --git
a/shared/observability/src/airflow_shared/observability/otel_env_config.py
b/shared/observability/src/airflow_shared/observability/otel_env_config.py
new file mode 100644
index 00000000000..61f2cf2e1f7
--- /dev/null
+++ b/shared/observability/src/airflow_shared/observability/otel_env_config.py
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+from dataclasses import dataclass
+from enum import Enum
+
+import structlog
+
+log = structlog.getLogger(__name__)
+
+
+def _parse_kv_str_to_dict(str_var: str | None) -> dict[str, str]:
+ """
+ Convert a string of key-value pairs to a dictionary.
+
+ Environment variables like 'OTEL_RESOURCE_ATTRIBUTES' or
'OTEL_EXPORTER_OTLP_HEADERS'
+ accept values with the format "key1=value1,key2=value2,..."
+ """
+ configs = {}
+ if str_var:
+ for pair in str_var.split(","):
+ if "=" in pair:
+ k, v = pair.split("=", 1)
+ configs[k.strip()] = v.strip()
+ return configs
+
+
+class OtelDataType(str, Enum):
+ """Enum with the different telemetry data types."""
+
+ TRACES = "traces"
+ METRICS = "metrics"
+
+
+@dataclass(frozen=True)
+class OtelEnvConfig:
+ """Immutable class for holding OTel config environment variables."""
+
+ data_type: OtelDataType # traces | metrics
+ base_endpoint: str | None # base url
+ type_specific_endpoint: str | None # traces | metrics specific url
+ exporter_protocol: str | None # "grpc" | "http/protobuf"
+ type_specific_exporter_protocol: str | None # traces | metrics specific
protocol
+ exporter: str | None # OTEL_TRACES_EXPORTER | OTEL_METRICS_EXPORTER
+ service_name: str | None
+ headers_kv_str: str | None
+ headers: dict[str, str]
+ resource_attributes_kv_str: str | None
+ resource_attributes: dict[str, str]
+ interval_ms: float | None
+
+
+def load_otel_env_config(data_type: OtelDataType) -> OtelEnvConfig:
+ """Read OTel config env vars and return an OtelEnvConfig object."""
+ exporter_protocol = os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL")
+ service_name = os.getenv("OTEL_SERVICE_NAME")
+ headers_kv_str = os.getenv("OTEL_EXPORTER_OTLP_HEADERS")
+ resource_attributes_kv_str = os.getenv("OTEL_RESOURCE_ATTRIBUTES")
+ base_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
+
+ if data_type == OtelDataType.TRACES:
+ type_specific_endpoint =
os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT")
+ type_specific_exporter_protocol =
os.getenv("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL")
+ exporter = os.getenv("OTEL_TRACES_EXPORTER")
+ interval_ms = None
+ else:
+ type_specific_endpoint =
os.getenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT")
+ type_specific_exporter_protocol =
os.getenv("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL")
+ exporter = os.getenv("OTEL_METRICS_EXPORTER")
+ # Instead of directly providing a default value of float,
+ # use a value of str and convert to float to get rid of a static-code
check error.
+ interval = os.getenv("OTEL_METRIC_EXPORT_INTERVAL")
+ interval_ms = float(interval) if interval else None
+
+ return OtelEnvConfig(
+ data_type=data_type,
+ base_endpoint=base_endpoint,
+ type_specific_endpoint=type_specific_endpoint,
+ exporter_protocol=exporter_protocol,
+ type_specific_exporter_protocol=type_specific_exporter_protocol,
+ exporter=exporter,
+ service_name=service_name,
+ headers_kv_str=headers_kv_str,
+ headers=_parse_kv_str_to_dict(headers_kv_str),
+ resource_attributes_kv_str=resource_attributes_kv_str,
+ resource_attributes=_parse_kv_str_to_dict(resource_attributes_kv_str),
+ interval_ms=interval_ms,
+ )
+
+
+def load_traces_env_config() -> OtelEnvConfig:
+ return load_otel_env_config(OtelDataType.TRACES)
+
+
+def load_metrics_env_config() -> OtelEnvConfig:
+ return load_otel_env_config(OtelDataType.METRICS)
diff --git
a/shared/observability/src/airflow_shared/observability/traces/otel_tracer.py
b/shared/observability/src/airflow_shared/observability/traces/otel_tracer.py
index b8fb3f9c9c4..1e9533fdbe8 100644
---
a/shared/observability/src/airflow_shared/observability/traces/otel_tracer.py
+++
b/shared/observability/src/airflow_shared/observability/traces/otel_tracer.py
@@ -18,7 +18,6 @@
from __future__ import annotations
import logging
-import os
import random
from contextlib import AbstractContextManager
from typing import TYPE_CHECKING
@@ -26,15 +25,21 @@ from typing import TYPE_CHECKING
import pendulum
from opentelemetry import trace
from opentelemetry.context import attach, create_key
-from opentelemetry.exporter.otlp.proto.http.trace_exporter import
OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import Span, SpanProcessor, Tracer as
OpenTelemetryTracer, TracerProvider
-from opentelemetry.sdk.trace.export import BatchSpanProcessor,
ConsoleSpanExporter, SimpleSpanProcessor
+from opentelemetry.sdk.trace.export import (
+ BatchSpanProcessor,
+ ConsoleSpanExporter,
+ SimpleSpanProcessor,
+ SpanExporter,
+)
from opentelemetry.sdk.trace.id_generator import IdGenerator
from opentelemetry.trace import Link, NonRecordingSpan, SpanContext,
TraceFlags, Tracer
from opentelemetry.trace.propagation.tracecontext import
TraceContextTextMapPropagator
from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
+from ..common import get_otel_data_exporter
+from ..otel_env_config import load_traces_env_config
from .utils import (
datetime_to_nano,
parse_traceparent,
@@ -58,7 +63,7 @@ class OtelTrace:
def __init__(
self,
- span_exporter: OTLPSpanExporter,
+ span_exporter: SpanExporter,
use_simple_processor: bool,
tag_string: str | None = None,
otel_service: str | None = None,
@@ -142,9 +147,9 @@ class OtelTrace:
links=None,
start_time=None,
):
- """Start a span; if service_name is not given, otel_service is used."""
- if component is None:
- component = self.otel_service
+ """Start a span."""
+ # Common practice is to use the module name.
+ component = component or __name__
trace_id = self.get_current_span().get_span_context().trace_id
tracer = self.get_tracer(component=component, trace_id=trace_id,
span_id=span_id)
@@ -253,8 +258,8 @@ class OtelTrace:
start_time=None,
start_as_current: bool = True,
) -> AbstractContextManager[trace.span.Span] | trace.span.Span:
- if component is None:
- component = self.otel_service
+ # Common practice is to use the module name.
+ component = component or __name__
tracer = self.get_tracer(component=component)
@@ -339,18 +344,26 @@ def get_otel_tracer(
otel_service: str | None = None,
debug: bool = False,
) -> OtelTrace:
- """Get OTEL tracer from airflow configuration."""
+ """Get OTEL tracer from the regular OTel env variables or the airflow
configuration."""
+ otel_env_config = load_traces_env_config()
+
tag_string = cls.get_constant_tags()
- protocol = "https" if ssl_active else "http"
- # Allow transparent support for standard OpenTelemetry SDK environment
variables.
- #
https://opentelemetry.io/docs/specs/otel/protocol/exporter/#configuration-options
- endpoint = os.environ.get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT",
f"{protocol}://{host}:{port}/v1/traces")
+ exporter = get_otel_data_exporter(
+ otel_env_config=otel_env_config,
+ host=host,
+ port=port,
+ ssl_active=ssl_active,
+ )
+
+ otel_service = otel_env_config.service_name or otel_service
+
+ if otel_env_config.exporter:
+ debug = otel_env_config.exporter == "console"
- log.info("[OTLPSpanExporter] Connecting to OpenTelemetry Collector at %s",
endpoint)
log.info("Should use simple processor: %s", use_simple_processor)
return OtelTrace(
- span_exporter=OTLPSpanExporter(endpoint=endpoint),
+ span_exporter=exporter,
use_simple_processor=use_simple_processor,
tag_string=tag_string,
otel_service=otel_service,
diff --git
a/shared/observability/tests/observability/metrics/test_otel_logger.py
b/shared/observability/tests/observability/metrics/test_otel_logger.py
index e09ad9a1f4f..c27c3729969 100644
--- a/shared/observability/tests/observability/metrics/test_otel_logger.py
+++ b/shared/observability/tests/observability/metrics/test_otel_logger.py
@@ -26,6 +26,7 @@ from unittest import mock
import pytest
from opentelemetry.metrics import MeterProvider
+from airflow_shared.observability.common import get_otel_data_exporter
from airflow_shared.observability.exceptions import InvalidStatsNameException
from airflow_shared.observability.metrics.otel_logger import (
OTEL_NAME_MAX_LENGTH,
@@ -41,6 +42,9 @@ from airflow_shared.observability.metrics.validators import (
BACK_COMPAT_METRIC_NAMES,
MetricNameLengthExemptionWarning,
)
+from airflow_shared.observability.otel_env_config import
load_metrics_env_config
+
+from tests_common.test_utils.config import env_vars
INVALID_STAT_NAME_CASES = [
(None, "can not be None"),
@@ -312,6 +316,105 @@ class TestOtelMetrics:
assert mock_time.call_count == 2
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
+ @pytest.mark.parametrize(
+ (
+ "provided_env_vars",
+ "airflow_conf_host",
+ "airflow_conf_port",
+ "expected_endpoint",
+ "expected_exporter_module",
+ ),
+ [
+ pytest.param(
+ {
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:1234",
+ "OTEL_EXPORTER_OTLP_PROTOCOL": "grpc",
+ },
+ "breeze-otel-collector",
+ "4318",
+ "localhost:1234",
+ "grpc",
+ id="env_vars_with_grpc",
+ ),
+ pytest.param(
+ {
+ "OTEL_EXPORTER_OTLP_PROTOCOL": "grpc",
+ },
+ "breeze-otel-collector",
+ "4318",
+ "http://breeze-otel-collector:4318/v1/metrics",
+ "http",
+ id="protocol_is_ignored_if_no_env_endpoint",
+ ),
+ pytest.param(
+ {
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:1234",
+ "OTEL_EXPORTER_OTLP_PROTOCOL": "http/protobuf",
+ },
+ "breeze-otel-collector",
+ "4318",
+ "http://localhost:1234/v1/metrics",
+ "http",
+ id="for_http_with_env_vars_otel_builds_full_url",
+ ),
+ pytest.param(
+ {},
+ "breeze-otel-collector",
+ "4318",
+ "http://breeze-otel-collector:4318/v1/metrics",
+ "http",
+ id="use_airflow_config",
+ ),
+ pytest.param(
+ {
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:1234",
+ "OTEL_EXPORTER_OTLP_PROTOCOL": "http/protobuf",
+ },
+ None,
+ None,
+ "http://localhost:1234/v1/metrics",
+ "http",
+ id="only_env_vars",
+ ),
+ pytest.param(
+ {
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:1234",
+ "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT":
"http://localhost:2222",
+ "OTEL_EXPORTER_OTLP_PROTOCOL": "http/protobuf",
+ "OTEL_EXPORTER_OTLP_METRICS_PROTOCOL": "grpc",
+ },
+ None,
+ None,
+ "localhost:2222",
+ "grpc",
+ id="type_specific_vars_take_precedence",
+ ),
+ ],
+ )
+ def test_config_priorities(
+ self,
+ provided_env_vars,
+ airflow_conf_host,
+ airflow_conf_port,
+ expected_endpoint,
+ expected_exporter_module,
+ ):
+ with env_vars(provided_env_vars):
+ otel_env_config = load_metrics_env_config()
+
+ otel_metric_exporter = get_otel_data_exporter(
+ otel_env_config=otel_env_config,
+ host=airflow_conf_host,
+ port=airflow_conf_port,
+ )
+
+ assert otel_metric_exporter._endpoint == expected_endpoint
+
+ assert (
+ otel_metric_exporter.__class__.__module__
+ ==
f"opentelemetry.exporter.otlp.proto.{expected_exporter_module}.metric_exporter"
+ )
+
def test_atexit_flush_on_process_exit(self):
"""
Run a process that initializes a logger, creates a stat and then exits.
diff --git a/task-sdk/src/airflow/sdk/observability/metrics/otel_logger.py
b/task-sdk/src/airflow/sdk/observability/metrics/otel_logger.py
index 2e3de7abd96..aa3589ec9d5 100644
--- a/task-sdk/src/airflow/sdk/observability/metrics/otel_logger.py
+++ b/task-sdk/src/airflow/sdk/observability/metrics/otel_logger.py
@@ -26,17 +26,28 @@ if TYPE_CHECKING:
def get_otel_logger(cls) -> SafeOtelLogger:
+ # The config values have been deprecated and therefore,
+ # if the user hasn't added them to the config, the default values won't be
used.
+ # A fallback is needed to avoid an exception.
+ port = None
+ if conf.has_option("metrics", "otel_port"):
+ port = conf.getint("metrics", "otel_port")
+
+ conf_interval = None
+ if conf.has_option("metrics", "otel_interval_milliseconds"):
+ conf_interval = conf.getfloat("metrics", "otel_interval_milliseconds")
+
return otel_logger.get_otel_logger(
- host=conf.get("metrics", "otel_host"), # ex: "breeze-otel-collector"
- port=conf.getint("metrics", "otel_port"), # ex: 4318
- prefix=conf.get("metrics", "otel_prefix"), # ex: "airflow"
- ssl_active=conf.getboolean("metrics", "otel_ssl_active"),
+ host=conf.get("metrics", "otel_host", fallback=None), # ex:
"breeze-otel-collector"
+ port=port, # ex: 4318
+ prefix=conf.get("metrics", "otel_prefix", fallback=None), # ex:
"airflow"
+ ssl_active=conf.getboolean("metrics", "otel_ssl_active",
fallback=False),
# PeriodicExportingMetricReader will default to an interval of 60000
millis.
- conf_interval=conf.getfloat("metrics", "otel_interval_milliseconds",
fallback=None), # ex: 30000
- debug=conf.getboolean("metrics", "otel_debugging_on"),
- service_name=conf.get("metrics", "otel_service"),
+ conf_interval=conf_interval, # ex: 30000
+ debug=conf.getboolean("metrics", "otel_debugging_on", fallback=False),
+ service_name=conf.get("metrics", "otel_service", fallback=None),
metrics_allow_list=conf.get("metrics", "metrics_allow_list",
fallback=None),
metrics_block_list=conf.get("metrics", "metrics_block_list",
fallback=None),
- stat_name_handler=conf.getimport("metrics", "stat_name_handler"),
+ stat_name_handler=conf.getimport("metrics", "stat_name_handler",
fallback=None),
statsd_influxdb_enabled=conf.getboolean("metrics",
"statsd_influxdb_enabled", fallback=False),
)
diff --git a/task-sdk/src/airflow/sdk/observability/traces/otel_tracer.py
b/task-sdk/src/airflow/sdk/observability/traces/otel_tracer.py
index ef5a3542c62..2828fce21ca 100644
--- a/task-sdk/src/airflow/sdk/observability/traces/otel_tracer.py
+++ b/task-sdk/src/airflow/sdk/observability/traces/otel_tracer.py
@@ -27,14 +27,21 @@ if TYPE_CHECKING:
def get_otel_tracer(cls, use_simple_processor: bool = False) -> OtelTrace:
+ # The config values have been deprecated and therefore,
+ # if the user hasn't added them to the config, the default values won't be
used.
+ # A fallback is needed to avoid an exception.
+ port = None
+ if conf.has_option("traces", "otel_port"):
+ port = conf.getint("traces", "otel_port")
+
return otel_tracer.get_otel_tracer(
cls,
use_simple_processor,
- host=conf.get("traces", "otel_host"),
- port=conf.getint("traces", "otel_port"),
- ssl_active=conf.getboolean("traces", "otel_ssl_active"),
- otel_service=conf.get("traces", "otel_service"),
- debug=conf.getboolean("traces", "otel_debugging_on"),
+ host=conf.get("traces", "otel_host", fallback=None),
+ port=port,
+ ssl_active=conf.getboolean("traces", "otel_ssl_active",
fallback=False),
+ otel_service=conf.get("traces", "otel_service", fallback=None),
+ debug=conf.getboolean("traces", "otel_debugging_on", fallback=False),
)