BasPH commented on code in PR #41639:
URL: https://github.com/apache/airflow/pull/41639#discussion_r1752369891


##########
airflow/providers/databricks/hooks/databricks_base.py:
##########
@@ -374,6 +378,95 @@ async def _a_get_aad_token(self, resource: str) -> str:
 
         return jsn["access_token"]
 
+    def _get_aad_token_for_default_az_credential(self, resource: str) -> str:
+        """
+        Get AAD token for given resource for workload identity.
+
+        Supports managed identity or service principal auth.
+        :param resource: resource to issue token to
+        :return: AAD token, or raise an exception
+        """
+        aad_token = self.oauth_tokens.get(resource)
+        if aad_token and self._is_oauth_token_valid(aad_token):
+            return aad_token["access_token"]
+
+        self.log.info("Existing AAD token is expired, or going to expire soon. 
Refreshing...")
+        try:
+            from azure.identity import DefaultAzureCredential
+
+            for attempt in self._get_retry_object():
+                with attempt:
+                    # This only works in an AKS Cluster given the following 
environment variables:
+                    # AZURE_TENANT_ID, AZURE_CLIENT_ID, 
AZURE_FEDERATED_TOKEN_FILE
+                    #
+                    # While there is a WorkloadIdentityCredential class, the 
below class is advised by microsoft examples
+                    # 
https://learn.microsoft.com/nl-nl/azure/aks/workload-identity-overview

Review Comment:
   I would link to the English docs rather than the Dutch (`nl-nl`) ones:
   
   ```suggestion
                       # While there is a WorkloadIdentityCredential class, the below class is advised by Microsoft
                       # https://learn.microsoft.com/en-us/azure/aks/workload-identity-overview
   ```



##########
airflow/providers/databricks/hooks/databricks_base.py:
##########
@@ -374,6 +378,95 @@ async def _a_get_aad_token(self, resource: str) -> str:
 
         return jsn["access_token"]
 
+    def _get_aad_token_for_default_az_credential(self, resource: str) -> str:
+        """
+        Get AAD token for given resource for workload identity.
+
+        Supports managed identity or service principal auth.
+        :param resource: resource to issue token to
+        :return: AAD token, or raise an exception
+        """
+        aad_token = self.oauth_tokens.get(resource)
+        if aad_token and self._is_oauth_token_valid(aad_token):
+            return aad_token["access_token"]
+
+        self.log.info("Existing AAD token is expired, or going to expire soon. 
Refreshing...")
+        try:
+            from azure.identity import DefaultAzureCredential
+
+            for attempt in self._get_retry_object():
+                with attempt:
+                    # This only works in an AKS Cluster given the following 
environment variables:
+                    # AZURE_TENANT_ID, AZURE_CLIENT_ID, 
AZURE_FEDERATED_TOKEN_FILE
+                    #
+                    # While there is a WorkloadIdentityCredential class, the 
below class is advised by microsoft examples
+                    # 
https://learn.microsoft.com/nl-nl/azure/aks/workload-identity-overview
+                    token = 
DefaultAzureCredential().get_token(f"{resource}/.default")
+
+                    jsn = {
+                        "access_token": token.token,
+                        "token_type": "Bearer",
+                        "expires_on": token.expires_on,
+                    }
+                    self._is_oauth_token_valid(jsn)
+                    self.oauth_tokens[resource] = jsn
+                    break
+        except ImportError as e:
+            raise AirflowOptionalProviderFeatureException(e)
+        except RetryError:
+            raise AirflowException(f"API requests to Azure failed 
{self.retry_limit} times. Giving up.")
+        except requests_exceptions.HTTPError as e:
+            msg = f"Response: {e.response.content.decode()}, Status Code: 
{e.response.status_code}"
+            raise AirflowException(msg)
+
+        return token.token
+
+    async def _a_get_aad_token_for_default_az_credential(self, resource: str) 
-> str:
+        """
+        Get AAD token for given resource for workload identity.
+
+        Supports managed identity or service principal auth.
+        :param resource: resource to issue token to
+        :return: AAD token, or raise an exception
+        """
+        aad_token = self.oauth_tokens.get(resource)
+        if aad_token and self._is_oauth_token_valid(aad_token):
+            return aad_token["access_token"]
+
+        self.log.info("Existing AAD token is expired, or going to expire soon. 
Refreshing...")
+        try:
+            # from azure.identity import DefaultAzureCredential

Review Comment:
   Please remove this unnecessary commented-out import.
   
   ```suggestion
   ```



##########
airflow/providers/databricks/hooks/databricks_base.py:
##########
@@ -374,6 +378,95 @@ async def _a_get_aad_token(self, resource: str) -> str:
 
         return jsn["access_token"]
 
+    def _get_aad_token_for_default_az_credential(self, resource: str) -> str:
+        """
+        Get AAD token for given resource for workload identity.
+
+        Supports managed identity or service principal auth.
+        :param resource: resource to issue token to
+        :return: AAD token, or raise an exception
+        """
+        aad_token = self.oauth_tokens.get(resource)
+        if aad_token and self._is_oauth_token_valid(aad_token):
+            return aad_token["access_token"]
+
+        self.log.info("Existing AAD token is expired, or going to expire soon. 
Refreshing...")
+        try:
+            from azure.identity import DefaultAzureCredential
+
+            for attempt in self._get_retry_object():
+                with attempt:
+                    # This only works in an AKS Cluster given the following 
environment variables:
+                    # AZURE_TENANT_ID, AZURE_CLIENT_ID, 
AZURE_FEDERATED_TOKEN_FILE
+                    #
+                    # While there is a WorkloadIdentityCredential class, the 
below class is advised by microsoft examples
+                    # 
https://learn.microsoft.com/nl-nl/azure/aks/workload-identity-overview
+                    token = 
DefaultAzureCredential().get_token(f"{resource}/.default")
+
+                    jsn = {
+                        "access_token": token.token,
+                        "token_type": "Bearer",
+                        "expires_on": token.expires_on,
+                    }
+                    self._is_oauth_token_valid(jsn)
+                    self.oauth_tokens[resource] = jsn
+                    break
+        except ImportError as e:
+            raise AirflowOptionalProviderFeatureException(e)
+        except RetryError:
+            raise AirflowException(f"API requests to Azure failed 
{self.retry_limit} times. Giving up.")
+        except requests_exceptions.HTTPError as e:
+            msg = f"Response: {e.response.content.decode()}, Status Code: 
{e.response.status_code}"
+            raise AirflowException(msg)
+
+        return token.token
+
+    async def _a_get_aad_token_for_default_az_credential(self, resource: str) 
-> str:
+        """
+        Get AAD token for given resource for workload identity.
+
+        Supports managed identity or service principal auth.
+        :param resource: resource to issue token to
+        :return: AAD token, or raise an exception
+        """
+        aad_token = self.oauth_tokens.get(resource)
+        if aad_token and self._is_oauth_token_valid(aad_token):
+            return aad_token["access_token"]
+
+        self.log.info("Existing AAD token is expired, or going to expire soon. 
Refreshing...")
+        try:
+            # from azure.identity import DefaultAzureCredential
+            from azure.identity.aio import (
+                DefaultAzureCredential as AsyncDefaultAzureCredential,
+            )
+
+            for attempt in self._get_retry_object():
+                with attempt:
+                    # This only works in an AKS Cluster given the following 
environment variables:
+                    # AZURE_TENANT_ID, AZURE_CLIENT_ID, 
AZURE_FEDERATED_TOKEN_FILE
+                    #
+                    # While there is a WorkloadIdentityCredential class, the 
below class is advised by microsoft examples
+                    # 
https://learn.microsoft.com/nl-nl/azure/aks/workload-identity-overview

Review Comment:
   ```suggestion
                       # While there is a WorkloadIdentityCredential class, the below class is advised by Microsoft
                       # https://learn.microsoft.com/en-us/azure/aks/workload-identity-overview
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to