Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-17 Thread via GitHub


boring-cyborg[bot] commented on PR #61458:
URL: https://github.com/apache/airflow/pull/61458#issuecomment-3915035900

   Awesome work, congrats on your first merged pull request! You are invited to 
check our [Issue Tracker](https://github.com/apache/airflow/issues) for 
additional contributions.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-17 Thread via GitHub


potiuk merged PR #61458:
URL: https://github.com/apache/airflow/pull/61458





Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-17 Thread via GitHub


alexott commented on PR #61458:
URL: https://github.com/apache/airflow/pull/61458#issuecomment-3914983178

   @potiuk, we have finished internal and customer testing, so this PR is ready for
additional review and merge.

   Thank you.





Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-16 Thread via GitHub


mwojtyczka commented on PR #61458:
URL: https://github.com/apache/airflow/pull/61458#issuecomment-3907774346

   @potiuk thanks for looking into this. I have fixed the doc now.
   There was one mypy error in the MSSQL-to-Hive provider, unrelated to the
Databricks provider changes. I updated the branch, so it should hopefully be
gone now.





Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-15 Thread via GitHub


potiuk commented on PR #61458:
URL: https://github.com/apache/airflow/pull/61458#issuecomment-3905409640

   Ah I see we have more people from Databricks here :) 





Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-15 Thread via GitHub


potiuk commented on PR #61458:
URL: https://github.com/apache/airflow/pull/61458#issuecomment-3905406005

   @sryza - maybe you can take a look?





Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-13 Thread via GitHub


mwojtyczka commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2803507311


##
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##
@@ -525,6 +544,360 @@ def _is_oauth_token_valid(token: dict, time_key="expires_on") -> bool:
 
         return int(token[time_key]) > (int(time.time()) + TOKEN_REFRESH_LEAD_TIME)
 
+    def _get_k8s_jwt_token(self) -> str:
+        """
+        Get JWT token from Kubernetes.
+
+        Supports two methods:
+        1. Projected volume: reads token directly from configured path
+        2. TokenRequest API: dynamically requests token from K8s API
+
+        :return: JWT Service Account token string
+        """
+        if "k8s_projected_volume_token_path" in self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return self._get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return self._get_k8s_token_request_api()
+
+    async def _a_get_k8s_jwt_token(self) -> str:
+        """Async version of _get_k8s_jwt_token()."""
+        if "k8s_projected_volume_token_path" in self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return await self._a_get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return await self._a_get_k8s_token_request_api()
+
+    def _get_k8s_projected_volume_token(self) -> str:
+        """
+        Get JWT token from Kubernetes projected volume.
+
+        Reads a pre-configured service account token from a projected volume.
+        The token should be configured in the Pod spec with the desired audience
+        and expiration settings.
+
+        :return: JWT Service Account token string
+        """
+        projected_token_path: str = self.databricks_conn.extra_dejson["k8s_projected_volume_token_path"]
+
+        try:
+            with open(projected_token_path) as f:
+                token = f.read().strip()
+
+            if not token:
+                raise AirflowException(f"Token file at {projected_token_path} is empty")
+
+            self.log.debug("Successfully read token from projected volume at %s", projected_token_path)
+            return token
+        except FileNotFoundError as e:
+            raise AirflowException(
+                f"Kubernetes projected volume token not found at {projected_token_path}. "
+                "Ensure your Pod has a projected volume configured with serviceAccountToken."
+            ) from e
+        except PermissionError as e:
+            raise AirflowException(f"Permission denied reading token from {projected_token_path}") from e
+
+    async def _a_get_k8s_projected_volume_token(self) -> str:
+        """Async version of _get_k8s_projected_volume_token()."""
+        projected_token_path: str = self.databricks_conn.extra_dejson["k8s_projected_volume_token_path"]
+
+        try:
+            async with aiofiles.open(projected_token_path) as f:
+                token = (await f.read()).strip()
+
+            if not token:
+                raise AirflowException(f"Token file at {projected_token_path} is empty")
+
+            self.log.debug("Successfully read token from projected volume at %s", projected_token_path)
+            return token
+        except FileNotFoundError as e:
+            raise AirflowException(
+                f"Kubernetes projected volume token not found at {projected_token_path}. "
+                "Ensure your Pod has a projected volume configured with serviceAccountToken."
+            ) from e
+        except PermissionError as e:
+            raise AirflowException(f"Permission denied reading token from {projected_token_path}") from e
+
+    @staticmethod
+    def _build_k8s_token_request_payload(audience: str, expiration_seconds: int) -> dict[str, Any]:
+        """
+        Build the JSON payload for Kubernetes TokenRequest API.
+
+        :param audience: The audience value for the JWT token
+        :param expiration_seconds: Token expiration in seconds
+        :return: TokenRequest API payload dictionary
+        """
+        return {
+            "apiVersion": "authentication.k8s.io/v1",
+            "kind": "TokenRequest",
+            "spec": {
+                "audiences": [audience],
+                "expirationSeconds": expiration_seconds,
+            },
+        }
+
+    def _get_k8s_token_request_api(self) -> str:
+        """
+        Get JWT token using Kubernetes TokenRequest API.
+
+        Dynamically requests a service account token from the Kubernetes API server
+        with custom audience and expiration settings.
+
+        :return: JWT Service Account token string
+        """
+        audience = self.databricks_conn.extra_dejson.get("audience", DEFAULT_K8S_AUDIENCE)
+        expiration_seconds = self.databricks_c
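
The quoted hunk is cut off before the request itself, so here is a minimal, standalone sketch of what a TokenRequest call involves, not the PR's implementation. It assumes the standard Kubernetes ServiceAccount `token` subresource and the documented `status.token` response field; `K8S_API_SERVER` and all three function names are hypothetical and exist only for illustration.

```python
from typing import Any

# Assumption: the usual in-cluster API server address. Real deployments may
# derive it from environment variables instead.
K8S_API_SERVER = "https://kubernetes.default.svc"


def build_token_request_payload(audience: str, expiration_seconds: int) -> dict[str, Any]:
    """Build a TokenRequest body with the same shape as the payload in the hunk."""
    return {
        "apiVersion": "authentication.k8s.io/v1",
        "kind": "TokenRequest",
        "spec": {
            "audiences": [audience],
            "expirationSeconds": expiration_seconds,
        },
    }


def build_token_request_url(namespace: str, service_account: str) -> str:
    """Return the URL of the ServiceAccount `token` subresource to POST to."""
    return (
        f"{K8S_API_SERVER}/api/v1/namespaces/{namespace}"
        f"/serviceaccounts/{service_account}/token"
    )


def extract_token(response_json: dict[str, Any]) -> str:
    """Pull the issued JWT out of a TokenRequest response body."""
    return response_json["status"]["token"]
```

The caller would POST the payload to that URL, authenticating with the Pod's existing service account token, and then pass the JSON response to `extract_token`.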

Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-13 Thread via GitHub


alexott commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2803037213



Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-06 Thread via GitHub


Nataneljpwd commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2773584782


##
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##
@@ -525,6 +543,318 @@ def _is_oauth_token_valid(token: dict, time_key="expires_on") -> bool:
 
         return int(token[time_key]) > (int(time.time()) + TOKEN_REFRESH_LEAD_TIME)
 
+    def _get_k8s_jwt_token(self) -> str:
+        """
+        Get JWT token from Kubernetes.
+
+        Supports two methods:
+        1. Projected volume: reads token directly from configured path
+        2. TokenRequest API: dynamically requests token from K8s API
+
+        :return: JWT Service Account token string
+        """
+        if "k8s_projected_volume_token_path" in self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return self._get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return self._get_k8s_token_request_api()
+
+    async def _a_get_k8s_jwt_token(self) -> str:
+        """Async version of _get_k8s_jwt_token()."""
+        if "k8s_projected_volume_token_path" in self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return await self._a_get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return await self._a_get_k8s_token_request_api()
+
+    def _get_k8s_projected_volume_token(self) -> str:
+        """
+        Get JWT token from Kubernetes projected volume.
+
+        Reads a pre-configured service account token from a projected volume.
+        The token should be configured in the Pod spec with the desired audience
+        and expiration settings.
+
+        :return: JWT Service Account token string
+        """
+        projected_token_path: str | None = self.databricks_conn.extra_dejson.get(
+            "k8s_projected_volume_token_path"
+        )
+
+        if not projected_token_path:
+            raise AirflowException("k8s_projected_volume_token_path is not configured in connection extras")
+
+        try:
+            with open(projected_token_path) as f:
+                token = f.read().strip()
+
+            if not token:
+                raise AirflowException(f"Token file at {projected_token_path} is empty")
+
+            self.log.debug("Successfully read token from projected volume at %s", projected_token_path)
+            return token
+        except FileNotFoundError as e:
+            raise AirflowException(
+                f"Kubernetes projected volume token not found at {projected_token_path}. "
+                "Ensure your Pod has a projected volume configured with serviceAccountToken."
+            ) from e
+        except PermissionError as e:
+            raise AirflowException(f"Permission denied reading token from {projected_token_path}") from e
+
+    async def _a_get_k8s_projected_volume_token(self) -> str:
+        """Async version of _get_k8s_projected_volume_token()."""
+        projected_token_path: str | None = self.databricks_conn.extra_dejson.get(
+            "k8s_projected_volume_token_path"
+        )
+
+        if not projected_token_path:
+            raise AirflowException("k8s_projected_volume_token_path is not configured in connection extras")
+
+        try:
+            async with aiofiles.open(projected_token_path) as f:
+                token = (await f.read()).strip()
+
+            if not token:
+                raise AirflowException(f"Token file at {projected_token_path} is empty")
+
+            self.log.debug("Successfully read token from projected volume at %s", projected_token_path)
+            return token
+        except FileNotFoundError as e:
+            raise AirflowException(
+                f"Kubernetes projected volume token not found at {projected_token_path}. "
+                "Ensure your Pod has a projected volume configured with serviceAccountToken."
+            ) from e
+        except PermissionError as e:
+            raise AirflowException(f"Permission denied reading token from {projected_token_path}") from e
+
+    def _get_k8s_token_request_api(self) -> str:
+        """
+        Get JWT token using Kubernetes TokenRequest API.
+
+        Dynamically requests a service account token from the Kubernetes API server
+        with custom audience and expiration settings.
+
+        :return: JWT Service Account token string
+        """
+        audience = self.databricks_conn.extra_dejson.get("audience", DEFAULT_K8S_AUDIENCE)
+        expiration_seconds = self.databricks_conn.extra_dejson.get("expiration_seconds", 3600)
+        token_path = self.databricks_conn.extra_dejson.get(
+            "k8s_token_path", DEFAULT_K8S_SERVICE_ACCOUNT_TOKEN_PATH
+        )
+        namespace_path = self.databricks_conn.extra_dejson.get(
+            "k8s_namespace_

Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-06 Thread via GitHub


mwojtyczka commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2773015612



Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-06 Thread via GitHub


mwojtyczka commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2772988667


##
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##
@@ -525,6 +543,318 @@ def _is_oauth_token_valid(token: dict, time_key="expires_on") -> bool:
 
         return int(token[time_key]) > (int(time.time()) + TOKEN_REFRESH_LEAD_TIME)
 
+    def _get_k8s_jwt_token(self) -> str:
+        """
+        Get JWT token from Kubernetes.
+
+        Supports two methods:
+        1. Projected volume: reads token directly from configured path
+        2. TokenRequest API: dynamically requests token from K8s API
+
+        :return: JWT Service Account token string
+        """
+        if "k8s_projected_volume_token_path" in self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return self._get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return self._get_k8s_token_request_api()
+
+    async def _a_get_k8s_jwt_token(self) -> str:
+        """Async version of _get_k8s_jwt_token()."""
+        if "k8s_projected_volume_token_path" in self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return await self._a_get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return await self._a_get_k8s_token_request_api()
+
+    def _get_k8s_projected_volume_token(self) -> str:
+        """
+        Get JWT token from Kubernetes projected volume.
+
+        Reads a pre-configured service account token from a projected volume.
+        The token should be configured in the Pod spec with the desired audience
+        and expiration settings.
+
+        :return: JWT Service Account token string
+        """
+        projected_token_path: str | None = self.databricks_conn.extra_dejson.get(
+            "k8s_projected_volume_token_path"
+        )
+
+        if not projected_token_path:
+            raise AirflowException("k8s_projected_volume_token_path is not configured in connection extras")

Review Comment:
   I updated the code to avoid this
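
   For readers following the thread, the shape of that simplification can be sketched as below. This is an illustrative standalone example, not the PR's code: the later hunk revision in this thread indexes the extras directly, which is safe because the reader is only reached when the key is present. The function names here are hypothetical.

```python
def select_k8s_token_method(extras: dict) -> str:
    """Pick the token source the same way the dispatcher in the hunk does."""
    if "k8s_projected_volume_token_path" in extras:
        return "projected_volume"
    return "token_request_api"


def projected_token_path(extras: dict) -> str:
    """Index directly: the caller only gets here when the key is present."""
    return extras["k8s_projected_volume_token_path"]


# Hypothetical connection extras, for illustration only.
extras = {"k8s_projected_volume_token_path": "/var/run/secrets/tokens/example-token"}
if select_k8s_token_method(extras) == "projected_volume":
    path = projected_token_path(extras)
```

   Note that presence of the key does not guarantee a non-empty value, so an empty path would still surface later as a file error.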






Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-06 Thread via GitHub


mwojtyczka commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2772969499


##
providers/databricks/docs/connections/databricks.rst:
##
@@ -91,6 +100,244 @@ Extra (optional)
 * ``azure_resource_id``: optional Resource ID of the Azure Databricks 
workspace (required if managed identity isn't
   a user inside workspace)
 
+The following parameters are necessary if using authentication with 
Kubernetes OIDC token federation:
+
+* ``federated_k8s``: set ``login`` to ``"federated_k8s"`` or add this as 
extra parameter. When enabled, the hook will fetch a JWT token from Kubernetes 
and exchange it for a Databricks OAuth token using the `OIDC token exchange API 
`_.
 This authentication method only works when Airflow is running inside a 
Kubernetes cluster (e.g., AWS EKS, Azure AKS, Google GKE).

Review Comment:
   added
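
   As a rough illustration of the exchange step the doc text describes, the sketch below builds the form body of an OAuth 2.0 token exchange request using the parameter names from RFC 8693. It is not taken from the PR: the `scope` value and the optional `client_id` field are assumptions about what the Databricks endpoint expects, and `build_token_exchange_form` is a hypothetical helper name.

```python
from typing import Optional
from urllib.parse import urlencode


def build_token_exchange_form(k8s_jwt: str, client_id: Optional[str] = None) -> str:
    """Encode an RFC 8693 token exchange request as a form body."""
    fields = {
        # Grant type and subject token type are defined by RFC 8693.
        "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
        "subject_token": k8s_jwt,
        "subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
        # Assumption: a broad API scope; check the Databricks docs for the
        # value your workspace requires.
        "scope": "all-apis",
    }
    if client_id:
        # Assumption: only needed when the federation policy is tied to a
        # specific service principal.
        fields["client_id"] = client_id
    return urlencode(fields)
```

   The resulting string would be sent with `Content-Type: application/x-www-form-urlencoded` to the workspace's token endpoint, and the response's access token used as the bearer token for Databricks API calls.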






Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-06 Thread via GitHub


mwojtyczka commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2772955650


##
providers/databricks/docs/connections/databricks.rst:
##
@@ -41,6 +41,13 @@ There are several ways to connect to Databricks using 
Airflow.
`user inside workspace 
`_,
 or `outside of workspace having Owner or Contributor permissions 
`_
 4. Using Azure Active Directory (AAD) token obtained for `Azure managed 
identity 
`_,
when Airflow runs on the VM with assigned managed identity (system-assigned 
or user-assigned)
+5. Using Databricks-managed Service Principal OAuth

Review Comment:
   corrected






Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-05 Thread via GitHub


mwojtyczka commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2770636319


##
providers/databricks/docs/connections/databricks.rst:
##
@@ -91,6 +100,244 @@ Extra (optional)
 * ``azure_resource_id``: optional Resource ID of the Azure Databricks 
workspace (required if managed identity isn't
   a user inside workspace)
 
+The following parameters are necessary if using authentication with 
Kubernetes OIDC token federation:
+
+* ``federated_k8s``: set ``login`` to ``"federated_k8s"`` or add this as 
extra parameter. When enabled, the hook will fetch a JWT token from Kubernetes 
and exchange it for a Databricks OAuth token using the `OIDC token exchange API 
`_.
 This authentication method only works when Airflow is running inside a 
Kubernetes cluster (e.g., AWS EKS, Azure AKS, Google GKE).

Review Comment:
   yes, will add this to the doc






Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-05 Thread via GitHub


mwojtyczka commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2770632680


##
providers/databricks/docs/connections/databricks.rst:
##
@@ -91,6 +100,244 @@ Extra (optional)
 * ``azure_resource_id``: optional Resource ID of the Azure Databricks 
workspace (required if managed identity isn't
   a user inside workspace)
 
+The following parameters are necessary if using authentication with 
Kubernetes OIDC token federation:
+
+* ``federated_k8s``: set ``login`` to ``"federated_k8s"`` or add this as 
extra parameter. When enabled, the hook will fetch a JWT token from Kubernetes 
and exchange it for a Databricks OAuth token using the `OIDC token exchange API 
`_.
 This authentication method only works when Airflow is running inside a 
Kubernetes cluster (e.g., AWS EKS, Azure AKS, Google GKE).
+
+**Two methods are supported for obtaining the Kubernetes JWT token:**
+
+**Method 1: Projected Volume**
+
+* ``k8s_projected_volume_token_path``: (optional) path to a [Kubernetes 
projected volume service account 
token](https://kubernetes.io/docs/concepts/configuration/secret/#projected-volume).
 When configured, the hook will read the token directly from this file. The 
token must be configured in your Pod spec with the appropriate audience and 
expiration. This is the recommended method as it's simpler and more efficient 
(no API calls). See the example Pod configuration below.

Review Comment:
   corrected
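
   Since the doc text requires the projected token to carry the right audience and expiration, a quick diagnostic along these lines may help when debugging a Pod spec. It is a hedged sketch, not part of the PR: it decodes the JWT payload without verifying the signature, so it is suitable for inspection only, and both function names are hypothetical.

```python
import base64
import json
import time
from typing import Optional


def decode_jwt_claims(token: str) -> dict:
    """Decode the payload segment of a JWT WITHOUT verifying its signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected a JWT with three dot-separated segments")
    payload = parts[1]
    # base64url segments in JWTs omit padding, so restore it before decoding.
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))


def check_claims(claims: dict, expected_audience: str, now: Optional[float] = None) -> list:
    """Return a list of human-readable problems; an empty list means none found."""
    now = time.time() if now is None else now
    problems = []
    aud = claims.get("aud", [])
    # The `aud` claim may be a single string or a list of strings.
    audiences = [aud] if isinstance(aud, str) else list(aud)
    if expected_audience not in audiences:
        problems.append(f"audience {expected_audience!r} not in {audiences!r}")
    if int(claims.get("exp", 0)) <= now:
        problems.append("token is expired")
    return problems
```

   Running `check_claims(decode_jwt_claims(token), expected_audience)` on the contents of the projected token file should return an empty list when the Pod spec is configured as intended.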






Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-05 Thread via GitHub


mwojtyczka commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2770631295



Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-05 Thread via GitHub


mwojtyczka commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2770573516


##
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##
@@ -525,6 +543,318 @@ def _is_oauth_token_valid(token: dict, 
time_key="expires_on") -> bool:
 
 return int(token[time_key]) > (int(time.time()) + 
TOKEN_REFRESH_LEAD_TIME)
 
+def _get_k8s_jwt_token(self) -> str:
+"""
+Get JWT token from Kubernetes.
+
+Supports two methods:
+1. Projected volume: reads token directly from configured path
+2. TokenRequest API: dynamically requests token from K8s API
+
+:return: JWT Service Account token string
+"""
+if "k8s_projected_volume_token_path" in 
self.databricks_conn.extra_dejson:
+self.log.info("Using Kubernetes projected volume token")
+return self._get_k8s_projected_volume_token()
+
+self.log.info("Using Kubernetes TokenRequest API")
+return self._get_k8s_token_request_api()
+
+async def _a_get_k8s_jwt_token(self) -> str:
+"""Async version of _get_k8s_jwt_token()."""
+if "k8s_projected_volume_token_path" in 
self.databricks_conn.extra_dejson:
+self.log.info("Using Kubernetes projected volume token")
+return await self._a_get_k8s_projected_volume_token()
+
+self.log.info("Using Kubernetes TokenRequest API")
+return await self._a_get_k8s_token_request_api()
+
+def _get_k8s_projected_volume_token(self) -> str:
+"""
+Get JWT token from Kubernetes projected volume.
+
+Reads a pre-configured service account token from a projected volume.
+The token should be configured in the Pod spec with the desired 
audience
+and expiration settings.
+
+:return: JWT Service Account token string
+"""
+projected_token_path: str | None = 
self.databricks_conn.extra_dejson.get(
+"k8s_projected_volume_token_path"
+)
+
+if not projected_token_path:
+raise AirflowException("k8s_projected_volume_token_path is not 
configured in connection extras")
+
+try:
+with open(projected_token_path) as f:
+token = f.read().strip()
+
+if not token:
+raise AirflowException(f"Token file at {projected_token_path} 
is empty")
+
+self.log.debug("Successfully read token from projected volume at 
%s", projected_token_path)
+return token
+except FileNotFoundError as e:
+raise AirflowException(
+f"Kubernetes projected volume token not found at 
{projected_token_path}. "
+"Ensure your Pod has a projected volume configured with 
serviceAccountToken."
+) from e
+except PermissionError as e:
+raise AirflowException(f"Permission denied reading token from 
{projected_token_path}") from e
+
+async def _a_get_k8s_projected_volume_token(self) -> str:
+"""Async version of _get_k8s_projected_volume_token()."""
+projected_token_path: str | None = 
self.databricks_conn.extra_dejson.get(
+"k8s_projected_volume_token_path"
+)
+
+if not projected_token_path:
+raise AirflowException("k8s_projected_volume_token_path is not 
configured in connection extras")
+
+try:
+async with aiofiles.open(projected_token_path) as f:
+token = (await f.read()).strip()
+
+if not token:
+raise AirflowException(f"Token file at {projected_token_path} 
is empty")
+
+self.log.debug("Successfully read token from projected volume at 
%s", projected_token_path)
+return token
+except FileNotFoundError as e:
+raise AirflowException(
+f"Kubernetes projected volume token not found at 
{projected_token_path}. "
+"Ensure your Pod has a projected volume configured with 
serviceAccountToken."
+) from e
+except PermissionError as e:
+raise AirflowException(f"Permission denied reading token from 
{projected_token_path}") from e
+
+def _get_k8s_token_request_api(self) -> str:
+"""
+Get JWT token using Kubernetes TokenRequest API.
+
+Dynamically requests a service account token from the Kubernetes API 
server
+with custom audience and expiration settings.
+
+:return: JWT Service Account token string
+"""
+audience = self.databricks_conn.extra_dejson.get("audience", 
DEFAULT_K8S_AUDIENCE)
+expiration_seconds = 
self.databricks_conn.extra_dejson.get("expiration_seconds", 3600)
+token_path = self.databricks_conn.extra_dejson.get(
+"k8s_token_path", DEFAULT_K8S_SERVICE_ACCOUNT_TOKEN_PATH
+)
+namespace_path = self.databricks_conn.extra_dejson.get(
+"k8s_namespace_p

Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-05 Thread via GitHub


mwojtyczka commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2770542179


##
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##
@@ -525,6 +543,318 @@ def _is_oauth_token_valid(token: dict, 
time_key="expires_on") -> bool:
 
 return int(token[time_key]) > (int(time.time()) + 
TOKEN_REFRESH_LEAD_TIME)
 
+def _get_k8s_jwt_token(self) -> str:
+"""
+Get JWT token from Kubernetes.
+
+Supports two methods:
+1. Projected volume: reads token directly from configured path
+2. TokenRequest API: dynamically requests token from K8s API
+
+:return: JWT Service Account token string
+"""
+if "k8s_projected_volume_token_path" in 
self.databricks_conn.extra_dejson:
+self.log.info("Using Kubernetes projected volume token")
+return self._get_k8s_projected_volume_token()
+
+self.log.info("Using Kubernetes TokenRequest API")
+return self._get_k8s_token_request_api()
+
+async def _a_get_k8s_jwt_token(self) -> str:
+"""Async version of _get_k8s_jwt_token()."""
+if "k8s_projected_volume_token_path" in 
self.databricks_conn.extra_dejson:
+self.log.info("Using Kubernetes projected volume token")
+return await self._a_get_k8s_projected_volume_token()
+
+self.log.info("Using Kubernetes TokenRequest API")
+return await self._a_get_k8s_token_request_api()
+
+def _get_k8s_projected_volume_token(self) -> str:
+"""
+Get JWT token from Kubernetes projected volume.
+
+Reads a pre-configured service account token from a projected volume.
+The token should be configured in the Pod spec with the desired 
audience
+and expiration settings.
+
+:return: JWT Service Account token string
+"""
+projected_token_path: str | None = 
self.databricks_conn.extra_dejson.get(
+"k8s_projected_volume_token_path"
+)
+
+if not projected_token_path:
+raise AirflowException("k8s_projected_volume_token_path is not 
configured in connection extras")
+
+try:
+with open(projected_token_path) as f:
+token = f.read().strip()
+
+if not token:
+raise AirflowException(f"Token file at {projected_token_path} 
is empty")
+
+self.log.debug("Successfully read token from projected volume at 
%s", projected_token_path)
+return token
+except FileNotFoundError as e:
+raise AirflowException(
+f"Kubernetes projected volume token not found at 
{projected_token_path}. "
+"Ensure your Pod has a projected volume configured with 
serviceAccountToken."
+) from e
+except PermissionError as e:
+raise AirflowException(f"Permission denied reading token from 
{projected_token_path}") from e
+
+async def _a_get_k8s_projected_volume_token(self) -> str:
+"""Async version of _get_k8s_projected_volume_token()."""
+projected_token_path: str | None = 
self.databricks_conn.extra_dejson.get(
+"k8s_projected_volume_token_path"
+)
+
+if not projected_token_path:
+raise AirflowException("k8s_projected_volume_token_path is not 
configured in connection extras")
+
+try:
+async with aiofiles.open(projected_token_path) as f:
+token = (await f.read()).strip()
+
+if not token:
+raise AirflowException(f"Token file at {projected_token_path} 
is empty")
+
+self.log.debug("Successfully read token from projected volume at 
%s", projected_token_path)
+return token
+except FileNotFoundError as e:
+raise AirflowException(
+f"Kubernetes projected volume token not found at 
{projected_token_path}. "
+"Ensure your Pod has a projected volume configured with 
serviceAccountToken."
+) from e
+except PermissionError as e:
+raise AirflowException(f"Permission denied reading token from 
{projected_token_path}") from e
+
+def _get_k8s_token_request_api(self) -> str:
+"""
+Get JWT token using Kubernetes TokenRequest API.
+
+Dynamically requests a service account token from the Kubernetes API 
server
+with custom audience and expiration settings.
+
+:return: JWT Service Account token string
+"""
+audience = self.databricks_conn.extra_dejson.get("audience", 
DEFAULT_K8S_AUDIENCE)
+expiration_seconds = 
self.databricks_conn.extra_dejson.get("expiration_seconds", 3600)
+token_path = self.databricks_conn.extra_dejson.get(
+"k8s_token_path", DEFAULT_K8S_SERVICE_ACCOUNT_TOKEN_PATH
+)
+namespace_path = self.databricks_conn.extra_dejson.get(
+"k8s_namespace_p

Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-05 Thread via GitHub


mwojtyczka commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2770530754


##
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##
@@ -525,6 +543,318 @@ def _is_oauth_token_valid(token: dict, 
time_key="expires_on") -> bool:
 
 return int(token[time_key]) > (int(time.time()) + 
TOKEN_REFRESH_LEAD_TIME)
 
+def _get_k8s_jwt_token(self) -> str:
+"""
+Get JWT token from Kubernetes.
+
+Supports two methods:
+1. Projected volume: reads token directly from configured path
+2. TokenRequest API: dynamically requests token from K8s API
+
+:return: JWT Service Account token string
+"""
+if "k8s_projected_volume_token_path" in 
self.databricks_conn.extra_dejson:
+self.log.info("Using Kubernetes projected volume token")
+return self._get_k8s_projected_volume_token()
+
+self.log.info("Using Kubernetes TokenRequest API")
+return self._get_k8s_token_request_api()
+
+async def _a_get_k8s_jwt_token(self) -> str:
+"""Async version of _get_k8s_jwt_token()."""
+if "k8s_projected_volume_token_path" in 
self.databricks_conn.extra_dejson:
+self.log.info("Using Kubernetes projected volume token")
+return await self._a_get_k8s_projected_volume_token()
+
+self.log.info("Using Kubernetes TokenRequest API")
+return await self._a_get_k8s_token_request_api()
+
+def _get_k8s_projected_volume_token(self) -> str:
+"""
+Get JWT token from Kubernetes projected volume.
+
+Reads a pre-configured service account token from a projected volume.
+The token should be configured in the Pod spec with the desired 
audience
+and expiration settings.
+
+:return: JWT Service Account token string
+"""
+projected_token_path: str | None = 
self.databricks_conn.extra_dejson.get(
+"k8s_projected_volume_token_path"
+)
+
+if not projected_token_path:
+raise AirflowException("k8s_projected_volume_token_path is not 
configured in connection extras")
+
+try:
+with open(projected_token_path) as f:
+token = f.read().strip()
+
+if not token:
+raise AirflowException(f"Token file at {projected_token_path} 
is empty")
+
+self.log.debug("Successfully read token from projected volume at 
%s", projected_token_path)
+return token
+except FileNotFoundError as e:
+raise AirflowException(
+f"Kubernetes projected volume token not found at 
{projected_token_path}. "
+"Ensure your Pod has a projected volume configured with 
serviceAccountToken."
+) from e
+except PermissionError as e:
+raise AirflowException(f"Permission denied reading token from 
{projected_token_path}") from e
+
+async def _a_get_k8s_projected_volume_token(self) -> str:
+"""Async version of _get_k8s_projected_volume_token()."""
+projected_token_path: str | None = 
self.databricks_conn.extra_dejson.get(
+"k8s_projected_volume_token_path"
+)
+
+if not projected_token_path:
+raise AirflowException("k8s_projected_volume_token_path is not 
configured in connection extras")
+
+try:
+async with aiofiles.open(projected_token_path) as f:
+token = (await f.read()).strip()
+
+if not token:
+raise AirflowException(f"Token file at {projected_token_path} 
is empty")
+
+self.log.debug("Successfully read token from projected volume at 
%s", projected_token_path)
+return token
+except FileNotFoundError as e:
+raise AirflowException(
+f"Kubernetes projected volume token not found at 
{projected_token_path}. "
+"Ensure your Pod has a projected volume configured with 
serviceAccountToken."
+) from e
+except PermissionError as e:
+raise AirflowException(f"Permission denied reading token from 
{projected_token_path}") from e
+
+def _get_k8s_token_request_api(self) -> str:
+"""
+Get JWT token using Kubernetes TokenRequest API.
+
+Dynamically requests a service account token from the Kubernetes API 
server
+with custom audience and expiration settings.
+
+:return: JWT Service Account token string
+"""
+audience = self.databricks_conn.extra_dejson.get("audience", 
DEFAULT_K8S_AUDIENCE)
+expiration_seconds = 
self.databricks_conn.extra_dejson.get("expiration_seconds", 3600)
+token_path = self.databricks_conn.extra_dejson.get(
+"k8s_token_path", DEFAULT_K8S_SERVICE_ACCOUNT_TOKEN_PATH
+)
+namespace_path = self.databricks_conn.extra_dejson.get(
+"k8s_namespace_p

Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-05 Thread via GitHub


alexott commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2768823315


##
providers/databricks/docs/connections/databricks.rst:
##
@@ -91,6 +100,244 @@ Extra (optional)
 * ``azure_resource_id``: optional Resource ID of the Azure Databricks 
workspace (required if managed identity isn't
   a user inside workspace)
 
+The following parameters are necessary if using authentication with 
Kubernetes OIDC token federation:
+
+* ``federated_k8s``: set ``login`` to ``"federated_k8s"`` or add this as 
extra parameter. When enabled, the hook will fetch a JWT token from Kubernetes 
and exchange it for a Databricks OAuth token using the `OIDC token exchange API 
`_.
 This authentication method only works when Airflow is running inside a 
Kubernetes cluster (e.g., AWS EKS, Azure AKS, Google GKE).
+
+**Two methods are supported for obtaining the Kubernetes JWT token:**
+
+**Method 1: Projected Volume**
+
+* ``k8s_projected_volume_token_path``: (optional) path to a [Kubernetes 
projected volume service account 
token](https://kubernetes.io/docs/concepts/configuration/secret/#projected-volume).
 When configured, the hook will read the token directly from this file. The 
token must be configured in your Pod spec with the appropriate audience and 
expiration. This is the recommended method as it's simpler and more efficient 
(no API calls). See the example Pod configuration below.

Review Comment:
   This is an `.rst` file, so links are written differently than in Markdown: use the `` `link text <URL>`_ `` form instead of `[link text](URL)`.



##
providers/databricks/docs/connections/databricks.rst:
##
@@ -41,6 +41,13 @@ There are several ways to connect to Databricks using 
Airflow.
`user inside workspace 
`_,
 or `outside of workspace having Owner or Contributor permissions 
`_
 4. Using Azure Active Directory (AAD) token obtained for `Azure managed 
identity 
`_,
when Airflow runs on the VM with assigned managed identity (system-assigned 
or user-assigned)
+5. Using Databricks-managed Service Principal OAuth

Review Comment:
   I would maybe add something like `(available on all supported clouds)`



##
providers/databricks/docs/connections/databricks.rst:
##
@@ -91,6 +100,244 @@ Extra (optional)
 * ``azure_resource_id``: optional Resource ID of the Azure Databricks 
workspace (required if managed identity isn't
   a user inside workspace)
 
+The following parameters are necessary if using authentication with 
Kubernetes OIDC token federation:
+
+* ``federated_k8s``: set ``login`` to ``"federated_k8s"`` or add this as 
extra parameter. When enabled, the hook will fetch a JWT token from Kubernetes 
and exchange it for a Databricks OAuth token using the `OIDC token exchange API 
`_.
 This authentication method only works when Airflow is running inside a 
Kubernetes cluster (e.g., AWS EKS, Azure AKS, Google GKE).

Review Comment:
   Is it a boolean flag in `extra`?



##
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##
@@ -525,6 +543,318 @@ def _is_oauth_token_valid(token: dict, 
time_key="expires_on") -> bool:
 
 return int(token[time_key]) > (int(time.time()) + 
TOKEN_REFRESH_LEAD_TIME)
 
+def _get_k8s_jwt_token(self) -> str:
+"""
+Get JWT token from Kubernetes.
+
+Supports two methods:
+1. Projected volume: reads token directly from configured path
+2. TokenRequest API: dynamically requests token from K8s API
+
+:return: JWT Service Account token string
+"""
+if "k8s_projected_volume_token_path" in 
self.databricks_conn.extra_dejson:
+self.log.info("Using Kubernetes projected volume token")
+return self._get_k8s_projected_volume_token()
+
+self.log.info("Using Kubernetes TokenRequest API")
+return self._get_k8s_token_request_api()
+
+async def _a_get_k8s_jwt_token(self) -> str:
+"""Async version of _get_k8s_jwt_token()."""
+if "k8s_projected_volume_token_path" in 
self.databricks_conn.extra_dejson:
+self.log.info("Using Kubernetes projected volume token")
+return await self._a_get_k8s_projected_volume_token()
+
+self.log.info("Using Kubernetes TokenRequest API")
+return await self._a_get_k8s_token_request_api()
+
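
The second method named in the docstring above (TokenRequest API) boils down to one POST against the service account's `token` subresource. The following is only a minimal sketch of how such a request could be assembled, not the code from this PR: the helper name `build_token_request`, the `K8S_API_SERVER` constant, and the namespace, service account, and audience values in the usage note are hypothetical. The endpoint path and the `TokenRequest` body shape follow the upstream Kubernetes API.

```python
# Assumption: the conventional in-cluster API server address; not taken from the PR.
K8S_API_SERVER = "https://kubernetes.default.svc"


def build_token_request(namespace, service_account, audience, expiration_seconds=3600):
    """Return the (url, body) pair for a Kubernetes TokenRequest call.

    Hypothetical helper for illustration only; it performs no network I/O.
    """
    url = (
        f"{K8S_API_SERVER}/api/v1/namespaces/{namespace}"
        f"/serviceaccounts/{service_account}/token"
    )
    body = {
        "apiVersion": "authentication.k8s.io/v1",
        "kind": "TokenRequest",
        "spec": {
            # The audience the issued JWT is bound to.
            "audiences": [audience],
            # Requested token lifetime in seconds.
            "expirationSeconds": expiration_seconds,
        },
    }
    return url, body
```

A caller would POST `body` as JSON to `url`, authenticating with the Pod's own service account token, and read the issued JWT from `status.token` in the response. For example, `build_token_request("airflow", "airflow-worker", "my-audience")` (all three values made up) yields the URL and payload for a one-hour token.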

Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-05 Thread via GitHub


mwojtyczka commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2770480617


##
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##
@@ -525,6 +543,318 @@ def _is_oauth_token_valid(token: dict, time_key="expires_on") -> bool:
 
         return int(token[time_key]) > (int(time.time()) + TOKEN_REFRESH_LEAD_TIME)
 
+    def _get_k8s_jwt_token(self) -> str:
+        """
+        Get JWT token from Kubernetes.
+
+        Supports two methods:
+        1. Projected volume: reads token directly from configured path
+        2. TokenRequest API: dynamically requests token from K8s API
+
+        :return: JWT Service Account token string
+        """
+        if "k8s_projected_volume_token_path" in self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return self._get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return self._get_k8s_token_request_api()
+
+    async def _a_get_k8s_jwt_token(self) -> str:
+        """Async version of _get_k8s_jwt_token()."""
+        if "k8s_projected_volume_token_path" in self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return await self._a_get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return await self._a_get_k8s_token_request_api()
+
+    def _get_k8s_projected_volume_token(self) -> str:
+        """
+        Get JWT token from Kubernetes projected volume.
+
+        Reads a pre-configured service account token from a projected volume.
+        The token should be configured in the Pod spec with the desired audience
+        and expiration settings.
+
+        :return: JWT Service Account token string
+        """
+        projected_token_path: str | None = self.databricks_conn.extra_dejson.get(
+            "k8s_projected_volume_token_path"
+        )
+
+        if not projected_token_path:
+            raise AirflowException("k8s_projected_volume_token_path is not configured in connection extras")

Review Comment:
   Correct, I added this check to satisfy the linter; it is not picking up the 
facts you mentioned above.
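
A minimal sketch of the kind of situation this reply appears to describe, assuming the linter in question is a static type checker such as mypy. The names `ConfigError`, `read_token_path`, `pick_token_source` and the example path are hypothetical and are not part of the PR; only the extras key and the error message are taken from the quoted diff. The sketch shows why a guard inside the helper can still be needed even though the caller already tested for the key: `dict.get()` is typed as returning an optional value, and the caller's membership check does not narrow the type inside another function.

```python
from __future__ import annotations


class ConfigError(Exception):
    """Hypothetical stand-in for AirflowException."""


def read_token_path(extras: dict[str, str]) -> str:
    # dict.get() is typed as `str | None`, whatever the caller checked earlier.
    path: str | None = extras.get("k8s_projected_volume_token_path")

    # Without this guard a type checker would report that `str | None`
    # does not match the declared `str` return type.
    if not path:
        raise ConfigError("k8s_projected_volume_token_path is not configured in connection extras")

    # After the guard, `path` is narrowed to `str`.
    return path


def pick_token_source(extras: dict[str, str]) -> str:
    # The caller tests for the key, but that knowledge is not visible
    # to the type checker inside read_token_path().
    if "k8s_projected_volume_token_path" in extras:
        return read_token_path(extras)
    return "token-request-api"


if __name__ == "__main__":
    # Hypothetical mount path, used only for illustration.
    print(pick_token_source({"k8s_projected_volume_token_path": "/var/run/secrets/tokens/token"}))
    print(pick_token_source({}))
```

The guard also has a runtime effect in this sketch: an extras key that is present but set to an empty string is rejected rather than passed on as a path.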



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-05 Thread via GitHub


mwojtyczka commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2770484491


##
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##
@@ -525,6 +543,318 @@ def _is_oauth_token_valid(token: dict, time_key="expires_on") -> bool:
 
         return int(token[time_key]) > (int(time.time()) + TOKEN_REFRESH_LEAD_TIME)
 
+    def _get_k8s_jwt_token(self) -> str:
+        """
+        Get JWT token from Kubernetes.
+
+        Supports two methods:
+        1. Projected volume: reads token directly from configured path
+        2. TokenRequest API: dynamically requests token from K8s API
+
+        :return: JWT Service Account token string
+        """
+        if "k8s_projected_volume_token_path" in self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return self._get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return self._get_k8s_token_request_api()
+
+    async def _a_get_k8s_jwt_token(self) -> str:
+        """Async version of _get_k8s_jwt_token()."""
+        if "k8s_projected_volume_token_path" in self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return await self._a_get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return await self._a_get_k8s_token_request_api()
+
+    def _get_k8s_projected_volume_token(self) -> str:
+        """
+        Get JWT token from Kubernetes projected volume.
+
+        Reads a pre-configured service account token from a projected volume.
+        The token should be configured in the Pod spec with the desired audience
+        and expiration settings.
+
+        :return: JWT Service Account token string
+        """
+        projected_token_path: str | None = self.databricks_conn.extra_dejson.get(
+            "k8s_projected_volume_token_path"
+        )
+
+        if not projected_token_path:
+            raise AirflowException("k8s_projected_volume_token_path is not configured in connection extras")
+
+        try:
+            with open(projected_token_path) as f:
+                token = f.read().strip()
+
+            if not token:
+                raise AirflowException(f"Token file at {projected_token_path} is empty")
+
+            self.log.debug("Successfully read token from projected volume at %s", projected_token_path)
+            return token
+        except FileNotFoundError as e:
+            raise AirflowException(
+                f"Kubernetes projected volume token not found at {projected_token_path}. "
+                "Ensure your Pod has a projected volume configured with serviceAccountToken."
+            ) from e
+        except PermissionError as e:
+            raise AirflowException(f"Permission denied reading token from {projected_token_path}") from e
+
+    async def _a_get_k8s_projected_volume_token(self) -> str:
+        """Async version of _get_k8s_projected_volume_token()."""
+        projected_token_path: str | None = self.databricks_conn.extra_dejson.get(
+            "k8s_projected_volume_token_path"
+        )
+
+        if not projected_token_path:
+            raise AirflowException("k8s_projected_volume_token_path is not configured in connection extras")
+
+        try:
+            async with aiofiles.open(projected_token_path) as f:
+                token = (await f.read()).strip()
+
+            if not token:
+                raise AirflowException(f"Token file at {projected_token_path} is empty")
+
+            self.log.debug("Successfully read token from projected volume at %s", projected_token_path)
+            return token
+        except FileNotFoundError as e:
+            raise AirflowException(
+                f"Kubernetes projected volume token not found at {projected_token_path}. "
+                "Ensure your Pod has a projected volume configured with serviceAccountToken."
+            ) from e
+        except PermissionError as e:
+            raise AirflowException(f"Permission denied reading token from {projected_token_path}") from e
+
+    def _get_k8s_token_request_api(self) -> str:
+        """
+        Get JWT token using Kubernetes TokenRequest API.
+
+        Dynamically requests a service account token from the Kubernetes API server
+        with custom audience and expiration settings.
+
+        :return: JWT Service Account token string
+        """
+        audience = self.databricks_conn.extra_dejson.get("audience", DEFAULT_K8S_AUDIENCE)
+        expiration_seconds = self.databricks_conn.extra_dejson.get("expiration_seconds", 3600)
+        token_path = self.databricks_conn.extra_dejson.get(
+            "k8s_token_path", DEFAULT_K8S_SERVICE_ACCOUNT_TOKEN_PATH
+        )
+        namespace_path = self.databricks_conn.extra_dejson.get(
+            "k8s_namespace_p

Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-05 Thread via GitHub


mwojtyczka commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2770477669


##
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##
@@ -525,6 +543,318 @@ def _is_oauth_token_valid(token: dict, time_key="expires_on") -> bool:
 
         return int(token[time_key]) > (int(time.time()) + TOKEN_REFRESH_LEAD_TIME)
 
+    def _get_k8s_jwt_token(self) -> str:
+        """
+        Get JWT token from Kubernetes.
+
+        Supports two methods:
+        1. Projected volume: reads token directly from configured path
+        2. TokenRequest API: dynamically requests token from K8s API
+
+        :return: JWT Service Account token string
+        """
+        if "k8s_projected_volume_token_path" in self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return self._get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return self._get_k8s_token_request_api()
+
+    async def _a_get_k8s_jwt_token(self) -> str:
+        """Async version of _get_k8s_jwt_token()."""
+        if "k8s_projected_volume_token_path" in self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return await self._a_get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return await self._a_get_k8s_token_request_api()
+
+    def _get_k8s_projected_volume_token(self) -> str:
+        """
+        Get JWT token from Kubernetes projected volume.
+
+        Reads a pre-configured service account token from a projected volume.
+        The token should be configured in the Pod spec with the desired audience
+        and expiration settings.
+
+        :return: JWT Service Account token string
+        """
+        projected_token_path: str | None = self.databricks_conn.extra_dejson.get(
+            "k8s_projected_volume_token_path"
+        )
+
+        if not projected_token_path:
+            raise AirflowException("k8s_projected_volume_token_path is not configured in connection extras")
+
+        try:
+            with open(projected_token_path) as f:
+                token = f.read().strip()
+
+            if not token:
+                raise AirflowException(f"Token file at {projected_token_path} is empty")
+
+            self.log.debug("Successfully read token from projected volume at %s", projected_token_path)
+            return token
+        except FileNotFoundError as e:
+            raise AirflowException(
+                f"Kubernetes projected volume token not found at {projected_token_path}. "
+                "Ensure your Pod has a projected volume configured with serviceAccountToken."
+            ) from e
+        except PermissionError as e:
+            raise AirflowException(f"Permission denied reading token from {projected_token_path}") from e
+
+    async def _a_get_k8s_projected_volume_token(self) -> str:
+        """Async version of _get_k8s_projected_volume_token()."""
+        projected_token_path: str | None = self.databricks_conn.extra_dejson.get(
+            "k8s_projected_volume_token_path"
+        )
+
+        if not projected_token_path:
+            raise AirflowException("k8s_projected_volume_token_path is not configured in connection extras")
+
+        try:
+            async with aiofiles.open(projected_token_path) as f:
+                token = (await f.read()).strip()
+
+            if not token:
+                raise AirflowException(f"Token file at {projected_token_path} is empty")
+
+            self.log.debug("Successfully read token from projected volume at %s", projected_token_path)
+            return token
+        except FileNotFoundError as e:
+            raise AirflowException(
+                f"Kubernetes projected volume token not found at {projected_token_path}. "
+                "Ensure your Pod has a projected volume configured with serviceAccountToken."
+            ) from e
+        except PermissionError as e:
+            raise AirflowException(f"Permission denied reading token from {projected_token_path}") from e
+
+    def _get_k8s_token_request_api(self) -> str:
+        """
+        Get JWT token using Kubernetes TokenRequest API.
+
+        Dynamically requests a service account token from the Kubernetes API server
+        with custom audience and expiration settings.
+
+        :return: JWT Service Account token string
+        """
+        audience = self.databricks_conn.extra_dejson.get("audience", DEFAULT_K8S_AUDIENCE)
+        expiration_seconds = self.databricks_conn.extra_dejson.get("expiration_seconds", 3600)
+        token_path = self.databricks_conn.extra_dejson.get(
+            "k8s_token_path", DEFAULT_K8S_SERVICE_ACCOUNT_TOKEN_PATH
+        )
+        namespace_path = self.databricks_conn.extra_dejson.get(
+            "k8s_namespace_p

Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-05 Thread via GitHub


Nataneljpwd commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2769661110


##
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##
@@ -525,6 +543,318 @@ def _is_oauth_token_valid(token: dict, time_key="expires_on") -> bool:
 
         return int(token[time_key]) > (int(time.time()) + TOKEN_REFRESH_LEAD_TIME)
 
+    def _get_k8s_jwt_token(self) -> str:
+        """
+        Get JWT token from Kubernetes.
+
+        Supports two methods:
+        1. Projected volume: reads token directly from configured path
+        2. TokenRequest API: dynamically requests token from K8s API
+
+        :return: JWT Service Account token string
+        """
+        if "k8s_projected_volume_token_path" in self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return self._get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return self._get_k8s_token_request_api()
+
+    async def _a_get_k8s_jwt_token(self) -> str:
+        """Async version of _get_k8s_jwt_token()."""
+        if "k8s_projected_volume_token_path" in self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return await self._a_get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return await self._a_get_k8s_token_request_api()
+
+    def _get_k8s_projected_volume_token(self) -> str:
+        """
+        Get JWT token from Kubernetes projected volume.
+
+        Reads a pre-configured service account token from a projected volume.
+        The token should be configured in the Pod spec with the desired audience
+        and expiration settings.
+
+        :return: JWT Service Account token string
+        """
+        projected_token_path: str | None = self.databricks_conn.extra_dejson.get(
+            "k8s_projected_volume_token_path"
+        )
+
+        if not projected_token_path:
+            raise AirflowException("k8s_projected_volume_token_path is not configured in connection extras")
+
+        try:
+            with open(projected_token_path) as f:
+                token = f.read().strip()
+
+            if not token:
+                raise AirflowException(f"Token file at {projected_token_path} is empty")
+
+            self.log.debug("Successfully read token from projected volume at %s", projected_token_path)
+            return token
+        except FileNotFoundError as e:
+            raise AirflowException(
+                f"Kubernetes projected volume token not found at {projected_token_path}. "
+                "Ensure your Pod has a projected volume configured with serviceAccountToken."
+            ) from e
+        except PermissionError as e:
+            raise AirflowException(f"Permission denied reading token from {projected_token_path}") from e
+
+    async def _a_get_k8s_projected_volume_token(self) -> str:
+        """Async version of _get_k8s_projected_volume_token()."""
+        projected_token_path: str | None = self.databricks_conn.extra_dejson.get(
+            "k8s_projected_volume_token_path"
+        )
+
+        if not projected_token_path:
+            raise AirflowException("k8s_projected_volume_token_path is not configured in connection extras")
+
+        try:
+            async with aiofiles.open(projected_token_path) as f:
+                token = (await f.read()).strip()
+
+            if not token:
+                raise AirflowException(f"Token file at {projected_token_path} is empty")
+
+            self.log.debug("Successfully read token from projected volume at %s", projected_token_path)
+            return token
+        except FileNotFoundError as e:
+            raise AirflowException(
+                f"Kubernetes projected volume token not found at {projected_token_path}. "
+                "Ensure your Pod has a projected volume configured with serviceAccountToken."
+            ) from e
+        except PermissionError as e:
+            raise AirflowException(f"Permission denied reading token from {projected_token_path}") from e
+
+    def _get_k8s_token_request_api(self) -> str:
+        """
+        Get JWT token using Kubernetes TokenRequest API.
+
+        Dynamically requests a service account token from the Kubernetes API server
+        with custom audience and expiration settings.
+
+        :return: JWT Service Account token string
+        """
+        audience = self.databricks_conn.extra_dejson.get("audience", DEFAULT_K8S_AUDIENCE)
+        expiration_seconds = self.databricks_conn.extra_dejson.get("expiration_seconds", 3600)
+        token_path = self.databricks_conn.extra_dejson.get(
+            "k8s_token_path", DEFAULT_K8S_SERVICE_ACCOUNT_TOKEN_PATH
+        )
+        namespace_path = self.databricks_conn.extra_dejson.get(
+            "k8s_namespace_

Re: [PR] Databricks OIDC token federation for Kubernetes deployment [airflow]

2026-02-04 Thread via GitHub


boring-cyborg[bot] commented on PR #61458:
URL: https://github.com/apache/airflow/pull/61458#issuecomment-3848846860

   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (ruff, mypy and type annotations). Our [prek-hooks](https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst#prerequisites-for-prek-hooks) will help you with that.
   - In case of a new feature, add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/airflow-core/docs/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using the [Breeze environment](https://github.com/apache/airflow/blob/main/dev/breeze/doc/README.rst) for testing locally; it's a heavy Docker image, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow the [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, the Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#coding-style-and-best-practices).
   - Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubt, contact the developers at:
   Mailing List: [email protected]
   Slack: https://s.apache.org/airflow-slack
   

