mwojtyczka commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2770573516


##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##########
@@ -525,6 +543,318 @@ def _is_oauth_token_valid(token: dict, 
time_key="expires_on") -> bool:
 
         return int(token[time_key]) > (int(time.time()) + 
TOKEN_REFRESH_LEAD_TIME)
 
+    def _get_k8s_jwt_token(self) -> str:
+        """
+        Get JWT token from Kubernetes.
+
+        Supports two methods:
+        1. Projected volume: reads token directly from configured path
+        2. TokenRequest API: dynamically requests token from K8s API
+
+        :return: JWT Service Account token string
+        """
+        if "k8s_projected_volume_token_path" in 
self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return self._get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return self._get_k8s_token_request_api()
+
+    async def _a_get_k8s_jwt_token(self) -> str:
+        """Async version of _get_k8s_jwt_token()."""
+        if "k8s_projected_volume_token_path" in 
self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return await self._a_get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return await self._a_get_k8s_token_request_api()
+
+    def _get_k8s_projected_volume_token(self) -> str:
+        """
+        Get JWT token from Kubernetes projected volume.
+
+        Reads a pre-configured service account token from a projected volume.
+        The token should be configured in the Pod spec with the desired 
audience
+        and expiration settings.
+
+        :return: JWT Service Account token string
+        """
+        projected_token_path: str | None = 
self.databricks_conn.extra_dejson.get(
+            "k8s_projected_volume_token_path"
+        )
+
+        if not projected_token_path:
+            raise AirflowException("k8s_projected_volume_token_path is not 
configured in connection extras")
+
+        try:
+            with open(projected_token_path) as f:
+                token = f.read().strip()
+
+            if not token:
+                raise AirflowException(f"Token file at {projected_token_path} 
is empty")
+
+            self.log.debug("Successfully read token from projected volume at 
%s", projected_token_path)
+            return token
+        except FileNotFoundError as e:
+            raise AirflowException(
+                f"Kubernetes projected volume token not found at 
{projected_token_path}. "
+                "Ensure your Pod has a projected volume configured with 
serviceAccountToken."
+            ) from e
+        except PermissionError as e:
+            raise AirflowException(f"Permission denied reading token from 
{projected_token_path}") from e
+
+    async def _a_get_k8s_projected_volume_token(self) -> str:
+        """Async version of _get_k8s_projected_volume_token()."""
+        projected_token_path: str | None = 
self.databricks_conn.extra_dejson.get(
+            "k8s_projected_volume_token_path"
+        )
+
+        if not projected_token_path:
+            raise AirflowException("k8s_projected_volume_token_path is not 
configured in connection extras")
+
+        try:
+            async with aiofiles.open(projected_token_path) as f:
+                token = (await f.read()).strip()
+
+            if not token:
+                raise AirflowException(f"Token file at {projected_token_path} 
is empty")
+
+            self.log.debug("Successfully read token from projected volume at 
%s", projected_token_path)
+            return token
+        except FileNotFoundError as e:
+            raise AirflowException(
+                f"Kubernetes projected volume token not found at 
{projected_token_path}. "
+                "Ensure your Pod has a projected volume configured with 
serviceAccountToken."
+            ) from e
+        except PermissionError as e:
+            raise AirflowException(f"Permission denied reading token from 
{projected_token_path}") from e
+
+    def _get_k8s_token_request_api(self) -> str:
+        """
+        Get JWT token using Kubernetes TokenRequest API.
+
+        Dynamically requests a service account token from the Kubernetes API 
server
+        with custom audience and expiration settings.
+
+        :return: JWT Service Account token string
+        """
+        audience = self.databricks_conn.extra_dejson.get("audience", 
DEFAULT_K8S_AUDIENCE)
+        expiration_seconds = 
self.databricks_conn.extra_dejson.get("expiration_seconds", 3600)
+        token_path = self.databricks_conn.extra_dejson.get(
+            "k8s_token_path", DEFAULT_K8S_SERVICE_ACCOUNT_TOKEN_PATH
+        )
+        namespace_path = self.databricks_conn.extra_dejson.get(
+            "k8s_namespace_path", DEFAULT_K8S_NAMESPACE_PATH
+        )
+
+        try:
+            with open(token_path) as f:
+                in_cluster_token = f.read().strip()
+
+            with open(namespace_path) as f:
+                namespace = f.read().strip()
+
+            # Call Kubernetes TokenRequest API with the in-cluster token
+            token_request_url = (
+                
f"{K8S_TOKEN_SERVICE_URL}/api/v1/namespaces/{namespace}/serviceaccounts/default/token"
+            )
+
+            for attempt in self._get_retry_object():
+                with attempt:
+                    resp = requests.post(
+                        token_request_url,
+                        headers={
+                            "Authorization": f"Bearer {in_cluster_token}",
+                            "Content-Type": "application/json",
+                        },
+                        json={
+                            "apiVersion": "authentication.k8s.io/v1",
+                            "kind": "TokenRequest",
+                            "spec": {
+                                "audiences": [audience],
+                                "expirationSeconds": expiration_seconds,
+                            },
+                        },
+                        verify=False,  # K8s in-cluster uses self-signed certs
+                        timeout=self.token_timeout_seconds,
+                    )
+                    resp.raise_for_status()
+                    return resp.json()["status"]["token"]
+        except FileNotFoundError as e:
+            raise AirflowException(
+                "Kubernetes service account token not found. "
+                "This authentication method only works when running inside a 
Kubernetes cluster."
+            ) from e
+        except RetryError:
+            raise AirflowException(
+                f"Failed to get Kubernetes JWT token after {self.retry_limit} 
retries. Giving up."
+            )

Review Comment:
   The tenacity retry mechanism raises `RetryError` when a retryable 
exception occurs and the retry attempts are exhausted. Most transient errors 
that trigger a retry resolve within the retry limit, so this branch is 
reachable but rare in practice. Again, this follows the existing pattern in 
this hook.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to