andreahlert commented on code in PR #62867:
URL: https://github.com/apache/airflow/pull/62867#discussion_r2920720023
##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py:
##########
@@ -158,6 +158,58 @@ def _fetch_extra_configs(keys: list[str]) -> dict[str,
Any]:
credentials = self._remove_none_values(credentials)
extra_config = _fetch_extra_configs(["region", "endpoint"])
+ case "google_cloud_platform":
+ try:
+ from airflow.providers.google.common.hooks.base_google
import get_field as gcp_get_field
+ except ImportError:
+ from airflow.providers.common.compat.sdk import
AirflowOptionalProviderFeatureException
+
+ raise AirflowOptionalProviderFeatureException(
+ "Failed to import get_field. To use the GCS storage
functionality, please install the "
+ "apache-airflow-providers-google package."
+ )
+ key_path = gcp_get_field(conn.extra_dejson, "key_path")
+ if key_path:
+ credentials.update({"service_account_path": key_path})
+ # Without key_path, credentials stays empty and DataFusion
falls back to ADC.
+ extra_config = {}
+
+ case "wasb":
+ try:
+ # Imported as a feature gate only: verifies the Azure
provider is installed
+ from airflow.providers.microsoft.azure.hooks.wasb import
WasbHook # noqa: F401
+ except ImportError:
+ from airflow.providers.common.compat.sdk import
AirflowOptionalProviderFeatureException
+
+ raise AirflowOptionalProviderFeatureException(
+ "Failed to import WasbHook. To use the Azure storage
functionality, please install the "
+ "apache-airflow-providers-microsoft-azure package."
+ )
+ tenant_id = conn.extra_dejson.get("tenant_id")
+ if tenant_id:
+ # Service Principal auth: conn.host holds the storage
account (name or full URL);
+ # conn.login is the client_id (AAD app ID), matching
WasbHook convention.
+ # DataFusion requires just the account name, so strip any
URL components.
+ from urllib.parse import urlparse
+
+ host = conn.host or ""
+ parsed = urlparse(host if "://" in host else
f"https://{host}")
+ account = parsed.netloc.split(".")[0] if "." in
(parsed.netloc or "") else host
+ credentials = {
+ "account": account or None,
+ "client_id": conn.extra_dejson.get("client_id") or
conn.login,
+ "client_secret":
conn.extra_dejson.get("client_secret") or conn.password,
+ "tenant_id": tenant_id,
+ }
+ else:
+ # Key auth: conn.login = storage account name
+ credentials = {
+ "account": conn.login,
+ "access_key": conn.password or
conn.extra_dejson.get("account_key"),
+ }
+ credentials = self._remove_none_values(credentials)
+ extra_config = {}
Review Comment:
If both `conn.login` and `conn.password`/`account_key` are absent,
`_remove_none_values` returns an empty dict and DataFusion fails with a cryptic
internal error. It would be worth raising a `ValueError` early that names the
missing connection fields; otherwise the user has no idea where to look.
##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py:
##########
@@ -158,6 +158,58 @@ def _fetch_extra_configs(keys: list[str]) -> dict[str,
Any]:
credentials = self._remove_none_values(credentials)
extra_config = _fetch_extra_configs(["region", "endpoint"])
+ case "google_cloud_platform":
+ try:
+ from airflow.providers.google.common.hooks.base_google
import get_field as gcp_get_field
+ except ImportError:
+ from airflow.providers.common.compat.sdk import
AirflowOptionalProviderFeatureException
+
+ raise AirflowOptionalProviderFeatureException(
+ "Failed to import get_field. To use the GCS storage
functionality, please install the "
+ "apache-airflow-providers-google package."
+ )
+ key_path = gcp_get_field(conn.extra_dejson, "key_path")
+ if key_path:
+ credentials.update({"service_account_path": key_path})
+ # Without key_path, credentials stays empty and DataFusion
falls back to ADC.
+ extra_config = {}
+
+ case "wasb":
+ try:
+ # Imported as a feature gate only: verifies the Azure
provider is installed
+ from airflow.providers.microsoft.azure.hooks.wasb import
WasbHook # noqa: F401
+ except ImportError:
+ from airflow.providers.common.compat.sdk import
AirflowOptionalProviderFeatureException
+
+ raise AirflowOptionalProviderFeatureException(
+ "Failed to import WasbHook. To use the Azure storage
functionality, please install the "
+ "apache-airflow-providers-microsoft-azure package."
+ )
+ tenant_id = conn.extra_dejson.get("tenant_id")
+ if tenant_id:
+ # Service Principal auth: conn.host holds the storage
account (name or full URL);
+ # conn.login is the client_id (AAD app ID), matching
WasbHook convention.
+ # DataFusion requires just the account name, so strip any
URL components.
+ from urllib.parse import urlparse
+
+ host = conn.host or ""
+ parsed = urlparse(host if "://" in host else
f"https://{host}")
+ account = parsed.netloc.split(".")[0] if "." in
(parsed.netloc or "") else host
+ credentials = {
Review Comment:
The `"://" in host` branch means you explicitly handle URLs with schemes. But
if `conn.host` is `"https://mystorageaccount"` (scheme present, no subdomain),
`netloc` becomes `mystorageaccount`, the `"." in netloc` check fails, and
`account` falls back to the raw `host` string, scheme included. The fallback
should be `parsed.netloc or host`, not `host`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]