This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5d69fc142d9c475940d11b1bbb56bf1a6c36c7fb
Author: Jakub Dardzinski <kuba0...@gmail.com>
AuthorDate: Tue Nov 21 06:45:04 2023 +0100

    Add basic metrics to stats collector. (#35368)
    
    Signed-off-by: Jakub Dardzinski <kuba0...@gmail.com>
    Co-authored-by: Elad Kalif <45845474+elad...@users.noreply.github.com>
---
 airflow/providers/openlineage/plugins/adapter.py   |  5 +-
 .../plugins/test_openlineage_adapter.py            | 75 +++++++++++++++++++---
 2 files changed, 69 insertions(+), 11 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/adapter.py 
b/airflow/providers/openlineage/plugins/adapter.py
index fabb14eaa3..a925ddf8e6 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -38,6 +38,7 @@ from openlineage.client.run import Job, Run, RunEvent, 
RunState
 from airflow.configuration import conf
 from airflow.providers.openlineage import __version__ as 
OPENLINEAGE_PROVIDER_VERSION
 from airflow.providers.openlineage.utils.utils import OpenLineageRedactor
+from airflow.stats import Stats
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
@@ -113,8 +114,10 @@ class OpenLineageAdapter(LoggingMixin):
             self._client = self.get_or_create_openlineage_client()
         redacted_event: RunEvent = self._redacter.redact(event, max_depth=20)  
# type: ignore[assignment]
         try:
-            return self._client.emit(redacted_event)
+            with Stats.timer("ol.emit.attempts"):
+                return self._client.emit(redacted_event)
         except Exception as e:
+            Stats.incr("ol.emit.failed")
             self.log.warning("Failed to emit OpenLineage event of id %s", 
event.run.runId)
             self.log.debug("OpenLineage emission failure: %s", e)
 
diff --git a/tests/providers/openlineage/plugins/test_openlineage_adapter.py 
b/tests/providers/openlineage/plugins/test_openlineage_adapter.py
index 685e88c725..bcb92b2b9b 100644
--- a/tests/providers/openlineage/plugins/test_openlineage_adapter.py
+++ b/tests/providers/openlineage/plugins/test_openlineage_adapter.py
@@ -39,7 +39,10 @@ from tests.test_utils.config import conf_vars
 pytestmark = pytest.mark.db_test
 
 
-@patch.dict(os.environ, {"OPENLINEAGE_URL": "http://ol-api:5000";, 
"OPENLINEAGE_API_KEY": "api-key"})
+@patch.dict(
+    os.environ,
+    {"OPENLINEAGE_URL": "http://ol-api:5000";, "OPENLINEAGE_API_KEY": 
"api-key"},
+)
 def test_create_client_from_ol_env():
     client = OpenLineageAdapter().get_or_create_openlineage_client()
 
@@ -90,7 +93,11 @@ def test_create_client_from_env_var_config():
 
 
 @patch.dict(
-    os.environ, {"OPENLINEAGE_URL": "http://ol-from-env:5000";, 
"OPENLINEAGE_API_KEY": "api-key-from-env"}
+    os.environ,
+    {
+        "OPENLINEAGE_URL": "http://ol-from-env:5000";,
+        "OPENLINEAGE_API_KEY": "api-key-from-env",
+    },
 )
 @patch.dict(os.environ, {"OPENLINEAGE_CONFIG": "some/config.yml"})
 def test_create_client_overrides_env_vars():
@@ -108,7 +115,9 @@ def test_create_client_overrides_env_vars():
         assert client.transport.kind == "console"
 
 
-def test_emit_start_event():
+@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
+@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_start_event(mock_stats_incr, mock_stats_timer):
     client = MagicMock()
     adapter = OpenLineageAdapter(client)
 
@@ -138,7 +147,8 @@ def test_emit_start_event():
                     runId=run_id,
                     facets={
                         "nominalTime": NominalTimeRunFacet(
-                            nominalStartTime="2022-01-01T00:00:00", 
nominalEndTime="2022-01-01T00:00:00"
+                            nominalStartTime="2022-01-01T00:00:00",
+                            nominalEndTime="2022-01-01T00:00:00",
                         ),
                         "processing_engine": ProcessingEngineRunFacet(
                             version=ANY, name="Airflow", 
openlineageAdapterVersion=ANY
@@ -158,8 +168,13 @@ def test_emit_start_event():
         in client.emit.mock_calls
     )
 
+    mock_stats_incr.assert_not_called()
+    mock_stats_timer.assert_called_with("ol.emit.attempts")
+
 
-def test_emit_complete_event():
+@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
+@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_complete_event(mock_stats_incr, mock_stats_timer):
     client = MagicMock()
     adapter = OpenLineageAdapter(client)
 
@@ -187,8 +202,13 @@ def test_emit_complete_event():
         in client.emit.mock_calls
     )
 
+    mock_stats_incr.assert_not_called()
+    mock_stats_timer.assert_called_with("ol.emit.attempts")
 
-def test_emit_failed_event():
+
+@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
+@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_failed_event(mock_stats_incr, mock_stats_timer):
     client = MagicMock()
     adapter = OpenLineageAdapter(client)
 
@@ -216,9 +236,14 @@ def test_emit_failed_event():
         in client.emit.mock_calls
     )
 
