andreahlert commented on code in PR #62867:
URL: https://github.com/apache/airflow/pull/62867#discussion_r2907232754
##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py:
##########
@@ -158,6 +158,49 @@ def _fetch_extra_configs(keys: list[str]) -> dict[str, Any]:
                 credentials = self._remove_none_values(credentials)
                 extra_config = _fetch_extra_configs(["region", "endpoint"])
+            case "google_cloud_platform":
+                try:
+                    from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+                except ImportError:
+                    from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
+
+                    raise AirflowOptionalProviderFeatureException(
+                        "Failed to import GoogleBaseHook. To use the GCS storage functionality, please install the "
+                        "apache-airflow-providers-google package."
+                    )
Review Comment:
`_get_field` is a private method and could break without notice. Is there a
public API to get the key path?
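For illustration, a sketch of a public-API version, assuming the key path is stored unprefixed in the connection extras (note this would skip the legacy `extra__google_cloud_platform__` prefix handling that `_get_field` does):

```python
# Hypothetical alternative avoiding the private _get_field call;
# assumes the key path sits unprefixed in the connection extras.
key_path = conn.extra_dejson.get("key_path")
if key_path:
    credentials.update({"service_account_path": key_path})
```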
##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py:
##########
@@ -158,6 +158,49 @@ def _fetch_extra_configs(keys: list[str]) -> dict[str, Any]:
                 credentials = self._remove_none_values(credentials)
                 extra_config = _fetch_extra_configs(["region", "endpoint"])
+            case "google_cloud_platform":
+                try:
+                    from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+                except ImportError:
+                    from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
+
+                    raise AirflowOptionalProviderFeatureException(
+                        "Failed to import GoogleBaseHook. To use the GCS storage functionality, please install the "
+                        "apache-airflow-providers-google package."
+                    )
+                gcp_hook: GoogleBaseHook = GoogleBaseHook(gcp_conn_id=conn.conn_id)
+                key_path = gcp_hook._get_field("key_path")
+                if key_path:
+                    credentials.update({"service_account_path": key_path})
+                extra_config = {}
+
+            case "wasb":
+                try:
Review Comment:
`WasbHook` is imported but never actually used. If it's only here as a feature gate (to verify the package is installed), a short comment explaining that would help; otherwise the import and its `noqa: F401` can both go.
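If the feature gate is intended, something along these lines would make it explicit (sketch only, the `except` body elided):

```python
try:
    # Feature gate only: verifies apache-airflow-providers-microsoft-azure
    # is installed; the hook itself is not used in this branch.
    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook  # noqa: F401
except ImportError:
    ...  # existing AirflowOptionalProviderFeatureException raise
```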
##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/object_storage_provider.py:
##########
@@ -34,7 +34,7 @@ def get_storage_type(self) -> StorageType:
     def create_object_store(self, path: str, connection_config: ConnectionConfig | None = None):
         """Create an S3 object store using DataFusion's AmazonS3."""
         if connection_config is None:
-            raise ValueError("connection_config must be provided for %s", self.get_storage_type)
+            raise ValueError(f"connection_config must be provided for {self.get_storage_type}")
Review Comment:
Good catch on the format bug, but since this is unrelated to the GCS/Azure
feature it'd be cleaner as a separate commit (or at least called out in the PR
description).
##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py:
##########
@@ -158,6 +158,49 @@ def _fetch_extra_configs(keys: list[str]) -> dict[str, Any]:
                 credentials = self._remove_none_values(credentials)
                 extra_config = _fetch_extra_configs(["region", "endpoint"])
+            case "google_cloud_platform":
+                try:
+                    from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+                except ImportError:
+                    from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
+
+                    raise AirflowOptionalProviderFeatureException(
+                        "Failed to import GoogleBaseHook. To use the GCS storage functionality, please install the "
Review Comment:
If `key_path` is None, credentials end up as an empty dict. Does
`GoogleCloud()` fall back to ADC automatically, or will it just error out?
Worth a comment here.
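Even a comment would do, e.g. (assuming ADC fallback is the intended behavior):

```python
if key_path:
    credentials.update({"service_account_path": key_path})
# No key_path configured: leave credentials empty so the GCS object
# store falls back to Application Default Credentials (ADC).
# (Assumption -- if GoogleCloud() errors on empty credentials instead,
# raising a clear error here would be better.)
```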
##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py:
##########
@@ -158,6 +158,49 @@ def _fetch_extra_configs(keys: list[str]) -> dict[str, Any]:
                 credentials = self._remove_none_values(credentials)
                 extra_config = _fetch_extra_configs(["region", "endpoint"])
+            case "google_cloud_platform":
+                try:
+                    from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+                except ImportError:
+                    from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
+
+                    raise AirflowOptionalProviderFeatureException(
+                        "Failed to import GoogleBaseHook. To use the GCS storage functionality, please install the "
+                        "apache-airflow-providers-google package."
+                    )
+                gcp_hook: GoogleBaseHook = GoogleBaseHook(gcp_conn_id=conn.conn_id)
+                key_path = gcp_hook._get_field("key_path")
+                if key_path:
+                    credentials.update({"service_account_path": key_path})
+                extra_config = {}
+
+            case "wasb":
+                try:
+                    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook  # noqa: F401
+                except ImportError:
+                    from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
+
+                    raise AirflowOptionalProviderFeatureException(
+                        "Failed to import WasbHook. To use the Azure storage functionality, please install the "
+                        "apache-airflow-providers-microsoft-azure package."
+                    )
+                account_name = conn.host or conn.login
+                tenant_id = conn.extra_dejson.get("tenant_id")
+                if tenant_id:
Review Comment:
`WasbHook` typically resolves the account name from `conn.login`. Using
`conn.host` as first pick diverges from the standard behavior. Is this
intentional?
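If matching the standard behavior is the goal, the resolution order would just flip (sketch):

```python
# Mirrors the usual WasbHook resolution: login first, host as fallback.
account_name = conn.login or conn.host
```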
##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py:
##########
@@ -158,6 +158,49 @@ def _fetch_extra_configs(keys: list[str]) -> dict[str, Any]:
                 credentials = self._remove_none_values(credentials)
                 extra_config = _fetch_extra_configs(["region", "endpoint"])
+            case "google_cloud_platform":
+                try:
+                    from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+                except ImportError:
+                    from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
+
+                    raise AirflowOptionalProviderFeatureException(
+                        "Failed to import GoogleBaseHook. To use the GCS storage functionality, please install the "
+                        "apache-airflow-providers-google package."
+                    )
+                gcp_hook: GoogleBaseHook = GoogleBaseHook(gcp_conn_id=conn.conn_id)
+                key_path = gcp_hook._get_field("key_path")
+                if key_path:
+                    credentials.update({"service_account_path": key_path})
+                extra_config = {}
+
+            case "wasb":
+                try:
+                    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook  # noqa: F401
+                except ImportError:
+                    from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
+
+                    raise AirflowOptionalProviderFeatureException(
+                        "Failed to import WasbHook. To use the Azure storage functionality, please install the "
+                        "apache-airflow-providers-microsoft-azure package."
+                    )
+                account_name = conn.host or conn.login
+                tenant_id = conn.extra_dejson.get("tenant_id")
+                if tenant_id:
+                    credentials = {
+                        "account": account_name,
+                        "client_id": conn.extra_dejson.get("client_id") or conn.login,
+                        "client_secret": conn.extra_dejson.get("client_secret") or conn.password,
+                        "tenant_id": tenant_id,
+                    }
Review Comment:
`conn.password` doubles as `access_key` in the other branch and as
`client_secret` fallback here. Could get ambiguous depending on how the
connection fields are populated.
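One way to keep the two flows apart would be to read the secret only from the extras in this branch (sketch; note it drops the current `conn.password` fallback):

```python
# Reserve conn.password for the access-key flow; in the
# service-principal flow take client_secret strictly from the extras.
client_secret = conn.extra_dejson.get("client_secret")
```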
##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py:
##########
@@ -158,6 +158,49 @@ def _fetch_extra_configs(keys: list[str]) -> dict[str, Any]:
                 credentials = self._remove_none_values(credentials)
                 extra_config = _fetch_extra_configs(["region", "endpoint"])
+            case "google_cloud_platform":
+                try:
+                    from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+                except ImportError:
+                    from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
Review Comment:
nit: only `service_account_path` is supported for GCP auth. Any plans to
support keyfile JSON content or other methods?
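For example, inline keyfile JSON could be routed through the existing `service_account_path` plumbing (hypothetical sketch; `keyfile_dict` is the extras field `GoogleBaseHook` uses for inline JSON):

```python
import json
import tempfile

# Hypothetical: support inline keyfile JSON by spilling it to a temp
# file and reusing the existing service_account_path handling.
keyfile_dict = conn.extra_dejson.get("keyfile_dict")
if keyfile_dict:
    if isinstance(keyfile_dict, str):
        keyfile_dict = json.loads(keyfile_dict)
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".json", delete=False
    ) as tmp:
        json.dump(keyfile_dict, tmp)
    credentials.update({"service_account_path": tmp.name})
```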
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]