This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 469545c1b9c Openlineage: Read HTTP API key auth from Airflow 
connection (#66342)
469545c1b9c is described below

commit 469545c1b9c759ab3e49e405612b09a943f0b350
Author: Ulada Zakharava <[email protected]>
AuthorDate: Mon May 25 15:38:52 2026 +0200

    Openlineage: Read HTTP API key auth from Airflow connection (#66342)
---
 providers/openlineage/docs/configurations-ref.rst  |  37 +++-
 providers/openlineage/provider.yaml                |   9 +
 .../src/airflow/providers/openlineage/conf.py      |   8 +
 .../providers/openlineage/get_provider_info.py     |   7 +
 .../providers/openlineage/plugins/adapter.py       |  31 +++-
 .../providers/openlineage/token_provider.py        | 133 ++++++++++++++
 .../tests/unit/openlineage/plugins/test_adapter.py | 162 ++++++++++++++++-
 .../tests/unit/openlineage/test_token_provider.py  | 192 +++++++++++++++++++++
 8 files changed, 568 insertions(+), 11 deletions(-)

diff --git a/providers/openlineage/docs/configurations-ref.rst 
b/providers/openlineage/docs/configurations-ref.rst
index 258c9761d56..fb78ad2424f 100644
--- a/providers/openlineage/docs/configurations-ref.rst
+++ b/providers/openlineage/docs/configurations-ref.rst
@@ -56,6 +56,36 @@ If you want to look at OpenLineage events without sending 
them anywhere, you can
     [openlineage]
     transport = {"type": "console"}
 
+OpenLineage config stored in an Airflow connection
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can store OpenLineage client configuration in a Generic Airflow connection 
instead of putting the full JSON
+configuration directly in ``airflow.cfg``. Set ``config_conn_id`` to the 
connection ID and store the OpenLineage
+configuration in the connection extra as JSON.
+
+.. code-block:: ini
+
+    [openlineage]
+    config_conn_id = openlineage_default
+
+Connection extra should contain the OpenLineage client configuration:
+
+.. code-block:: json
+
+    {
+      "transport": {
+        "type": "http",
+        "url": "http://example.com:5000";,
+        "auth": {"type": "airflow_connection_api_key"}
+      }
+    }
+
+For HTTP transports that require API key authentication, you can keep the 
token in the Airflow connection password.
+Set ``auth.type`` to ``airflow_connection_api_key``. When the config is loaded 
from ``config_conn_id``, the provider
+reads the API key from the same connection password by default. You can also 
set ``auth.conn_id`` to read the token
+from another Airflow connection. The provider resolves 
``airflow_connection_api_key`` to standard OpenLineage
+``api_key`` auth before creating the OpenLineage client.
+
 .. note::
   For full list of built-in transport types, specific transport's options or 
instructions on how to implement your custom transport, refer to
   `Python client documentation 
<https://openlineage.io/docs/client/python/configuration#transports>`_.
@@ -100,9 +130,10 @@ Primary, and recommended method of configuring OpenLineage 
Airflow Provider is A
 As there are multiple possible ways of configuring OpenLineage, it's important 
to keep in mind the precedence of different configurations.
 OpenLineage Airflow Provider looks for the configuration in the following 
order:
 
-1. Check ``config_path`` in ``airflow.cfg`` under ``openlineage`` section (or 
AIRFLOW__OPENLINEAGE__CONFIG_PATH environment variable)
-2. Check ``transport`` in ``airflow.cfg`` under ``openlineage`` section (or 
AIRFLOW__OPENLINEAGE__TRANSPORT environment variable)
-3. If all the above options are missing, the OpenLineage Python client used 
underneath looks for configuration in the order described in `this 
<https://openlineage.io/docs/client/python/configuration>`_ documentation. 
Please note that **using Airflow configuration is encouraged** and is the only 
future proof solution.
+1. Check ``config_conn_id`` in ``airflow.cfg`` under ``openlineage`` section.
+2. Check ``config_path`` in ``airflow.cfg`` under ``openlineage`` section (or 
AIRFLOW__OPENLINEAGE__CONFIG_PATH environment variable)
+3. Check ``transport`` in ``airflow.cfg`` under ``openlineage`` section (or 
AIRFLOW__OPENLINEAGE__TRANSPORT environment variable)
+4. If all the above options are missing, the OpenLineage Python client used 
underneath looks for configuration in the order described in `this 
<https://openlineage.io/docs/client/python/configuration>`_ documentation. 
Please note that **using Airflow configuration is encouraged** and is the only 
future proof solution.
 
 
 .. _configuration_selective_enable:openlineage:
diff --git a/providers/openlineage/provider.yaml 
b/providers/openlineage/provider.yaml
index 41489c8d6b4..c8fd3f514b3 100644
--- a/providers/openlineage/provider.yaml
+++ b/providers/openlineage/provider.yaml
@@ -105,6 +105,15 @@ config:
       This section applies settings for OpenLineage integration.
 
     options:
+      config_conn_id:
+        description: |
+          Specify a Generic Airflow connection ID that contains OpenLineage 
configuration in connection
+          extra. This can be used to keep the OpenLineage transport 
configuration, including auth settings,
+          outside of the Airflow configuration file.
+        version_added: ~
+        type: string
+        example: "openlineage_default"
+        default: ""
       config_path:
         description: |
           Specify the path to the YAML configuration file.
diff --git a/providers/openlineage/src/airflow/providers/openlineage/conf.py 
b/providers/openlineage/src/airflow/providers/openlineage/conf.py
index 0fc4611e0c6..3a2ca201e53 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/conf.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/conf.py
@@ -54,6 +54,12 @@ def config_path(check_legacy_env_var: bool = True) -> str:
     return option
 
 
+@cache
+def config_conn_id() -> str:
+    """[openlineage] config_conn_id."""
+    return conf.get(_CONFIG_SECTION, "config_conn_id", fallback="")
+
+
 @cache
 def is_source_enabled() -> bool:
     """[openlineage] disable_source_code."""
@@ -136,6 +142,8 @@ def is_disabled() -> bool:
     if _is_true(os.getenv("OPENLINEAGE_DISABLED", "")):  # Check legacy 
variable
         return True
 
+    if config_conn_id():  # Check if config connection is present
+        return False
     if transport():  # Check if transport is present
         return False
     if config_path(True):  # Check if config file is present
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py 
b/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py
index c222b76aa5d..60cf1a981a6 100644
--- 
a/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py
+++ 
b/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py
@@ -50,6 +50,13 @@ def get_provider_info():
             "openlineage": {
                 "description": "This section applies settings for OpenLineage 
integration.\n",
                 "options": {
+                    "config_conn_id": {
+                        "description": "Specify a Generic Airflow connection 
ID that contains OpenLineage configuration in connection\nextra. This can be 
used to keep the OpenLineage transport configuration, including auth 
settings,\noutside of the Airflow configuration file.\n",
+                        "version_added": None,
+                        "type": "string",
+                        "example": "openlineage_default",
+                        "default": "",
+                    },
                     "config_path": {
                         "description": "Specify the path to the YAML 
configuration file.\nThis ensures backwards compatibility with passing config 
through the `openlineage.yml` file.\n",
                         "version_added": None,
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
index 594afe12280..ec81e232fc8 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
@@ -18,7 +18,7 @@ from __future__ import annotations
 
 import os
 import traceback
-from typing import TYPE_CHECKING, Literal
+from typing import TYPE_CHECKING, Any, Literal
 
 import yaml
 from openlineage.client import OpenLineageClient, set_producer
@@ -36,6 +36,10 @@ from openlineage.client.facet_v2 import (
 
 from airflow.providers.common.compat.sdk import Stats, conf as airflow_conf
 from airflow.providers.openlineage import conf
+from airflow.providers.openlineage.token_provider import (
+    AirflowConnectionConfigProvider,
+    resolve_airflow_connection_auth,
+)
 from airflow.providers.openlineage.utils.utils import (
     _PRODUCER,
     OpenLineageRedactor,
@@ -103,22 +107,35 @@ class OpenLineageAdapter(LoggingMixin):
         return self._client
 
     def get_openlineage_config(self) -> dict | None:
-        # First, try to read from YAML file
+        # First, try to read from Airflow connection
+        openlineage_config_conn_id = conf.config_conn_id()
+        if openlineage_config_conn_id:
+            config = 
AirflowConnectionConfigProvider(openlineage_config_conn_id).get_config()
+            resolve_airflow_connection_auth(config=config, 
config_conn_id=openlineage_config_conn_id)
+            return config
+        self.log.debug("OpenLineage config_conn_id configuration not found.")
+
+        # Second, try to read from YAML file
         openlineage_config_path = conf.config_path(check_legacy_env_var=False)
         if openlineage_config_path:
-            config = self._read_yaml_config(openlineage_config_path)
-            return config
+            yaml_config = self._read_yaml_config(openlineage_config_path)
+            if yaml_config is None:
+                return None
+            resolve_airflow_connection_auth(yaml_config)
+            return yaml_config
         self.log.debug("OpenLineage config_path configuration not found.")
 
-        # Second, try to get transport config
+        # Third, try to get transport config
         transport_config = conf.transport()
         if not transport_config:
             self.log.debug("OpenLineage transport configuration not found.")
             return None
-        return {"transport": transport_config}
+        config = {"transport": transport_config}
+        resolve_airflow_connection_auth(config)
+        return config
 
     @staticmethod
-    def _read_yaml_config(path: str) -> dict | None:
+    def _read_yaml_config(path: str) -> dict[str, Any] | None:
         with open(path) as config_file:
             return yaml.safe_load(config_file)
 
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/token_provider.py 
b/providers/openlineage/src/airflow/providers/openlineage/token_provider.py
new file mode 100644
index 00000000000..330d8a7375d
--- /dev/null
+++ b/providers/openlineage/src/airflow/providers/openlineage/token_provider.py
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any
+
+from airflow.providers.common.compat.sdk import AirflowException, BaseHook
+
+AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE = "airflow_connection_api_key"
+_DEFAULT_EXTRA_KEYS = ("apiKey", "api_key", "apikey", "token", "access_token")
+
+
+class OpenLineageAirflowConnectionAuthError(AirflowException):
+    """Raised when OpenLineage API key auth cannot be resolved from an Airflow 
connection."""
+
+
+class OpenLineageAirflowConnectionConfigError(AirflowException):
+    """Raised when OpenLineage config cannot be resolved from an Airflow 
connection."""
+
+
+class AirflowConnectionConfigProvider:
+    """
+    Resolve OpenLineage client configuration from an Airflow connection.
+
+    The connection extra contains the full OpenLineage client config, for 
example
+    ``{"transport": {"type": "console"}}``.
+    """
+
+    def __init__(self, conn_id: str) -> None:
+        if not conn_id:
+            raise OpenLineageAirflowConnectionConfigError(
+                "OpenLineage connection config requires a non-empty connection 
ID."
+            )
+        self.conn_id = conn_id
+
+    def get_config(self) -> dict[str, Any]:
+        connection = BaseHook.get_connection(self.conn_id)
+        return self._validate_config(connection.extra_dejson)
+
+    def _validate_config(self, config: Any) -> dict[str, Any]:
+        if not isinstance(config, dict):
+            raise OpenLineageAirflowConnectionConfigError(
+                f"OpenLineage connection config `{config}` is not a dict."
+            )
+        if not isinstance(config.get("transport"), dict):
+            raise OpenLineageAirflowConnectionConfigError(
+                "OpenLineage connection config must contain a `transport` JSON 
object."
+            )
+        return config
+
+
+class AirflowConnectionTokenProvider:
+    """
+    Resolve an OpenLineage API key from an Airflow connection.
+
+    The connection password is preferred. If it is empty and ``extra_key`` is 
configured, that key
+    is read from connection ``extra``. Otherwise, common extra keys are 
checked.
+    """
+
+    def __init__(self, config: dict[str, Any], default_conn_id: str | None = 
None) -> None:
+        self.conn_id = config.get("conn_id") or default_conn_id or ""
+        self.extra_key = config.get("extra_key")
+        if not self.conn_id:
+            raise OpenLineageAirflowConnectionAuthError(
+                "OpenLineage `airflow_connection_api_key` auth requires a 
non-empty `conn_id`."
+            )
+
+    def get_api_key(self) -> str:
+        connection = BaseHook.get_connection(self.conn_id)
+        if connection.password:
+            return connection.password.strip()
+        api_key = self._get_api_key_from_extra(connection.extra_dejson)
+        if api_key:
+            return api_key
+
+        raise OpenLineageAirflowConnectionAuthError(
+            "OpenLineage `airflow_connection_api_key` auth could not find a 
token in connection "
+            f"`{self.conn_id}`. Expected connection password or token in 
connection extra."
+        )
+
+    def _get_api_key_from_extra(self, extra: dict[str, Any]) -> str | None:
+        if self.extra_key:
+            value = extra.get(self.extra_key)
+            return str(value).strip() if value else None
+
+        for key in _DEFAULT_EXTRA_KEYS:
+            value = extra.get(key)
+            if value:
+                return str(value).strip()
+        return None
+
+
+def resolve_airflow_connection_auth(config: dict[str, Any] | None, 
config_conn_id: str | None = None) -> None:
+    """
+    Read the API key from an Airflow connection and put it into the 
OpenLineage config.
+
+    OpenLineage config can contain one transport, a composite transport, or 
composite transports
+    nested inside each other. This function walks through that structure and 
updates every matching
+    ``auth`` block in place.
+
+    This only makes sense for HTTP transports: ``airflow_connection_api_key`` 
is replaced with
+    ``{"type": "api_key", "apiKey": ...}``.
+    """
+    if not isinstance(config, dict):
+        return
+
+    for key, value in config.items():
+        if (
+            key == "auth"
+            and isinstance(value, dict)
+            and value.get("type") == AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE
+        ):
+            provider = AirflowConnectionTokenProvider(value, 
default_conn_id=config_conn_id)
+            config[key] = {"type": "api_key", "apiKey": provider.get_api_key()}
+        elif key == "transports" and isinstance(value, list):
+            for item in value:
+                resolve_airflow_connection_auth(item, 
config_conn_id=config_conn_id)
+        else:
+            resolve_airflow_connection_auth(value, 
config_conn_id=config_conn_id)
diff --git 
a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py 
b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py
index 30e18d0b698..a3a252b8cbb 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import datetime
+import json
 import os
 import pathlib
 import uuid
@@ -41,7 +42,7 @@ from openlineage.client.facet_v2 import (
 from airflow import DAG
 from airflow.models.dagrun import DagRun, DagRunState
 from airflow.models.taskinstance import TaskInstance, TaskInstanceState
-from airflow.providers.common.compat.sdk import Stats
+from airflow.providers.common.compat.sdk import BaseHook, Connection, Stats
 from airflow.providers.openlineage.conf import namespace
 from airflow.providers.openlineage.extractors import OperatorLineage
 from airflow.providers.openlineage.plugins.adapter import _PRODUCER, 
OpenLineageAdapter
@@ -50,6 +51,11 @@ from airflow.providers.openlineage.plugins.facets import (
     AirflowDebugRunFacet,
     AirflowStateRunFacet,
 )
+from airflow.providers.openlineage.token_provider import (
+    AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE,
+    OpenLineageAirflowConnectionAuthError,
+    OpenLineageAirflowConnectionConfigError,
+)
 from airflow.providers.openlineage.utils.utils import get_airflow_job_facet
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.utils.task_group import TaskGroup
@@ -102,6 +108,160 @@ def test_create_client_from_config_with_options():
     assert client.transport.url == "http://ol-api:5000";
 
 
[email protected](BaseHook, "get_connection")
+@conf_vars(
+    {
+        ("openlineage", "transport"): '{"type": "http", "url": 
"http://ol-api:5000";,'
+        ' "auth": {"type": "api_key", "apiKey": "api-key"}}'
+    }
+)
+def 
test_create_client_from_config_without_connection_auth_does_not_read_connection(mock_get_connection):
+    client = OpenLineageAdapter().get_or_create_openlineage_client()
+
+    assert client.transport.kind == "http"
+    assert client.transport.url == "http://ol-api:5000";
+    mock_get_connection.assert_not_called()
+
+
+def _connection_auth_transport_config(**auth_config):
+    auth = {
+        "type": AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE,
+        "conn_id": "openlineage_default",
+        **auth_config,
+    }
+    return json.dumps({"type": "http", "url": "http://ol-api:5000";, "auth": 
auth})
+
+
[email protected](BaseHook, "get_connection")
+def 
test_create_client_from_config_with_connection_auth_password(mock_get_connection):
+    mock_get_connection.return_value = Connection(
+        conn_id="openlineage_default", conn_type="http", password="api-key"
+    )
+
+    with conf_vars({("openlineage", "transport"): 
_connection_auth_transport_config()}):
+        client = OpenLineageAdapter().get_or_create_openlineage_client()
+
+    assert client.transport.kind == "http"
+    assert client.transport.url == "http://ol-api:5000";
+    assert client.transport.config.auth.api_key == "api-key"
+
+
[email protected](BaseHook, "get_connection")
+def 
test_create_client_from_config_with_connection_auth_extra(mock_get_connection):
+    mock_get_connection.return_value = Connection(
+        conn_id="openlineage_default",
+        conn_type="http",
+        extra='{"lineage_token": "api-key-from-extra"}',
+    )
+
+    transport_config = 
_connection_auth_transport_config(extra_key="lineage_token")
+    with conf_vars({("openlineage", "transport"): transport_config}):
+        client = OpenLineageAdapter().get_or_create_openlineage_client()
+
+    assert client.transport.kind == "http"
+    assert client.transport.config.auth.api_key == "api-key-from-extra"
+
+
[email protected](BaseHook, "get_connection")
+def 
test_create_client_from_config_with_connection_auth_token_extra(mock_get_connection):
+    mock_get_connection.return_value = Connection(
+        conn_id="openlineage_default",
+        conn_type="http",
+        extra='{"token": "api-key-from-token"}',
+    )
+
+    with conf_vars({("openlineage", "transport"): 
_connection_auth_transport_config()}):
+        client = OpenLineageAdapter().get_or_create_openlineage_client()
+
+    assert client.transport.kind == "http"
+    assert client.transport.config.auth.api_key == "api-key-from-token"
+
+
[email protected](BaseHook, "get_connection")
+def 
test_create_client_from_config_with_connection_auth_missing_secret(mock_get_connection):
+    mock_get_connection.return_value = 
Connection(conn_id="openlineage_default", conn_type="http", extra="{}")
+
+    with conf_vars({("openlineage", "transport"): 
_connection_auth_transport_config()}):
+        with pytest.raises(OpenLineageAirflowConnectionAuthError, match="could 
not find a token"):
+            OpenLineageAdapter().get_or_create_openlineage_client()
+
+
[email protected](BaseHook, "get_connection")
+def 
test_create_client_from_connection_config_with_connection_auth_password(mock_get_connection):
+    mock_get_connection.return_value = Connection(
+        conn_id="openlineage_default",
+        conn_type="http",
+        password="api-key",
+        extra=json.dumps(
+            {
+                "transport": {
+                    "type": "http",
+                    "url": "http://ol-api:5000";,
+                    "auth": {
+                        "type": AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE,
+                    },
+                }
+            }
+        ),
+    )
+
+    with conf_vars({("openlineage", "config_conn_id"): "openlineage_default"}):
+        client = OpenLineageAdapter().get_or_create_openlineage_client()
+
+    assert client.transport.kind == "http"
+    assert client.transport.url == "http://ol-api:5000";
+    assert client.transport.config.auth.api_key == "api-key"
+
+
[email protected](BaseHook, "get_connection")
+def test_create_client_from_connection_transport_config(mock_get_connection):
+    mock_get_connection.return_value = Connection(
+        conn_id="openlineage_default",
+        conn_type="generic",
+        extra='{"transport": {"type": "console"}}',
+    )
+
+    with conf_vars({("openlineage", "config_conn_id"): "openlineage_default"}):
+        client = OpenLineageAdapter().get_or_create_openlineage_client()
+
+    assert client.transport.kind == "console"
+
+
[email protected](BaseHook, "get_connection")
+def 
test_connection_config_takes_precedence_over_transport_config(mock_get_connection):
+    mock_get_connection.return_value = Connection(
+        conn_id="openlineage_default",
+        conn_type="generic",
+        extra='{"transport": {"type": "console"}}',
+    )
+
+    with conf_vars(
+        {
+            ("openlineage", "config_conn_id"): "openlineage_default",
+            ("openlineage", "transport"): '{"type": "http", "url": 
"http://ol-api:5000"}',
+        }
+    ):
+        client = OpenLineageAdapter().get_or_create_openlineage_client()
+
+    assert client.transport.kind == "console"
+
+
[email protected](BaseHook, "get_connection")
+def 
test_connection_config_missing_transport_raises_custom_exception(mock_get_connection):
+    mock_get_connection.return_value = Connection(
+        conn_id="openlineage_default",
+        conn_type="generic",
+        extra='{"url": "http://ol-api:5000"}',
+    )
+
+    with conf_vars({("openlineage", "config_conn_id"): "openlineage_default"}):
+        with pytest.raises(
+            OpenLineageAirflowConnectionConfigError,
+            match="must contain a `transport` JSON object",
+        ):
+            OpenLineageAdapter().get_or_create_openlineage_client()
+
+
 def test_create_client_from_yaml_config():
     current_folder = pathlib.Path(__file__).parent.resolve()
     yaml_config = str((current_folder / "openlineage_configs" / 
"http.yaml").resolve())
diff --git 
a/providers/openlineage/tests/unit/openlineage/test_token_provider.py 
b/providers/openlineage/tests/unit/openlineage/test_token_provider.py
new file mode 100644
index 00000000000..afd49433cb3
--- /dev/null
+++ b/providers/openlineage/tests/unit/openlineage/test_token_provider.py
@@ -0,0 +1,192 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import patch
+
+import pytest
+
+from airflow.providers.common.compat.sdk import BaseHook, Connection
+from airflow.providers.openlineage.token_provider import (
+    AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE,
+    AirflowConnectionConfigProvider,
+    AirflowConnectionTokenProvider,
+    OpenLineageAirflowConnectionAuthError,
+    OpenLineageAirflowConnectionConfigError,
+    resolve_airflow_connection_auth,
+)
+
+
[email protected](BaseHook, "get_connection")
+def test_get_api_key_from_connection_password(mock_get_connection):
+    mock_get_connection.return_value = Connection(
+        conn_id="openlineage_default", conn_type="http", password="api-key"
+    )
+
+    provider = AirflowConnectionTokenProvider({"conn_id": 
"openlineage_default"})
+
+    assert provider.get_api_key() == "api-key"
+
+
[email protected](BaseHook, "get_connection")
+def test_get_api_key_from_default_connection_id(mock_get_connection):
+    mock_get_connection.return_value = Connection(
+        conn_id="openlineage_default", conn_type="http", password="api-key"
+    )
+
+    provider = AirflowConnectionTokenProvider({}, 
default_conn_id="openlineage_default")
+
+    assert provider.get_api_key() == "api-key"
+
+
[email protected](BaseHook, "get_connection")
+def test_get_api_key_from_connection_extra(mock_get_connection):
+    mock_get_connection.return_value = Connection(
+        conn_id="openlineage_default", conn_type="http", extra='{"api_key": 
"api-key-from-extra"}'
+    )
+
+    provider = AirflowConnectionTokenProvider({"conn_id": 
"openlineage_default"})
+
+    assert provider.get_api_key() == "api-key-from-extra"
+
+
+def test_missing_conn_id_raises_custom_exception():
+    with pytest.raises(OpenLineageAirflowConnectionAuthError, match="requires 
a non-empty `conn_id`"):
+        AirflowConnectionTokenProvider({})
+
+
[email protected](BaseHook, "get_connection")
+def test_missing_token_raises_custom_exception(mock_get_connection):
+    mock_get_connection.return_value = 
Connection(conn_id="openlineage_default", conn_type="http")
+
+    provider = AirflowConnectionTokenProvider({"conn_id": 
"openlineage_default"})
+
+    with pytest.raises(OpenLineageAirflowConnectionAuthError, match="could not 
find a token"):
+        provider.get_api_key()
+
+
[email protected](BaseHook, "get_connection")
+def test_resolve_connection_auth_in_composite_transport(mock_get_connection):
+    mock_get_connection.return_value = Connection(
+        conn_id="openlineage_default", conn_type="http", password="api-key"
+    )
+    config = {
+        "transport": {
+            "type": "composite",
+            "transports": [
+                {
+                    "type": "http",
+                    "url": "http://ol-api:5000";,
+                    "auth": {
+                        "type": AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE,
+                        "conn_id": "openlineage_default",
+                    },
+                }
+            ],
+        }
+    }
+
+    resolve_airflow_connection_auth(config)
+
+    assert config["transport"]["transports"][0]["auth"] == {
+        "type": "api_key",
+        "apiKey": "api-key",
+    }
+
+
[email protected](BaseHook, "get_connection")
+def 
test_resolve_connection_auth_in_nested_composite_transport(mock_get_connection):
+    mock_get_connection.return_value = Connection(
+        conn_id="openlineage_default", conn_type="http", password="api-key"
+    )
+    config = {
+        "transport": {
+            "type": "composite",
+            "transports": [
+                {
+                    "type": "http",
+                    "url": "http://ol-api-1:5000";,
+                    "auth": {
+                        "type": AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE,
+                        "conn_id": "openlineage_default",
+                    },
+                },
+                {
+                    "type": "composite",
+                    "transports": [
+                        {
+                            "type": "http",
+                            "url": "http://ol-api-2:5000";,
+                            "auth": {
+                                "type": AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE,
+                                "conn_id": "openlineage_default",
+                            },
+                        },
+                        {"type": "console"},
+                    ],
+                },
+            ],
+        }
+    }
+
+    resolve_airflow_connection_auth(config)
+
+    assert config["transport"]["transports"][0]["auth"] == {
+        "type": "api_key",
+        "apiKey": "api-key",
+    }
+    assert config["transport"]["transports"][1]["transports"][0]["auth"] == {
+        "type": "api_key",
+        "apiKey": "api-key",
+    }
+    assert config["transport"]["transports"][1]["transports"][1] == {"type": 
"console"}
+
+
[email protected](BaseHook, "get_connection")
+def test_get_openlineage_config_from_connection_extra(mock_get_connection):
+    mock_get_connection.return_value = Connection(
+        conn_id="openlineage_default",
+        conn_type="generic",
+        extra='{"transport": {"type": "console"}}',
+    )
+
+    provider = AirflowConnectionConfigProvider("openlineage_default")
+
+    assert provider.get_config() == {"transport": {"type": "console"}}
+
+
+def test_missing_config_conn_id_raises_custom_exception():
+    with pytest.raises(OpenLineageAirflowConnectionConfigError, 
match="requires a non-empty connection ID"):
+        AirflowConnectionConfigProvider("")
+
+
[email protected](BaseHook, "get_connection")
+def test_missing_config_raises_custom_exception(mock_get_connection):
+    mock_get_connection.return_value = Connection(
+        conn_id="openlineage_default",
+        conn_type="generic",
+        extra='{"url": "http://ol-api:5000"}',
+    )
+
+    provider = AirflowConnectionConfigProvider("openlineage_default")
+
+    with pytest.raises(
+        OpenLineageAirflowConnectionConfigError,
+        match="must contain a `transport` JSON object",
+    ):
+        provider.get_config()

Reply via email to