+    mock_stats_incr.assert_not_called()
+    mock_stats_timer.assert_called_with("ol.emit.attempts")
+
 
 @mock.patch("airflow.providers.openlineage.plugins.adapter.uuid")
-def test_emit_dag_started_event(uuid):
+@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
+@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, uuid):
     random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
     client = MagicMock()
     adapter = OpenLineageAdapter(client)
@@ -248,7 +273,8 @@ def test_emit_dag_started_event(uuid):
                     runId=random_uuid,
                     facets={
                         "nominalTime": NominalTimeRunFacet(
-                            nominalStartTime=event_time.isoformat(), 
nominalEndTime=event_time.isoformat()
+                            nominalStartTime=event_time.isoformat(),
+                            nominalEndTime=event_time.isoformat(),
                         )
                     },
                 ),
@@ -261,9 +287,14 @@ def test_emit_dag_started_event(uuid):
         in client.emit.mock_calls
     )
 
+    mock_stats_incr.assert_not_called()
+    mock_stats_timer.assert_called_with("ol.emit.attempts")
+
 
 @mock.patch("airflow.providers.openlineage.plugins.adapter.uuid")
-def test_emit_dag_complete_event(uuid):
+@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
+@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, uuid):
     random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
     client = MagicMock()
     adapter = OpenLineageAdapter(client)
@@ -298,9 +329,14 @@ def test_emit_dag_complete_event(uuid):
         in client.emit.mock_calls
     )
 
+    mock_stats_incr.assert_not_called()
+    mock_stats_timer.assert_called_with("ol.emit.attempts")
+
 
 @mock.patch("airflow.providers.openlineage.plugins.adapter.uuid")
-def test_emit_dag_failed_event(uuid):
+@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
+@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, uuid):
     random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
     client = MagicMock()
     adapter = OpenLineageAdapter(client)
@@ -341,3 +377,22 @@ def test_emit_dag_failed_event(uuid):
         )
         in client.emit.mock_calls
     )
+
+    mock_stats_incr.assert_not_called()
+    mock_stats_timer.assert_called_with("ol.emit.attempts")
+
+
+@patch("airflow.providers.openlineage.plugins.adapter.OpenLineageAdapter.get_or_create_openlineage_client")
+@patch("airflow.providers.openlineage.plugins.adapter.OpenLineageRedactor")
+@patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
+@patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_openlineage_adapter_stats_emit_failed(
+    mock_stats_incr, mock_stats_timer, mock_redact, mock_get_client
+):
+    adapter = OpenLineageAdapter()
+    mock_get_client.return_value.emit.side_effect = Exception()
+
+    adapter.emit(MagicMock())
+
+    mock_stats_timer.assert_called_with("ol.emit.attempts")
+    mock_stats_incr.assert_has_calls([mock.call("ol.emit.failed")])

Reply via email to