vvijay-ad opened a new issue, #63251: URL: https://github.com/apache/airflow/issues/63251
### Apache Airflow Provider(s) openlineage ### Versions of Apache Airflow Providers apache-airflow-providers-openlineage | 2.8.0 | OpenLineage [apache-airflow-providers-openlineage](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/2.8.0) 2.8.0 [OpenLineage](https://openlineage.io/) ### Apache Airflow version 2.10.5 ### Operating System ProductName: macOS ProductVersion: 15.7.3 BuildVersion: 24G419 ### Deployment Google Cloud Composer ### Deployment details Image version: composer-3-airflow-2.10.5-build.25 ### What happened In Cloud Composer 3 with the OpenLineage Airflow provider, a custom `TokenProvider` implemented in an installed PyPI package is importable by DAG code but consistently fails to import when referenced via `openlineage.transport.auth.type` in the scheduler’s OpenLineage plugin. This prevents emitting DAG-level OpenLineage events from the Composer 3 scheduler. *** ### Environment - **Platform:** Google Cloud Composer 3 (GKE Autopilot, Google‑managed) - **Airflow:** Composer 3 default Airflow 2.10.5 - **Lineage integration:** `apache-airflow-providers-openlineage` (>=2.8.0) + OpenLineage Python client - **Custom package (PyPI):** `ad-openlineage-token-provider==1.0.x` - **Composer 3 config:** - Package installed via Composer PyPI dependencies (build logs show successful install in Worker \& Scheduler image) - OpenLineage configured through Airflow configuration override (`openlineage.transport`) *** ### Custom TokenProvider package Published to PyPI as `ad-openlineage-token-provider`, with wheel contents: ```text tokenproviders/__init__.py tokenproviders/access_key_secret_token_provider.py ad_openlineage_token_provider-1.0.x.dist-info/... ``` Implementation: ```python # tokenproviders/access_key_secret_token_provider.py from openlineage.client.transport.http import TokenProvider import logging log = logging.getLogger(__name__) class AccessKeySecretKeyTokenProvider(TokenProvider): def __init__(self, config: dict[str, str]) -> None: super().__init__(config) log.info("AccessKeySecretKeyTokenProvider constructor invoked.") def get_bearer(self): # get the bearer based on the access_key and secret_key config ... ... ``` The Python import path is: ```python from tokenproviders.access_key_secret_token_provider import AccessKeySecretKeyTokenProvider ``` *** ### OpenLineage transport configuration in Airflow Shown in Airflow UI → Admin → Configurations: ```json { "type": "http", "url": "https://myopenlineage-backend.mydomain.com", "endpoint": "/api/v1/lineage", "auth": { "type": "tokenproviders.access_key_secret_token_provider.AccessKeySecretKeyTokenProvider", "access_key": "xxxxxx", "secret_key": "xxxxxx" } } ``` This matches the working Python path in the package. *** ### Observed behavior 1. **Package installation confirmed for scheduler image** Composer 3 Worker \& Scheduler image build logs show: ```text Downloading ad_openlineage_token_provider-1.0.x-py3-none-any.whl ... Installing collected packages: ad-openlineage-token-provider Successfully installed ad-openlineage-token-provider-1.0.x ``` 2. **Scheduler can import the package in DAG code** A test DAG: ```python # dags/test_tokenproviders_import.py from datetime import datetime from airflow import DAG from tokenproviders.access_key_secret_token_provider import AccessKeySecretKeyTokenProvider with DAG( dag_id="test_tokenproviders_import", start_date=datetime(2024, 1, 1), schedule=None, catchup=False, ) as dag: pass ``` - Appears as **healthy** in the Airflow UI. - Runs successfully (no import errors in scheduler logs for this DAG itself). 3. **Scheduler OpenLineage plugin fails to import the same class** When DAG-level OpenLineage events are emitted, the scheduler logs: ```text WARNING No module named 'tokenproviders' WARNING Failed to emit OpenLineage DAG started/success event: ... ModuleNotFoundError: No module named 'tokenproviders' ... ImportError: Failed to import tokenproviders.access_key_secret_token_provider.AccessKeySecretKeyTokenProvider ``` Stack trace shows failure in: - `openlineage.client.utils.import_from_string` - called from `openlineage.client.transport.http.create_token_provider` - triggered by `airflow.providers.openlineage.plugins.adapter` during DAG started/success callbacks. *** ### Hypothesis - In Composer 3, the scheduler’s **DAG parsing** environment and the **OpenLineage plugin** environment are not sharing the same `sys.path` / site-packages view. - The installed package `ad-openlineage-token-provider` (exposing `tokenproviders`) is visible to DAG imports but **not** visible to the Python interpreter/context used by `airflow.providers.openlineage.plugins.adapter` when calling `import_from_string(auth.type)`. - Because Composer 3 hides scheduler shell access and does not synchronize custom plugins to scheduler/triggerer, there is no supported user-level way to adjust or inspect the plugin’s `PYTHONPATH`. *** ### Expected vs actual - **Expected:** Any Python package installed into the Worker \& Scheduler image via Composer’s PyPI dependencies is importable from: - DAG code during scheduler parsing and execution, and - Scheduler-resident plugins (like the OpenLineage provider) when they import classes by fully-qualified module path. - **Actual:** - The package is importable in DAG code (`from tokenproviders...` works). - The exact same module path used in `openlineage.transport.auth.type` fails with `ModuleNotFoundError` when OpenLineage’s scheduler plugin tries to import it. *** ### Impact - Custom OpenLineage token providers implemented as installed Python packages cannot be used in Cloud Composer 3’s scheduler. - DAG-level OpenLineage events (started/success/failed) cannot authenticate to the lineage backend using a custom TokenProvider, even though the same code works from workers and in local tests. *** ### Request 1. **Confirm** whether this is a known limitation or bug in: - Cloud Composer 3’s scheduler/plugin environment, or - The OpenLineage Airflow provider’s integration on Composer 3. 2. **Clarify expected behavior**: - Should a package installed via Composer 3 PyPI dependencies be importable by the OpenLineage scheduler plugin using a dotted path in `auth.type`? 3. **Provide a fix or guidance** so that: - Installed Python packages (like `ad-openlineage-token-provider`) are visible in the Composer 3 scheduler OpenLineage plugin context, and - A custom `TokenProvider` can be referenced via `openlineage.transport.auth.type` as documented (e.g. `"mymodule.MyTokenProvider"` style paths). ### What you think should happen instead - The package is importable in DAG code (`from tokenproviders...` works). - The exact same module path used in `openlineage.transport.auth.type` fails with `ModuleNotFoundError` when OpenLineage’s scheduler plugin tries to import it. - The celery workers are able to import the tokenprovider.* module and emitting Task-level OpenLineage events successfully. ### How to reproduce 1. **Create a minimal custom TokenProvider package** Project structure: ```text my_openlineage_token_provider/ mymodule/ __init__.py my_token_provider.py pyproject.toml or setup.py ``` `mymodule/my_token_provider.py`: ```python from openlineage.client.transport.http import TokenProvider import logging log = logging.getLogger(__name__) class MyTokenProvider(TokenProvider): def __init__(self, config: dict[str, str]) -> None: super().__init__(config) log.info("MyTokenProvider constructed with config keys: %s", list(config.keys())) ``` Build and publish this to PyPI as `my-openlineage-token-provider`, so that installing it exposes a top‑level package `mymodule` with `mymodule.my_token_provider.MyTokenProvider`. 2. **Install the package in Cloud Composer 3** In Composer 3: - Add `my-openlineage-token-provider==1.0.0` to the environment’s PyPI dependencies (UI or `gcloud composer environments update --update-pypi-package="my-openlineage-token-provider==1.0.0"`). - Wait for the Worker \& Scheduler image build to complete; verify build logs show: ```text Downloading my_openlineage_token_provider-1.0.0-py3-none-any.whl ... Installing collected packages: my-openlineage-token-provider Successfully installed my-openlineage-token-provider-1.0.0 ``` 3. **Configure OpenLineage to use the custom TokenProvider** In Airflow configuration overrides for the Composer 3 environment, set: - Section: `openlineage` - Key: `transport` - Value (JSON): ```json { "type": "http", "url": "https://myopenlineage-backend.mydomain.com", "endpoint": "/api/v1/lineage", "auth": { "type": "mymodule.my_token_provider.MyTokenProvider", "access_key": "xxxxxx", "secret_key": "xxxxxxxx" } } ``` Confirm in Airflow UI → Admin → Configurations that this value is visible under `openlineage.transport`. 4. **Create a test DAG that imports the provider** Deploy a simple DAG to the Composer 3 environment: ```python # dags/test_my_token_provider_import.py from datetime import datetime from airflow import DAG from mymodule.my_token_provider import MyTokenProvider with DAG( dag_id="test_my_token_provider_import", start_date=datetime(2024, 1, 1), schedule=None, catchup=False, ) as dag: pass ``` - Verify that the DAG appears as healthy in the Airflow UI. - Trigger it manually and confirm there are **no import errors** for this DAG in scheduler logs. 5. **Trigger any DAG and observe scheduler logs for OpenLineage** - Trigger `test_my_token_provider_import` or another simple DAG. - Check scheduler logs (Composer → Logs → airflow-scheduler) for OpenLineage events. ### Anything else _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
