alexott commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2768823315
##########
providers/databricks/docs/connections/databricks.rst:
##########
@@ -91,6 +100,244 @@ Extra (optional)
* ``azure_resource_id``: optional Resource ID of the Azure Databricks
workspace (required if managed identity isn't
a user inside workspace)
+ The following parameters are necessary if using authentication with
Kubernetes OIDC token federation:
+
+ * ``federated_k8s``: set ``login`` to ``"federated_k8s"`` or add this as
extra parameter. When enabled, the hook will fetch a JWT token from Kubernetes
and exchange it for a Databricks OAuth token using the `OIDC token exchange API
<https://docs.databricks.com/aws/en/dev-tools/auth/oauth-federation-exchange.html>`_.
This authentication method only works when Airflow is running inside a
Kubernetes cluster (e.g., AWS EKS, Azure AKS, Google GKE).
+
+ **Two methods are supported for obtaining the Kubernetes JWT token:**
+
+ **Method 1: Projected Volume**
+
+ * ``k8s_projected_volume_token_path``: (optional) path to a [Kubernetes
projected volume service account
token](https://kubernetes.io/docs/concepts/configuration/secret/#projected-volume).
When configured, the hook will read the token directly from this file. The
token must be configured in your Pod spec with the appropriate audience and
expiration. This is the recommended method as it's simpler and more efficient
(no API calls). See the example Pod configuration below.
Review Comment:
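This is an `.rst` file, so links are written differently than in Markdown: use the reStructuredText form `` `link text <https://...>`_ `` instead of `[link text](https://...)`.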
##########
providers/databricks/docs/connections/databricks.rst:
##########
@@ -41,6 +41,13 @@ There are several ways to connect to Databricks using
Airflow.
`user inside workspace
<https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/latest/aad/service-prin-aad-token#--api-access-for-service-principals-that-are-azure-databricks-workspace-users-and-admins>`_,
or `outside of workspace having Owner or Contributor permissions
<https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/latest/aad/service-prin-aad-token#--api-access-for-service-principals-that-are-not-workspace-users>`_
4. Using Azure Active Directory (AAD) token obtained for `Azure managed
identity
<https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token>`_,
when Airflow runs on the VM with assigned managed identity (system-assigned
or user-assigned)
+5. Using Databricks-managed Service Principal OAuth
Review Comment:
I would maybe add something like `(available on all supported clouds)`.
##########
providers/databricks/docs/connections/databricks.rst:
##########
@@ -91,6 +100,244 @@ Extra (optional)
* ``azure_resource_id``: optional Resource ID of the Azure Databricks
workspace (required if managed identity isn't
a user inside workspace)
+ The following parameters are necessary if using authentication with
Kubernetes OIDC token federation:
+
+ * ``federated_k8s``: set ``login`` to ``"federated_k8s"`` or add this as
extra parameter. When enabled, the hook will fetch a JWT token from Kubernetes
and exchange it for a Databricks OAuth token using the `OIDC token exchange API
<https://docs.databricks.com/aws/en/dev-tools/auth/oauth-federation-exchange.html>`_.
This authentication method only works when Airflow is running inside a
Kubernetes cluster (e.g., AWS EKS, Azure AKS, Google GKE).
Review Comment:
Is it a boolean flag in `extra`?
##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##########
@@ -525,6 +543,318 @@ def _is_oauth_token_valid(token: dict,
time_key="expires_on") -> bool:
return int(token[time_key]) > (int(time.time()) +
TOKEN_REFRESH_LEAD_TIME)
+ def _get_k8s_jwt_token(self) -> str:
+ """
+ Get JWT token from Kubernetes.
+
+ Supports two methods:
+ 1. Projected volume: reads token directly from configured path
+ 2. TokenRequest API: dynamically requests token from K8s API
+
+ :return: JWT Service Account token string
+ """
+ if "k8s_projected_volume_token_path" in
self.databricks_conn.extra_dejson:
+ self.log.info("Using Kubernetes projected volume token")
+ return self._get_k8s_projected_volume_token()
+
+ self.log.info("Using Kubernetes TokenRequest API")
+ return self._get_k8s_token_request_api()
+
+ async def _a_get_k8s_jwt_token(self) -> str:
+ """Async version of _get_k8s_jwt_token()."""
+ if "k8s_projected_volume_token_path" in
self.databricks_conn.extra_dejson:
+ self.log.info("Using Kubernetes projected volume token")
+ return await self._a_get_k8s_projected_volume_token()
+
+ self.log.info("Using Kubernetes TokenRequest API")
+ return await self._a_get_k8s_token_request_api()
+
+ def _get_k8s_projected_volume_token(self) -> str:
+ """
+ Get JWT token from Kubernetes projected volume.
+
+ Reads a pre-configured service account token from a projected volume.
+ The token should be configured in the Pod spec with the desired
audience
+ and expiration settings.
+
+ :return: JWT Service Account token string
+ """
+ projected_token_path: str | None =
self.databricks_conn.extra_dejson.get(
+ "k8s_projected_volume_token_path"
+ )
+
+ if not projected_token_path:
+ raise AirflowException("k8s_projected_volume_token_path is not
configured in connection extras")
+
+ try:
+ with open(projected_token_path) as f:
+ token = f.read().strip()
+
+ if not token:
+ raise AirflowException(f"Token file at {projected_token_path}
is empty")
+
+ self.log.debug("Successfully read token from projected volume at
%s", projected_token_path)
+ return token
+ except FileNotFoundError as e:
+ raise AirflowException(
+ f"Kubernetes projected volume token not found at
{projected_token_path}. "
+ "Ensure your Pod has a projected volume configured with
serviceAccountToken."
+ ) from e
+ except PermissionError as e:
+ raise AirflowException(f"Permission denied reading token from
{projected_token_path}") from e
+
+ async def _a_get_k8s_projected_volume_token(self) -> str:
+ """Async version of _get_k8s_projected_volume_token()."""
+ projected_token_path: str | None =
self.databricks_conn.extra_dejson.get(
+ "k8s_projected_volume_token_path"
+ )
+
+ if not projected_token_path:
+ raise AirflowException("k8s_projected_volume_token_path is not
configured in connection extras")
+
+ try:
+ async with aiofiles.open(projected_token_path) as f:
+ token = (await f.read()).strip()
+
+ if not token:
+ raise AirflowException(f"Token file at {projected_token_path}
is empty")
+
+ self.log.debug("Successfully read token from projected volume at
%s", projected_token_path)
+ return token
+ except FileNotFoundError as e:
+ raise AirflowException(
+ f"Kubernetes projected volume token not found at
{projected_token_path}. "
+ "Ensure your Pod has a projected volume configured with
serviceAccountToken."
+ ) from e
+ except PermissionError as e:
+ raise AirflowException(f"Permission denied reading token from
{projected_token_path}") from e
+
+ def _get_k8s_token_request_api(self) -> str:
+ """
+ Get JWT token using Kubernetes TokenRequest API.
+
+ Dynamically requests a service account token from the Kubernetes API
server
+ with custom audience and expiration settings.
+
+ :return: JWT Service Account token string
+ """
+ audience = self.databricks_conn.extra_dejson.get("audience",
DEFAULT_K8S_AUDIENCE)
+ expiration_seconds =
self.databricks_conn.extra_dejson.get("expiration_seconds", 3600)
+ token_path = self.databricks_conn.extra_dejson.get(
+ "k8s_token_path", DEFAULT_K8S_SERVICE_ACCOUNT_TOKEN_PATH
+ )
+ namespace_path = self.databricks_conn.extra_dejson.get(
+ "k8s_namespace_path", DEFAULT_K8S_NAMESPACE_PATH
+ )
+
+ try:
+ with open(token_path) as f:
+ in_cluster_token = f.read().strip()
+
+ with open(namespace_path) as f:
+ namespace = f.read().strip()
+
+ # Call Kubernetes TokenRequest API with the in-cluster token
+ token_request_url = (
+
f"{K8S_TOKEN_SERVICE_URL}/api/v1/namespaces/{namespace}/serviceaccounts/default/token"
+ )
+
+ for attempt in self._get_retry_object():
+ with attempt:
+ resp = requests.post(
+ token_request_url,
+ headers={
+ "Authorization": f"Bearer {in_cluster_token}",
+ "Content-Type": "application/json",
+ },
+ json={
+ "apiVersion": "authentication.k8s.io/v1",
+ "kind": "TokenRequest",
+ "spec": {
+ "audiences": [audience],
+ "expirationSeconds": expiration_seconds,
+ },
Review Comment:
Maybe wrap this in a helper function to avoid code duplication?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]