This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2e886a4dac7 cache connections in OpenLineage SQL hook lineage (#64843)
2e886a4dac7 is described below
commit 2e886a4dac7e35c453767e4ef7b8e05fc6adb964
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Fri May 15 14:03:39 2026 +0200
cache connections in OpenLineage SQL hook lineage (#64843)
* OpenLineage: cache connection per conn_id in emit_lineage_from_sql_extras
emit_lineage_from_sql_extras called hook.get_connection() twice per SQL
extra: once in _resolve_namespace and once inside
get_openlineage_facets_with_sql. For N extras from the same hook this
is N*2 round-trips (SecretsManager miss + Airflow API server hit each
time) all returning the same object.
Add _conn_cache dict keyed by conn_id so the connection, database_info,
and namespace are resolved exactly once per unique conn_id. Introduce
_resolve_connection_info() that returns all three from a single
get_connection() call. Add optional connection/database_info params to
get_openlineage_facets_with_sql so callers can pass pre-fetched values
and skip the internal lookup entirely.
Also fix two related issues found while investigating:
- extractors/manager.py: wrap get_hook_lineage() in try/except in the
no-extractor path. An unhandled exception from
emit_lineage_from_sql_extras -> _create_ol_event_pair propagated to
@print_warning on on_success(), silently suppressing the task-level
COMPLETE event.
- plugins/listener.py: call logging.shutdown() before os._exit(0) in the
fork child. os._exit bypasses Python's stdio flush so any buffered log
messages at exit (including failure warnings) were silently dropped,
making fork failures invisible in task logs.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
Signed-off-by: Maciej Obuchowski <[email protected]>
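The logging.shutdown() point above can be reproduced outside Airflow. The sketch below is an illustration only, not the listener code: it runs a small child script (the CHILD string and run_child helper are hypothetical names) that buffers one warning in a MemoryHandler and then leaves through os._exit(0), with or without an explicit logging.shutdown().

```python
import subprocess
import sys
import textwrap

# Child script, run in a separate interpreter. It is an assumption-laden
# stand-in for the forked OpenLineage process, not the real listener.
CHILD = textwrap.dedent(
    """
    import logging, logging.handlers, os, sys

    # MemoryHandler keeps records in memory until it is flushed or closed.
    target = logging.StreamHandler(sys.stdout)
    buffered = logging.handlers.MemoryHandler(
        capacity=100, flushLevel=logging.CRITICAL, target=target
    )
    log = logging.getLogger("demo")
    log.addHandler(buffered)
    log.warning("lineage failed")

    if sys.argv[1] == "shutdown":
        logging.shutdown()  # flushes and closes every registered handler
    os._exit(0)  # skips atexit hooks, so nothing else flushes the buffer
    """
)


def run_child(mode: str) -> str:
    """Run the child script and return whatever reached its stdout."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD, mode],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout
```

Calling run_child("shutdown") should return the warning text, while run_child("skip") should return an empty string because the buffered record is lost when the process exits.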
* refactor: use @cache local closures instead of manual cache dict
Replace the _conn_cache dict and _resolve_connection_info tuple helper
with three @functools.cache-decorated local functions keyed by conn_id:
_get_connection - one hook.get_connection() call per conn_id
_get_database_info - derived from _get_connection, cached separately
_get_namespace - derived from _get_database_info, cached separately
Each concern is cached independently. No tuple packing/unpacking.
_resolve_connection_info removed; _resolve_namespace restored to a
simple single-lookup implementation for external callers.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
Signed-off-by: Maciej Obuchowski <[email protected]>
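The layered @cache closures described above can be sketched as follows. This is a minimal illustration keyed by conn_id, as in this revision of the change. FakeHook, namespaces_for, and the namespace string format are hypothetical stand-ins, not the provider's real hook or SQLParser.create_namespace.

```python
from functools import cache


class FakeHook:
    """Hypothetical hook that counts how often get_connection() is called."""

    def __init__(self) -> None:
        self.calls = 0

    def get_connection(self, conn_id: str) -> dict:
        self.calls += 1
        return {"conn_id": conn_id, "host": f"{conn_id}.example"}

    def get_openlineage_database_info(self, connection: dict) -> dict:
        return {"scheme": "postgres", "authority": connection["host"]}


def namespaces_for(hook: FakeHook, conn_ids: list[str]) -> list[str]:
    # The closures are created on every call, so their caches live only
    # for the duration of this function and are discarded afterwards.
    @cache
    def _get_connection(conn_id: str) -> dict:
        return hook.get_connection(conn_id)

    @cache
    def _get_database_info(conn_id: str) -> dict:
        return hook.get_openlineage_database_info(_get_connection(conn_id))

    @cache
    def _get_namespace(conn_id: str) -> str:
        info = _get_database_info(conn_id)
        # Placeholder format chosen for this sketch only.
        return f"{info['scheme']}://{info['authority']}"

    return [_get_namespace(conn_id) for conn_id in conn_ids]
```

With four lookups over two distinct conn_ids, the hook is asked for a connection only twice. Each layer can also be called on its own without recomputing the layers beneath it.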
* remove unused _resolve_namespace
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
Signed-off-by: Maciej Obuchowski <[email protected]>
* fix review: make hook lineage error messages state-agnostic
The comment and log message incorrectly referenced "COMPLETE event"
but extract_metadata is called for all task states (RUNNING, SUCCESS,
FAILED, SKIPPED).
Signed-off-by: Maciej Obuchowski <[email protected]>
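The guarded fallback that these messages belong to can be sketched roughly as below. The function name, its callable argument, and the empty default result are assumptions made for illustration; the real logic lives in ExtractorManager.extract_metadata.

```python
import logging

log = logging.getLogger(__name__)


def extract_with_fallback(get_hook_lineage, task_info: str) -> dict:
    """Return hook lineage if available, otherwise an empty default."""
    try:
        hook_lineage = get_hook_lineage()
    except Exception as e:
        # State-agnostic wording: this path runs for every task state.
        log.warning(
            "Failed to extract hook lineage %s: %s. "
            "Task event will be emitted without lineage.",
            task_info,
            e,
        )
        log.debug("Hook lineage failure details:", exc_info=True)
        hook_lineage = None

    if hook_lineage is not None:
        return hook_lineage

    # Hypothetical default used only in this sketch.
    return {"inputs": [], "outputs": []}
```

A failing callable no longer aborts the caller; the task-level event can still be built from the default result.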
* fix tests: update for _resolve_namespace removal
- Remove TestResolveNamespace class (function no longer exists)
- Replace _resolve_namespace mock with SQLParser.create_namespace mock
- Fix test_job_id_only_extra_emits_events: conn_id=None means no namespace
- Remove over-specific mock assertions on _get_hook_conn_id calls
Signed-off-by: Maciej Obuchowski <[email protected]>
* fix review: cache by hook identity instead of conn_id
Use id(hook) as the @cache key instead of conn_id to ensure distinct
hook instances sharing the same conn_id but with different params
get separate cached database info results.
Signed-off-by: Maciej Obuchowski <[email protected]>
Signed-off-by: Maciej Obuchowski <[email protected]>
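Why id(hook) separates results that conn_id would merge can be shown with a small sketch. FakeHook and database_infos are hypothetical names, and the database info is reduced to a string for brevity. Note that id() values are only unique among live objects, so the sketch keeps a strong reference to every hook for as long as the cache exists.

```python
from functools import cache


class FakeHook:
    """Hypothetical hook: same conn_id, different per-instance params."""

    def __init__(self, conn_id: str, schema: str) -> None:
        self.conn_id = conn_id
        self.schema = schema
        self.calls = 0

    def get_openlineage_database_info(self) -> str:
        self.calls += 1
        return f"{self.conn_id}/{self.schema}"


def database_infos(hooks: list[FakeHook]) -> list[str]:
    # Holding the hooks in this dict keeps them alive, so no id() can be
    # reused by a different object while the cache below is in use.
    by_id = {id(hook): hook for hook in hooks}

    @cache
    def _get_database_info(hook_id: int) -> str:
        return by_id[hook_id].get_openlineage_database_info()

    return [_get_database_info(id(hook)) for hook in hooks]
```

Two hooks that share "same" as conn_id but differ in schema yield two distinct results, and a repeated hook instance is resolved only once.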
* fix review: address Copilot feedback on connection/database_info overrides
- Split conditional in get_openlineage_facets_with_sql so
hook.get_connection is only called when connection is None (previously
re-fetched even when caller passed pre-fetched connection, defeating
the cache).
- Make connection and database_info keyword-only with Any | None types for
clarity and to prevent positional misuse.
- Reword the explanatory comment for grammatical clarity.
- Add unit tests covering all four override combinations.
Addresses review comments on PR #64843.
Signed-off-by: Maciej Obuchowski <[email protected]>
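The split conditional and keyword-only overrides described above can be sketched in isolation. CountingHook and resolve_inputs are hypothetical names for this illustration; only the shape of the two independent None checks mirrors the change.

```python
from __future__ import annotations

from typing import Any


class CountingHook:
    """Hypothetical hook that records how often it is queried."""

    def __init__(self) -> None:
        self.connection_calls = 0
        self.database_info_calls = 0

    def get_connection(self, conn_id: str) -> dict:
        self.connection_calls += 1
        return {"conn_id": conn_id}

    def get_openlineage_database_info(self, connection: dict) -> dict:
        self.database_info_calls += 1
        return {"scheme": "demo", "connection": connection}


def resolve_inputs(
    hook: Any,
    conn_id: str,
    *,  # everything after this must be passed by keyword
    connection: Any | None = None,
    database_info: Any | None = None,
) -> tuple[Any, Any]:
    # Two independent checks: a supplied value is never re-fetched.
    if connection is None:
        connection = hook.get_connection(conn_id)
    if database_info is None:
        try:
            database_info = hook.get_openlineage_database_info(connection)
        except AttributeError:
            database_info = None
    return connection, database_info
```

Passing only connection skips get_connection() but still derives database_info from it. Passing both skips every hook lookup, and a positional third argument raises TypeError.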
* fix mypy: type _hook_info value as Any so attribute access type-checks
mypy doesn't know hook.get_connection / get_openlineage_database_info
exist on object. Type the value side of the (hook, conn_id) tuple as
Any (matching how DbApiHook is treated elsewhere in this module) so the
mypy providers check passes.
Signed-off-by: Maciej Obuchowski <[email protected]>
---------
Signed-off-by: Maciej Obuchowski <[email protected]>
Signed-off-by: Maciej Obuchowski <[email protected]>
Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
.../providers/openlineage/extractors/manager.py | 20 +++++-
.../providers/openlineage/plugins/listener.py | 17 ++++-
.../src/airflow/providers/openlineage/sqlparser.py | 27 +++++---
.../openlineage/utils/sql_hook_lineage.py | 62 ++++++++++--------
.../tests/unit/openlineage/test_sqlparser.py | 67 ++++++++++++++++++++
.../openlineage/utils/test_sql_hook_lineage.py | 73 +++++++++-------------
6 files changed, 187 insertions(+), 79 deletions(-)
diff --git a/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py b/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py
index f587550bef7..276636e129e 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py
@@ -143,9 +143,25 @@ class ExtractorManager(LoggingMixin):
task_info,
)
self.log.debug("OpenLineage extraction failure details:", exc_info=True)
- elif (hook_lineage := self.get_hook_lineage(task_instance, task_instance_state)) is not None:
- return hook_lineage
else:
+ # No extractor found — fall back to hook lineage. This call must
be wrapped in
+ # try/except: it runs emit_lineage_from_sql_extras → _create_ol_event_pair which
+ # is not guarded internally. An uncaught exception here would propagate up to the
+ # listener's @print_warning decorator, silently suppressing the task-level event.
+ try:
+ hook_lineage = self.get_hook_lineage(task_instance, task_instance_state)
+ except Exception as e:
+ self.log.warning(
+ "Failed to extract hook lineage %s: %s. Task event will be emitted without lineage.",
+ task_info,
+ e,
+ )
+ self.log.debug("OpenLineage hook lineage failure details:", exc_info=True)
+ hook_lineage = None
+
+ if hook_lineage is not None:
+ return hook_lineage
+
self.log.debug("Unable to find an extractor %s", task_info)
# Only include the unknownSourceAttribute facet if there is no extractor
diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
index 6ab1944c393..0f4b0f3ca23 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
@@ -855,8 +855,21 @@ class OpenLineageListener:
if not AIRFLOW_V_3_0_PLUS:
configure_orm(disable_connection_pool=True)
self.log.debug("Executing OpenLineage process - %s - pid %s", callable_name, os.getpid())
- callable()
- self.log.debug("Process with current pid finishes after %s", callable_name)
+ try:
+ callable()
+ self.log.debug("Process with current pid finishes after %s", callable_name)
+ except Exception:
+ self.log.warning(
+ "OpenLineage %s process failed. This has no impact on actual task execution status.",
+ callable_name,
+ exc_info=True,
+ )
+ finally:
+ # os._exit(0) bypasses Python's atexit/stdio flush. Explicitly shut down
+ # logging so buffered records (including any warnings above) are flushed
+ # before the process exits. Without this, the final log lines are silently
+ # dropped, making failures invisible.
+ logging.shutdown()
os._exit(0)
@property
diff --git a/providers/openlineage/src/airflow/providers/openlineage/sqlparser.py b/providers/openlineage/src/airflow/providers/openlineage/sqlparser.py
index 3b82300207c..8ce25f342f4 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/sqlparser.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/sqlparser.py
@@ -18,7 +18,7 @@ from __future__ import annotations
import logging
from collections.abc import Callable
-from typing import TYPE_CHECKING, TypedDict
+from typing import TYPE_CHECKING, Any, TypedDict
import sqlparse
from attrs import define
@@ -473,13 +473,26 @@ class SQLParser(LoggingMixin):
def get_openlineage_facets_with_sql(
- hook: DbApiHook, sql: str | list[str], conn_id: str, database: str | None, use_connection: bool = True
+ hook: DbApiHook,
+ sql: str | list[str],
+ conn_id: str,
+ database: str | None,
+ use_connection: bool = True,
+ *,
+ connection: Any | None = None,
+ database_info: Any | None = None,
) -> OperatorLineage | None:
- connection = hook.get_connection(conn_id)
- try:
- database_info = hook.get_openlineage_database_info(connection)
- except AttributeError:
- database_info = None
+ # Accept pre-fetched connection and database_info to avoid redundant hook.get_connection()
+ # calls when processing multiple SQL extras from the same hook. Each get_connection() call
+ # hits SecretsManager (miss) then the Airflow API server — passing these
in avoids that cost.
+ if connection is None:
+ connection = hook.get_connection(conn_id)
+
+ if database_info is None:
+ try:
+ database_info = hook.get_openlineage_database_info(connection)
+ except AttributeError:
+ database_info = None
if database_info is None:
log.debug("%s has no database info provided", hook)
diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/sql_hook_lineage.py b/providers/openlineage/src/airflow/providers/openlineage/utils/sql_hook_lineage.py
index ce8331c9a59..065fea24779 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/sql_hook_lineage.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/sql_hook_lineage.py
@@ -19,7 +19,8 @@
from __future__ import annotations
import logging
-from typing import TYPE_CHECKING
+from functools import cache
+from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse
from openlineage.client.event_v2 import Dataset, Job, Run, RunEvent, RunState
@@ -112,6 +113,34 @@ def emit_lineage_from_sql_extras(task_instance, sql_extras: list, is_successful:
events: list[RunEvent] = []
query_count = 0
+ # Build hook identity -> (hook, conn_id) mapping before iterating.
+ # Using id(hook) as cache key instead of conn_id ensures distinct hook instances
+ # with the same conn_id but different params are cached separately.
+ _hook_info: dict[int, tuple[Any, str | None]] = {}
+ for e in sql_extras:
+ hid = id(e.context)
+ if hid not in _hook_info:
+ _hook_info[hid] = (e.context, _get_hook_conn_id(e.context))
+
+ @cache
+ def _get_connection(hook_id: int):
+ hook, conn_id = _hook_info[hook_id]
+ return hook.get_connection(conn_id)
+
+ @cache
+ def _get_database_info(hook_id: int):
+ hook, conn_id = _hook_info[hook_id]
+ try:
+ return hook.get_openlineage_database_info(_get_connection(hook_id))
+ except Exception as e:
+ log.debug("Failed to get OpenLineage database info for %s: %s", conn_id, e)
+ return None
+
+ @cache
+ def _get_namespace(hook_id: int) -> str | None:
+ db_info = _get_database_info(hook_id)
+ return SQLParser.create_namespace(db_info) if db_info is not None else None
+
for extra_info in sql_extras:
value = extra_info.value
@@ -124,12 +153,13 @@ def emit_lineage_from_sql_extras(task_instance, sql_extras: list, is_successful:
query_count += 1
hook = extra_info.context
- conn_id = _get_hook_conn_id(hook)
- namespace = _resolve_namespace(hook, conn_id)
+ hook_id = id(hook)
+ conn_id = _hook_info[hook_id][1]
# Parse SQL to obtain lineage (inputs, outputs, facets)
query_lineage: OperatorLineage | None = None
- if sql_text and conn_id:
+ database_info = _get_database_info(hook_id) if conn_id else None
+ if sql_text and conn_id and database_info is not None:
try:
query_lineage = get_openlineage_facets_with_sql(
hook=hook,
@@ -137,6 +167,8 @@ def emit_lineage_from_sql_extras(task_instance, sql_extras: list, is_successful:
conn_id=conn_id,
database=value.get(SqlJobHookLineageExtra.VALUE__DEFAULT_DB.value),
use_connection=False, # Temporary solution before we figure out timeouts for queries
+ connection=_get_connection(hook_id),
+ database_info=database_info,
)
except Exception as e:
log.debug("Failed to parse SQL for query %s: %s", query_count, e)
@@ -149,6 +181,7 @@ def emit_lineage_from_sql_extras(task_instance, sql_extras: list, is_successful:
query_lineage = OperatorLineage(job_facets=job_facets)
# Enrich run facets with external query info when available.
+ namespace = _get_namespace(hook_id) if conn_id else None
if job_id and namespace:
query_lineage.run_facets.setdefault(
"externalQuery",
@@ -183,27 +216,6 @@ def emit_lineage_from_sql_extras(task_instance, sql_extras: list, is_successful:
return None
-def _resolve_namespace(hook, conn_id: str | None) -> str | None:
- """
- Resolve the OpenLineage namespace from a hook.
-
- Tries ``hook.get_openlineage_database_info`` to build the namespace.
- Returns ``None`` when the hook does not expose this method.
- """
- if conn_id:
- try:
- connection = hook.get_connection(conn_id)
- database_info = hook.get_openlineage_database_info(connection)
- except Exception as e:
- log.debug("Failed to get OpenLineage database info: %s", e)
- database_info = None
-
- if database_info is not None:
- return SQLParser.create_namespace(database_info)
-
- return None
-
-
def _get_hook_conn_id(hook) -> str | None:
"""
Try to extract the connection ID from a hook instance.
diff --git a/providers/openlineage/tests/unit/openlineage/test_sqlparser.py b/providers/openlineage/tests/unit/openlineage/test_sqlparser.py
index 07162d10532..a10cf1446ef 100644
--- a/providers/openlineage/tests/unit/openlineage/test_sqlparser.py
+++ b/providers/openlineage/tests/unit/openlineage/test_sqlparser.py
@@ -460,3 +460,70 @@ class TestGetOpenlineageFacetsWithSql:
)
hook.get_sqlalchemy_engine.assert_called_once()
+
+ @mock.patch("airflow.providers.openlineage.sqlparser.SQLParser.generate_openlineage_metadata_from_sql")
+ def test_connection_provided_skips_get_connection(self, mock_generate):
+ """When a pre-fetched connection is passed, hook.get_connection is not called."""
+ hook = MagicMock()
+ db_info = DatabaseInfo(scheme="myscheme", authority="host:port")
+ hook.get_openlineage_database_info.return_value = db_info
+ hook.get_openlineage_database_dialect.return_value = "generic"
+ hook.get_openlineage_default_schema.return_value = "public"
+ mock_generate.return_value = MagicMock()
+
+ prefetched_connection = MagicMock()
+ get_openlineage_facets_with_sql(
+ hook=hook,
+ sql="SELECT 1",
+ conn_id="conn",
+ database=None,
+ use_connection=False,
+ connection=prefetched_connection,
+ )
+
+ hook.get_connection.assert_not_called()
+ hook.get_openlineage_database_info.assert_called_once_with(prefetched_connection)
+
+ @mock.patch("airflow.providers.openlineage.sqlparser.SQLParser.generate_openlineage_metadata_from_sql")
+ def test_database_info_provided_skips_get_openlineage_database_info(self, mock_generate):
+ """When pre-fetched database_info is passed, hook.get_openlineage_database_info is not called."""
+ hook = MagicMock()
+ prefetched_db_info = DatabaseInfo(scheme="myscheme", authority="host:port")
+ hook.get_openlineage_database_dialect.return_value = "generic"
+ hook.get_openlineage_default_schema.return_value = "public"
+ mock_generate.return_value = MagicMock()
+
+ get_openlineage_facets_with_sql(
+ hook=hook,
+ sql="SELECT 1",
+ conn_id="conn",
+ database=None,
+ use_connection=False,
+ database_info=prefetched_db_info,
+ )
+
+ hook.get_connection.assert_called_once_with("conn")
+ hook.get_openlineage_database_info.assert_not_called()
+
+ @mock.patch("airflow.providers.openlineage.sqlparser.SQLParser.generate_openlineage_metadata_from_sql")
+ def test_both_connection_and_database_info_provided_skips_hook_lookups(self, mock_generate):
+ """When both are passed, neither hook.get_connection nor get_openlineage_database_info is called."""
+ hook = MagicMock()
+ hook.get_openlineage_database_dialect.return_value = "generic"
+ hook.get_openlineage_default_schema.return_value = "public"
+ mock_generate.return_value = MagicMock()
+
+ prefetched_connection = MagicMock()
+ prefetched_db_info = DatabaseInfo(scheme="myscheme", authority="host:port")
+ get_openlineage_facets_with_sql(
+ hook=hook,
+ sql="SELECT 1",
+ conn_id="conn",
+ database=None,
+ use_connection=False,
+ connection=prefetched_connection,
+ database_info=prefetched_db_info,
+ )
+
+ hook.get_connection.assert_not_called()
+ hook.get_openlineage_database_info.assert_not_called()
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_sql_hook_lineage.py b/providers/openlineage/tests/unit/openlineage/utils/test_sql_hook_lineage.py
index 835766b3f93..f2e97c568ca 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_sql_hook_lineage.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_sql_hook_lineage.py
@@ -28,7 +28,6 @@ from airflow.providers.openlineage.extractors.base import OperatorLineage
from airflow.providers.openlineage.sqlparser import SQLParser
from airflow.providers.openlineage.utils.sql_hook_lineage import (
_get_hook_conn_id,
- _resolve_namespace,
emit_lineage_from_sql_extras,
)
@@ -67,51 +66,12 @@ class TestGetHookConnId:
assert _get_hook_conn_id(hook) is None
-class TestResolveNamespace:
- def test_from_ol_database_info(self):
- hook = mock.MagicMock()
- connection = mock.MagicMock()
- hook.get_connection.return_value = connection
- database_info = mock.MagicMock()
- hook.get_openlineage_database_info.return_value = database_info
-
- with mock.patch(
- "airflow.providers.openlineage.utils.sql_hook_lineage.SQLParser.create_namespace",
- return_value="postgres://host:5432/mydb",
- ) as mock_create_ns:
- result = _resolve_namespace(hook, "my_conn")
-
- hook.get_connection.assert_called_once_with("my_conn")
- hook.get_openlineage_database_info.assert_called_once_with(connection)
- mock_create_ns.assert_called_once_with(database_info)
- assert result == "postgres://host:5432/mydb"
-
- def test_returns_none_when_no_namespace_available(self):
- hook = mock.MagicMock()
- hook.__class__.__name__ = "SomeUnknownHook"
- hook.get_connection.side_effect = Exception("no method")
-
- with mock.patch.dict("sys.modules"):
- result = _resolve_namespace(hook, "my_conn")
-
- assert result is None
-
- def test_returns_none_when_no_conn_id(self):
- hook = mock.MagicMock()
- hook.__class__.__name__ = "SomeUnknownHook"
-
- with mock.patch.dict("sys.modules"):
- result = _resolve_namespace(hook, None)
-
- assert result is None
-
-
class TestEmitLineageFromSqlExtras:
@pytest.fixture(autouse=True)
def _patch_deps(self):
with (
mock.patch(f"{_MODULE}._get_hook_conn_id", return_value="my_conn") as mock_conn_id,
- mock.patch(f"{_MODULE}._resolve_namespace") as mock_ns,
+ mock.patch(f"{_MODULE}.SQLParser.create_namespace") as mock_ns,
mock.patch(f"{_MODULE}.get_openlineage_facets_with_sql") as mock_facets_fn,
mock.patch(f"{_MODULE}._create_ol_event_pair") as mock_build,
mock.patch(f"{_MODULE}.get_openlineage_listener") as mock_listener,
@@ -142,6 +102,7 @@ class TestEmitLineageFromSqlExtras:
)
assert result is None
self.mock_build.assert_not_called()
+ self.mock_facets_fn.assert_not_called()
self.mock_listener.assert_not_called()
def test_single_query_delegates_to_create_ol_event_pair(self):
@@ -278,7 +239,6 @@ class TestEmitLineageFromSqlExtras:
def test_job_id_only_extra_is_processed(self):
"""An extra with only job_id (no SQL text) still builds and emits an event pair."""
self.mock_conn_id.return_value = None
- self.mock_ns.return_value = "ns"
self.mock_facets_fn.return_value = None
mock_ti = mock.MagicMock(dag_id="dag_id", task_id="task_id")
@@ -287,9 +247,10 @@ class TestEmitLineageFromSqlExtras:
sql_extras=[_make_extra(sql="", job_id="external-123")],
)
+ # conn_id is None → namespace cannot be resolved → no externalQuery facet
self.mock_build.assert_called_once()
call = self.mock_build.call_args
- assert call.kwargs["run_facets"]["externalQuery"].externalQueryId == "external-123"
+ assert "externalQuery" not in call.kwargs["run_facets"]
assert "sql" not in call.kwargs["job_facets"]
def test_parser_run_facets_preserved_over_external_query(self):
@@ -312,3 +273,29 @@ class TestEmitLineageFromSqlExtras:
call = self.mock_build.call_args
assert call.kwargs["run_facets"]["externalQuery"] is parser_ext_query
+
+ def test_different_hooks_same_conn_id_get_separate_db_info(self):
+ """Two hooks sharing a conn_id but returning different database info are cached separately."""
+ mock_ti = mock.MagicMock(dag_id="dag_id", task_id="task_id")
+
+ hook_a = mock.MagicMock()
+ hook_b = mock.MagicMock()
+
+ db_info_a = mock.MagicMock()
+ db_info_b = mock.MagicMock()
+ hook_a.get_openlineage_database_info.return_value = db_info_a
+ hook_b.get_openlineage_database_info.return_value = db_info_b
+
+ self.mock_conn_id.return_value = "same_conn"
+ self.mock_ns.side_effect = lambda db_info: f"ns_{id(db_info)}"
+ self.mock_facets_fn.return_value = OperatorLineage()
+
+ extras = [
+ _make_extra(sql="SELECT 1", hook=hook_a),
+ _make_extra(sql="SELECT 2", hook=hook_b),
+ ]
+ emit_lineage_from_sql_extras(task_instance=mock_ti, sql_extras=extras)
+
+ # Both hooks should have had get_openlineage_database_info called
+ hook_a.get_openlineage_database_info.assert_called_once()
+ hook_b.get_openlineage_database_info.assert_called_once()