This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5d69fc142d9c475940d11b1bbb56bf1a6c36c7fb Author: Jakub Dardzinski <kuba0...@gmail.com> AuthorDate: Tue Nov 21 06:45:04 2023 +0100 Add basic metrics to stats collector. (#35368) Signed-off-by: Jakub Dardzinski <kuba0...@gmail.com> Co-authored-by: Elad Kalif <45845474+elad...@users.noreply.github.com> --- airflow/providers/openlineage/plugins/adapter.py | 5 +- .../plugins/test_openlineage_adapter.py | 75 +++++++++++++++++++--- 2 files changed, 69 insertions(+), 11 deletions(-) diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py index fabb14eaa3..a925ddf8e6 100644 --- a/airflow/providers/openlineage/plugins/adapter.py +++ b/airflow/providers/openlineage/plugins/adapter.py @@ -38,6 +38,7 @@ from openlineage.client.run import Job, Run, RunEvent, RunState from airflow.configuration import conf from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION from airflow.providers.openlineage.utils.utils import OpenLineageRedactor +from airflow.stats import Stats from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: @@ -113,8 +114,10 @@ class OpenLineageAdapter(LoggingMixin): self._client = self.get_or_create_openlineage_client() redacted_event: RunEvent = self._redacter.redact(event, max_depth=20) # type: ignore[assignment] try: - return self._client.emit(redacted_event) + with Stats.timer("ol.emit.attempts"): + return self._client.emit(redacted_event) except Exception as e: + Stats.incr("ol.emit.failed") self.log.warning("Failed to emit OpenLineage event of id %s", event.run.runId) self.log.debug("OpenLineage emission failure: %s", e) diff --git a/tests/providers/openlineage/plugins/test_openlineage_adapter.py b/tests/providers/openlineage/plugins/test_openlineage_adapter.py index 685e88c725..bcb92b2b9b 100644 --- a/tests/providers/openlineage/plugins/test_openlineage_adapter.py +++ b/tests/providers/openlineage/plugins/test_openlineage_adapter.py @@ -39,7 
+39,10 @@ from tests.test_utils.config import conf_vars pytestmark = pytest.mark.db_test -@patch.dict(os.environ, {"OPENLINEAGE_URL": "http://ol-api:5000", "OPENLINEAGE_API_KEY": "api-key"}) +@patch.dict( + os.environ, + {"OPENLINEAGE_URL": "http://ol-api:5000", "OPENLINEAGE_API_KEY": "api-key"}, +) def test_create_client_from_ol_env(): client = OpenLineageAdapter().get_or_create_openlineage_client() @@ -90,7 +93,11 @@ def test_create_client_from_env_var_config(): @patch.dict( - os.environ, {"OPENLINEAGE_URL": "http://ol-from-env:5000", "OPENLINEAGE_API_KEY": "api-key-from-env"} + os.environ, + { + "OPENLINEAGE_URL": "http://ol-from-env:5000", + "OPENLINEAGE_API_KEY": "api-key-from-env", + }, ) @patch.dict(os.environ, {"OPENLINEAGE_CONFIG": "some/config.yml"}) def test_create_client_overrides_env_vars(): @@ -108,7 +115,9 @@ def test_create_client_overrides_env_vars(): assert client.transport.kind == "console" -def test_emit_start_event(): +@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer") +@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr") +def test_emit_start_event(mock_stats_incr, mock_stats_timer): client = MagicMock() adapter = OpenLineageAdapter(client) @@ -138,7 +147,8 @@ def test_emit_start_event(): runId=run_id, facets={ "nominalTime": NominalTimeRunFacet( - nominalStartTime="2022-01-01T00:00:00", nominalEndTime="2022-01-01T00:00:00" + nominalStartTime="2022-01-01T00:00:00", + nominalEndTime="2022-01-01T00:00:00", ), "processing_engine": ProcessingEngineRunFacet( version=ANY, name="Airflow", openlineageAdapterVersion=ANY @@ -158,8 +168,13 @@ def test_emit_start_event(): in client.emit.mock_calls ) + mock_stats_incr.assert_not_called() + mock_stats_timer.assert_called_with("ol.emit.attempts") + -def test_emit_complete_event(): +@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer") +@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr") +def 
test_emit_complete_event(mock_stats_incr, mock_stats_timer): client = MagicMock() adapter = OpenLineageAdapter(client) @@ -187,8 +202,13 @@ def test_emit_complete_event(): in client.emit.mock_calls ) + mock_stats_incr.assert_not_called() + mock_stats_timer.assert_called_with("ol.emit.attempts") -def test_emit_failed_event(): + +@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer") +@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr") +def test_emit_failed_event(mock_stats_incr, mock_stats_timer): client = MagicMock() adapter = OpenLineageAdapter(client) @@ -216,9 +236,14 @@ def test_emit_failed_event(): in client.emit.mock_calls ) + mock_stats_incr.assert_not_called() + mock_stats_timer.assert_called_with("ol.emit.attempts") + @mock.patch("airflow.providers.openlineage.plugins.adapter.uuid") -def test_emit_dag_started_event(uuid): +@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer") +@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr") +def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, uuid): random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e" client = MagicMock() adapter = OpenLineageAdapter(client) @@ -248,7 +273,8 @@ def test_emit_dag_started_event(uuid): runId=random_uuid, facets={ "nominalTime": NominalTimeRunFacet( - nominalStartTime=event_time.isoformat(), nominalEndTime=event_time.isoformat() + nominalStartTime=event_time.isoformat(), + nominalEndTime=event_time.isoformat(), ) }, ), @@ -261,9 +287,14 @@ def test_emit_dag_started_event(uuid): in client.emit.mock_calls ) + mock_stats_incr.assert_not_called() + mock_stats_timer.assert_called_with("ol.emit.attempts") + @mock.patch("airflow.providers.openlineage.plugins.adapter.uuid") -def test_emit_dag_complete_event(uuid): +@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer") +@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr") +def 
test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, uuid): random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e" client = MagicMock() adapter = OpenLineageAdapter(client) @@ -298,9 +329,14 @@ def test_emit_dag_complete_event(uuid): in client.emit.mock_calls ) + mock_stats_incr.assert_not_called() + mock_stats_timer.assert_called_with("ol.emit.attempts") + @mock.patch("airflow.providers.openlineage.plugins.adapter.uuid") -def test_emit_dag_failed_event(uuid): +@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer") +@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr") +def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, uuid): random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e" client = MagicMock() adapter = OpenLineageAdapter(client) @@ -341,3 +377,22 @@ def test_emit_dag_failed_event(uuid): ) in client.emit.mock_calls ) + + mock_stats_incr.assert_not_called() + mock_stats_timer.assert_called_with("ol.emit.attempts") + + +@patch("airflow.providers.openlineage.plugins.adapter.OpenLineageAdapter.get_or_create_openlineage_client") +@patch("airflow.providers.openlineage.plugins.adapter.OpenLineageRedactor") +@patch("airflow.providers.openlineage.plugins.adapter.Stats.timer") +@patch("airflow.providers.openlineage.plugins.adapter.Stats.incr") +def test_openlineage_adapter_stats_emit_failed( + mock_stats_incr, mock_stats_timer, mock_redact, mock_get_client +): + adapter = OpenLineageAdapter() + mock_get_client.return_value.emit.side_effect = Exception() + + adapter.emit(MagicMock()) + + mock_stats_timer.assert_called_with("ol.emit.attempts") + mock_stats_incr.assert_has_calls([mock.call("ol.emit.failed")])