This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new bb003741ee7 [v3-1-test] Flush in-memory OTel metrics at process shutdown (#61808) (#61869)
bb003741ee7 is described below

commit bb003741ee7bef6c903909bffb0b88e06b9ff59b
Author: Christos Bisias <[email protected]>
AuthorDate: Tue Feb 17 03:05:55 2026 +0200

    [v3-1-test] Flush in-memory OTel metrics at process shutdown (#61808) (#61869)
    
    * manually backport and resolve conflicts
    
    * fix mypy-airflow-core error
    
    * make package unit.core importable for the test subprocess
---
 airflow-core/src/airflow/metrics/otel_logger.py    |  13 +++
 airflow-core/src/airflow/stats.py                  |  29 ++++++
 airflow-core/tests/integration/otel/test_otel.py   | 104 +++++++++++++++++++++
 airflow-core/tests/unit/core/test_otel_logger.py   |  55 +++++++++++
 .../src/tests_common/test_utils/otel_utils.py      |  68 ++++++++++----
 5 files changed, 252 insertions(+), 17 deletions(-)

diff --git a/airflow-core/src/airflow/metrics/otel_logger.py b/airflow-core/src/airflow/metrics/otel_logger.py
index 317b70a5ca2..6b75428529c 100644
--- a/airflow-core/src/airflow/metrics/otel_logger.py
+++ b/airflow-core/src/airflow/metrics/otel_logger.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import atexit
 import datetime
 import logging
 import random
@@ -370,6 +371,15 @@ class MetricsMap:
         self.map[key].set_value(value, delta)
 
 
+def flush_otel_metrics():
+    provider = metrics.get_meter_provider()
+    provider.force_flush()
+
+
+def atexit_register_metrics_flush():
+    atexit.register(flush_otel_metrics)
+
+
 def get_otel_logger(cls) -> SafeOtelLogger:
     host = conf.get("metrics", "otel_host")  # ex: "breeze-otel-collector"
     port = conf.getint("metrics", "otel_port")  # ex: 4318
@@ -405,4 +415,7 @@ def get_otel_logger(cls) -> SafeOtelLogger:
         ),
     )
 
+    # Register a hook that flushes any in-memory metrics at shutdown.
+    atexit_register_metrics_flush()
+
     return SafeOtelLogger(metrics.get_meter_provider(), prefix, get_validator())
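
For context: the hook above pairs the standard-library atexit module with the
OTel SDK's force_flush(). A minimal, self-contained sketch of the same pattern
(the console exporter setup and the "demo_counter" name are illustrative
assumptions, not part of this commit):

    import atexit

    from opentelemetry import metrics
    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.export import (
        ConsoleMetricExporter,
        PeriodicExportingMetricReader,
    )

    # Use a long export interval so the periodic reader will not fire during
    # the short lifetime of this process.
    reader = PeriodicExportingMetricReader(
        ConsoleMetricExporter(), export_interval_millis=60_000
    )
    metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))

    # Mirror the commit: flush anything still buffered at interpreter exit.
    atexit.register(metrics.get_meter_provider().force_flush)

    counter = metrics.get_meter(__name__).create_counter("demo_counter")
    counter.add(1)  # exported by the atexit flush, not the periodic reader
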
diff --git a/airflow-core/src/airflow/stats.py b/airflow-core/src/airflow/stats.py
index 6cb9229ab73..50d46231fd2 100644
--- a/airflow-core/src/airflow/stats.py
+++ b/airflow-core/src/airflow/stats.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import logging
+import os
 import socket
 from collections.abc import Callable
 from typing import TYPE_CHECKING
@@ -34,14 +35,42 @@ log = logging.getLogger(__name__)
 class _Stats(type):
     factory: Callable
     instance: StatsLogger | NoStatsLogger | None = None
+    _instance_pid: int | None = None
 
     def __getattr__(cls, name: str) -> str:
+        # When using OpenTelemetry, some subprocesses are short-lived and
+        # often exit before flushing any metrics.
+        #
+        # The solution is to register a hook that performs a force flush at exit.
+        # The atexit hook is registered when initializing the instance.
+        #
+        # The instance gets initialized once per process. If a process is forked,
+        # the new subprocess will inherit the already initialized instance of
+        # the parent process.
+        #
+        # Store the instance pid so that it can be compared with the current pid
+        # to decide whether to initialize the instance again or not.
+        #
+        # So far, all forks reset their state to remove anything inherited from
+        # the parent. But in the future that might not always be true.
+        current_pid = os.getpid()
+        if cls.instance and cls._instance_pid != current_pid:
+            log.info(
+                "Stats instance was created in PID %s but accessed in PID %s. Re-initializing.",
+                cls._instance_pid,
+                current_pid,
+            )
+            # Setting the instance to None will force re-initialization.
+            cls.instance = None
+            cls._instance_pid = None
+
         if not cls.instance:
             try:
                 cls.instance = cls.factory()
+                cls._instance_pid = current_pid
             except (socket.gaierror, ImportError) as e:
                 log.error("Could not configure StatsClient: %s, using 
NoStatsLogger instead.", e)
                 cls.instance = NoStatsLogger()
+                cls._instance_pid = current_pid
         return getattr(cls.instance, name)
 
     def __init__(cls, *args, **kwargs) -> None:
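
The PID guard above is easiest to see in isolation. A hedged, Unix-only sketch
of the same re-initialization logic (LazyStats and its factory stand-in are
hypothetical names, not Airflow code):

    import os

    class LazyStats:
        # Hypothetical stand-in for the _Stats metaclass logic in stats.py.
        instance = None
        _instance_pid = None

        @classmethod
        def get(cls):
            current_pid = os.getpid()
            # A forked child inherits `instance` from its parent, but any
            # atexit flush hook was registered during the parent's init.
            if cls.instance is not None and cls._instance_pid != current_pid:
                cls.instance = None  # force re-initialization in this process
            if cls.instance is None:
                cls.instance = object()  # stands in for cls.factory()
                cls._instance_pid = current_pid
                print(f"initialized in pid {current_pid}")
            return cls.instance

    LazyStats.get()      # parent: initializes
    LazyStats.get()      # parent: cached, nothing printed
    if os.fork() == 0:   # child inherits the parent's instance...
        LazyStats.get()  # ...but the PID check forces a fresh init
        os._exit(0)
    os.wait()
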
diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py
index 51f70c51291..944f350c6e1 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -45,6 +45,7 @@ from tests_common.test_utils.otel_utils import (
     assert_span_name_belongs_to_root_span,
     assert_span_not_in_children_spans,
     dump_airflow_metadata_db,
+    extract_metrics_from_output,
     extract_spans_from_output,
     get_parent_child_dict,
 )
@@ -193,6 +194,15 @@ def check_ti_state_and_span_status(task_id: str, run_id: str, state: str, span_s
             )
 
 
+def check_metrics_exist(output: str, metrics_to_check: list[str]):
+    # Get a list of lines from the captured output.
+    output_lines = output.splitlines()
+
+    metrics_dict = extract_metrics_from_output(output_lines)
+
+    assert set(metrics_to_check).issubset(metrics_dict.keys())
+
+
 def check_spans_with_continuance(output: str, dag: DAG, continuance_for_t1: bool = True):
     # Get a list of lines from the captured output.
     output_lines = output.splitlines()
@@ -769,6 +779,100 @@ class TestOtelIntegration:
         except Exception as ex:
             log.error("Could not delete leftover control file '%s', error: 
'%s'.", self.control_file, ex)
 
+    def dag_execution_for_testing_metrics(self, capfd):
+        # Metrics.
+        os.environ["AIRFLOW__METRICS__OTEL_ON"] = "True"
+        os.environ["AIRFLOW__METRICS__OTEL_HOST"] = "breeze-otel-collector"
+        os.environ["AIRFLOW__METRICS__OTEL_PORT"] = "4318"
+        os.environ["AIRFLOW__METRICS__OTEL_INTERVAL_MILLISECONDS"] = "5000"
+
+        if self.use_otel != "true":
+            os.environ["AIRFLOW__METRICS__OTEL_DEBUGGING_ON"] = "True"
+
+        celery_worker_process = None
+        scheduler_process = None
+        apiserver_process = None
+        try:
+            # Start the processes here and not as fixtures or in a common setup,
+            # so that the test can capture their output.
+            celery_worker_process, scheduler_process, apiserver_process = self.start_worker_and_scheduler1()
+
+            dag_id = "otel_test_dag"
+
+            assert len(self.dags) > 0
+            dag = self.dags[dag_id]
+
+            assert dag is not None
+
+            run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
+
+            # Skip the span_status check.
+            wait_for_dag_run_and_check_span_status(
+                dag_id=dag_id, run_id=run_id, max_wait_time=90, span_status=None
+            )
+
+            # The ti span_status is updated while processing the executor events,
+            # which is after the dag_run state has been updated.
+            time.sleep(10)
+
+            task_dict = dag.task_dict
+            task_dict_ids = task_dict.keys()
+
+            for task_id in task_dict_ids:
+                # Skip the span_status check.
+                check_ti_state_and_span_status(
+                    task_id=task_id, run_id=run_id, state=State.SUCCESS, span_status=None
+                )
+
+            print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
+        finally:
+            # Terminate the processes.
+            celery_worker_process.terminate()
+            celery_worker_process.wait()
+
+            celery_status = celery_worker_process.poll()
+            assert celery_status is not None, (
+                "The celery worker process status is None, which means that it 
hasn't terminated as expected."
+            )
+
+            scheduler_process.terminate()
+            scheduler_process.wait()
+
+            scheduler_status = scheduler_process.poll()
+            assert scheduler_status is not None, (
+                "The scheduler_1 process status is None, which means that it 
hasn't terminated as expected."
+            )
+
+            apiserver_process.terminate()
+            apiserver_process.wait()
+
+            apiserver_status = apiserver_process.poll()
+            assert apiserver_status is not None, (
+                "The apiserver process status is None, which means that it 
hasn't terminated as expected."
+            )
+
+        out, err = capfd.readouterr()
+        log.info("out-start --\n%s\n-- out-end", out)
+        log.info("err-start --\n%s\n-- err-end", err)
+
+        return out, dag
+
+    def test_export_metrics_during_process_shutdown(
+        self, monkeypatch, celery_worker_env_vars, capfd, session
+    ):
+        out, dag = self.dag_execution_for_testing_metrics(capfd)
+
+        if self.use_otel != "true":
+            # Test the metrics from the output.
+            metrics_to_check = [
+                "airflow.ti_successes",
+                "airflow.operator_successes",
+                "airflow.executor.running_tasks",
+                "airflow.executor.queued_tasks",
+                "airflow.executor.open_slots",
+            ]
+            check_metrics_exist(output=out, metrics_to_check=metrics_to_check)
+
     @pytest.mark.execution_timeout(90)
     def test_dag_execution_succeeds(self, monkeypatch, celery_worker_env_vars, capfd, session):
         """The same scheduler will start and finish the dag processing."""
diff --git a/airflow-core/tests/unit/core/test_otel_logger.py b/airflow-core/tests/unit/core/test_otel_logger.py
index 43d30ff9d07..db08fbd204e 100644
--- a/airflow-core/tests/unit/core/test_otel_logger.py
+++ b/airflow-core/tests/unit/core/test_otel_logger.py
@@ -17,6 +17,10 @@
 from __future__ import annotations
 
 import logging
+import os
+import pathlib
+import subprocess
+import sys
 import time
 from unittest import mock
 
@@ -32,8 +36,12 @@ from airflow.metrics.otel_logger import (
     _generate_key_name,
     _is_up_down_counter,
     full_name,
+    get_otel_logger,
 )
 from airflow.metrics.validators import BACK_COMPAT_METRIC_NAMES, MetricNameLengthExemptionWarning
+from airflow.stats import Stats
+
+from tests_common.test_utils.config import conf_vars
 
 INVALID_STAT_NAME_CASES = [
     (None, "can not be None"),
@@ -302,3 +310,50 @@ class TestOtelMetrics:
         assert timer.duration == expected_value
         assert mock_time.call_count == 2
         self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
+
+    def test_atexit_flush_on_process_exit(self):
+        """
+        Run a process that initializes a logger, creates a stat and then exits.
+
+        The logger initialization registers an atexit hook.
+        Test that the hook runs and flushes the created stat at shutdown.
+        """
+        test_module_name = "unit.core.test_otel_logger"
+        function_call_str = f"import {test_module_name} as m; 
m.mock_service_run()"
+
+        # pytest adds 'airflow-core/tests' to the path and makes the package 'unit.core' importable.
+        # The subprocess doesn't inherit it, and in order to make the package importable,
+        # the current tests directory needs to be injected into the 'PYTHONPATH'.
+        #
+        # Get 'airflow-core/tests' and add it to the env copy that is passed to the subprocess.
+        tests_dir = str(pathlib.Path(__file__).resolve().parents[2])
+        current_env = os.environ.copy()
+        current_env["PYTHONPATH"] = tests_dir + os.pathsep + 
current_env.get("PYTHONPATH", "")
+
+        proc = subprocess.run(
+            [sys.executable, "-c", function_call_str],
+            check=False,
+            env=current_env,
+            capture_output=True,
+            text=True,
+            timeout=20,
+        )
+
+        assert proc.returncode == 0, f"Process failed\nstdout:\n{proc.stdout}\nstderr:\n{proc.stderr}"
+
+        assert "my_test_stat" in proc.stdout, (
+            "Expected the metric name to be present in the stdout but it 
wasn't.\n"
+            f"stdout:\n{proc.stdout}\n"
+            f"stderr:\n{proc.stderr}"
+        )
+
+
+def mock_service_run():
+    with conf_vars(
+        {
+            ("metrics", "otel_on"): "True",
+            ("metrics", "otel_debugging_on"): "True",
+        }
+    ):
+        logger = get_otel_logger(Stats)
+        logger.incr("my_test_stat")
diff --git a/devel-common/src/tests_common/test_utils/otel_utils.py b/devel-common/src/tests_common/test_utils/otel_utils.py
index 3e8e99ac5bf..a2f53b47381 100644
--- a/devel-common/src/tests_common/test_utils/otel_utils.py
+++ b/devel-common/src/tests_common/test_utils/otel_utils.py
@@ -19,6 +19,8 @@ from __future__ import annotations
 import json
 import logging
 import pprint
+from collections import defaultdict
+from typing import Literal
 
 from sqlalchemy import inspect
 
@@ -94,14 +96,21 @@ def clean_task_lines(lines: list) -> list:
     return cleaned_lines
 
 
-def extract_spans_from_output(output_lines: list):
+def _extract_obj_from_output(output_lines: list[str], kind: Literal["spans"] | Literal["metrics"]):
     """
-    For a given list of ConsoleSpanExporter output lines, it extracts the json spans and creates two dictionaries.
+    Used to extract spans or metrics from the output.
 
-    :return: root spans dict (key: root_span_id - value: root_span), spans dict (key: span_id - value: span)
+    Parameters
+    ----------
+    :param output_lines: The captured stdout split into lines.
+    :param kind: Which json type to extract from the output.
     """
+    assert kind in ("spans", "metrics")
+
     span_dict = {}
     root_span_dict = {}
+    metric_dict: dict[str, list[dict]] = defaultdict(list)
+
     total_lines = len(output_lines)
     index = 0
     output_lines = clean_task_lines(output_lines)
@@ -133,23 +142,48 @@ def extract_spans_from_output(output_lines: list):
             # Create a formatted json string and then convert the string to a python dict.
             json_str = "\n".join(json_lines)
             try:
-                span = json.loads(json_str)
-                span_id = span["context"]["span_id"]
-                span_dict[span_id] = span
-
-                if span["parent_id"] is None:
-                    # This is a root span, add it to the root_span_map as well.
-                    root_span_id = span["context"]["span_id"]
-                    root_span_dict[root_span_id] = span
-
-            except json.JSONDecodeError as e:
-                log.error("Failed to parse JSON span: %s", e)
-                log.error("Failed JSON string:")
-                log.error(json_str)
+                obj = json.loads(json_str)
+            except json.JSONDecodeError:
+                log.error("Failed to parse JSON: %s", json_str)
+                index += 1
+                continue
+
+            if kind == "spans":
+                if "context" not in obj or "resource_metrics" in obj:
+                    index += 1
+                    continue
+                span_id = obj["context"]["span_id"]
+                span_dict[span_id] = obj
+                if obj["parent_id"] is None:
+                    root_span_dict[span_id] = obj
+            else:  # kind == "metrics"
+                if "resource_metrics" not in obj:
+                    index += 1
+                    continue
+                for res in obj["resource_metrics"]:
+                    for scope in res.get("scope_metrics", []):
+                        for metric in scope.get("metrics", []):
+                            metric_dict[metric["name"]].append(metric)
+
         else:
             index += 1
 
-    return root_span_dict, span_dict
+    return (root_span_dict, span_dict) if kind == "spans" else metric_dict
+
+
+def extract_spans_from_output(output_lines: list):
+    """
+    For a given list of output lines, it extracts the json spans and creates two dictionaries.
+
+    :return: root spans dict (key: root_span_id - value: root_span), spans dict (key: span_id - value: span)
+    """
+    return _extract_obj_from_output(output_lines, "spans")
+
+
+def extract_metrics_from_output(output_lines: list):
+    """For a given list of output lines, it extracts the json metrics and 
creates a dictionary."""
+
+    return _extract_obj_from_output(output_lines, "metrics")
 
 
 def get_id_for_a_given_name(span_dict: dict, span_name: str):
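
Finally, a hedged usage sketch for the new extract_metrics_from_output helper.
It assumes the captured output contains pretty-printed OTLP JSON as emitted by
the SDK's console metric exporter; the payload below is trimmed to the fields
the parser reads, and whether a given capture parses depends on the exporter's
exact line layout:

    from tests_common.test_utils.otel_utils import extract_metrics_from_output

    # Trimmed illustration of one console-exporter export; real output also
    # carries resource attributes, data points, timestamps, etc.
    captured = """\
    {
        "resource_metrics": [
            {
                "scope_metrics": [
                    {
                        "metrics": [
                            {"name": "airflow.ti_successes", "data": {}}
                        ]
                    }
                ]
            }
        ]
    }
    """

    metrics_dict = extract_metrics_from_output(captured.splitlines())
    # Expected under these assumptions: a dict keyed by metric name.
    assert "airflow.ti_successes" in metrics_dict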
