kaxil commented on code in PR #63775:
URL: https://github.com/apache/airflow/pull/63775#discussion_r3019063862


##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##########
@@ -296,9 +342,7 @@ async def _a_get_sp_token(self, resource: str) -> str:
                 with attempt:
                     async with self._session.post(
                         resource,
-                        auth=aiohttp.BasicAuth(
-                            self._get_connection_attr("login"), self.databricks_conn.password
-                        ),
+                        auth=aiohttp.BasicAuth(self._get_connection_attr("login", conn), conn.password or ""),

Review Comment:
   `conn.password or ""` silently converts a `None` password to an empty 
string. The sync counterpart (`_get_sp_token`) passes 
`self.databricks_conn.password` directly to `HTTPBasicAuth`, which would fail 
with a `TypeError` if password is `None`. The async version swallows the 
problem and sends empty credentials to Azure, producing a confusing 401.
   
   Same pattern at line 440 (`client_secret=conn.password or ""`) and in the 
`_a_do_api_call` basic auth fallback.
   
   Consider raising explicitly if `conn.password` is `None` in these paths, or 
at minimum adding a comment explaining why `or ""` is intentional (e.g. 
`aiohttp.BasicAuth` rejects `None`).
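A minimal sketch of the "raise explicitly" option. The helper name `require_password` and the `MissingSecretError` type are hypothetical; inside the provider one would more likely raise `AirflowException`. For context, `aiohttp.BasicAuth` raises `ValueError` when given a `None` password, which is presumably why `or ""` was added in the first place.

```python
from __future__ import annotations


class MissingSecretError(ValueError):
    """Hypothetical error type; the provider would more likely raise AirflowException."""


def require_password(password: str | None, conn_id: str) -> str:
    """Return ``password`` unchanged, or raise if it is unset (hypothetical helper).

    Failing here surfaces a misconfigured connection immediately, instead of
    sending empty Basic-auth credentials and getting an opaque 401 back.
    """
    if password is None:
        raise MissingSecretError(
            f"Connection {conn_id!r} has no password (client secret) set; "
            "it is required for service principal authentication."
        )
    return password
```

At each call site, `conn.password or ""` would then become something like `require_password(conn.password, conn.conn_id)`. An explicitly empty string still passes through unchanged; only a missing value is rejected.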



##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##########
@@ -464,6 +509,7 @@ async def _a_get_aad_token_for_default_az_credential(self, resource: str) -> str
         :param resource: resource to issue token to
         :return: AAD token, or raise an exception
         """
+        await self.a_databricks_conn()

Review Comment:
   The result of this `await self.a_databricks_conn()` call is discarded (there is no `conn = ...`). 
This method (`_a_get_aad_token_for_default_az_credential`) never accesses the 
connection object -- it only uses `self.oauth_tokens` and 
`DefaultAzureCredential`. The call adds an unnecessary network round-trip to 
fetch the connection on first entry into this code path. If this is meant as a 
"warm the cache" call, `__aenter__` already does that.
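To illustrate the cost argument, here is a toy model that assumes, as the comment implies, that `a_databricks_conn()` caches the connection after its first fetch and that `__aenter__` calls it. `CachingConnHook` and its methods are hypothetical stand-ins, not the provider's classes.

```python
from __future__ import annotations

import asyncio


class CachingConnHook:
    """Hypothetical stand-in: caches the connection after the first (simulated) fetch."""

    def __init__(self) -> None:
        self.fetch_count = 0  # number of simulated connection lookups
        self._conn: dict[str, str] | None = None

    async def a_databricks_conn(self) -> dict[str, str]:
        if self._conn is None:
            self.fetch_count += 1
            await asyncio.sleep(0)  # stands in for the network round-trip
            self._conn = {"host": "example.cloud.databricks.com"}
        return self._conn

    async def __aenter__(self) -> CachingConnHook:
        await self.a_databricks_conn()  # warms the cache once
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        return None

    async def token_method_with_discarded_call(self) -> str:
        await self.a_databricks_conn()  # result unused, as in the diff
        return "token"

    async def token_method(self) -> str:
        # Never touches the connection, so nothing needs to be fetched.
        return "token"
```

Inside `async with`, the extra await is a no-op because the cache is already warm. Called outside a context manager, it triggers a fetch whose result the method never uses.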



##########
providers/databricks/src/airflow/providers/databricks/utils/openlineage.py:
##########
@@ -24,7 +24,7 @@
 import requests
 
 from airflow.providers.common.compat.openlineage.check import require_openlineage_version
-from airflow.utils import timezone
+from airflow.utils import timezone  # type: ignore[attr-defined]

Review Comment:
   This `# type: ignore[attr-defined]` is unrelated to the async-to-sync fix. 
Suppressing mypy errors on imports can mask real breakage if this module moves. 
Should probably be in a separate commit/PR.



##########
providers/databricks/src/airflow/providers/databricks/triggers/databricks.py:
##########
@@ -168,6 +168,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
         )
 
     async def run(self):
+        statement_state = SQLStatementState(state="PENDING")

Review Comment:
   Pre-initializing `statement_state = SQLStatementState(state="PENDING")` 
prevents the `UnboundLocalError` when the timeout is already expired before the 
first poll -- good fix.
   
   But the emitted `TriggerEvent` will contain a fabricated "PENDING" state 
that never came from the Databricks API. Consider handling the 
immediate-timeout case before the loop instead:
   
   ```python
   if self.end_time <= time.time():
       await self.hook.a_cancel_sql_statement(self.statement_id)
       yield TriggerEvent({...})
       return
   ```
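A self-contained sketch of that structure, using a stub in place of the real hook. `StubHook`, `a_get_sql_statement_state`, `run_trigger`, and the plain-dict events are hypothetical stand-ins (a real trigger yields `TriggerEvent` objects). The terminal state names are assumed to follow the Databricks SQL Statement Execution API.

```python
from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, AsyncIterator

# Assumed terminal states of a Databricks SQL statement.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELED", "CLOSED"}


@dataclass
class StubHook:
    """Hypothetical stand-in: replays canned states and records cancellations."""

    states: list[str] = field(default_factory=list)
    cancelled: list[str] = field(default_factory=list)

    async def a_cancel_sql_statement(self, statement_id: str) -> None:
        self.cancelled.append(statement_id)

    async def a_get_sql_statement_state(self, statement_id: str) -> str:
        return self.states.pop(0)  # a real hook would call the Databricks API


async def run_trigger(
    hook: StubHook, statement_id: str, end_time: float, poll_interval: float = 0.0
) -> AsyncIterator[dict[str, Any]]:
    # Already expired: cancel and report without inventing a statement state.
    if end_time <= time.time():
        await hook.a_cancel_sql_statement(statement_id)
        yield {"statement_id": statement_id, "status": "timeout", "state": None}
        return

    # Poll first, then check the deadline, so `state` is always a real API value.
    while True:
        state = await hook.a_get_sql_statement_state(statement_id)
        if state in TERMINAL_STATES:
            yield {"statement_id": statement_id, "status": "done", "state": state}
            return
        if time.time() >= end_time:
            await hook.a_cancel_sql_statement(statement_id)
            yield {"statement_id": statement_id, "status": "timeout", "state": state}
            return
        await asyncio.sleep(poll_interval)
```

Checking the deadline after each poll, rather than in the `while` condition, also closes the gap where the deadline passes between the early check and the first loop iteration, which would otherwise reintroduce the `UnboundLocalError`.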



##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##########
@@ -1065,11 +1111,23 @@ def _get_oidc_token_service_url(self) -> str:
         """
         return OIDC_TOKEN_SERVICE_URL.format(f"https://{self.host}")
 
+    async def _a_get_oidc_token_service_url(self) -> str:
+        """Async version of `_get_oidc_token_service_url`."""
+        host = await self.a_host()
+        return OIDC_TOKEN_SERVICE_URL.format(f"https://{host}")
+
     def _endpoint_url(self, endpoint):
         port = f":{self.databricks_conn.port}" if self.databricks_conn.port else ""
         schema = self.databricks_conn.schema or "https"
         return f"{schema}://{self.host}{port}/{endpoint}"
 
+    async def _a_endpoint_url(self, endpoint):

Review Comment:
   `_a_endpoint_url` is missing a return type annotation. The sync 
`_endpoint_url` also lacks one, but new async code should set the bar higher. 
`-> str` would match.
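For reference, a sketch of the annotated signature. It assumes `_a_endpoint_url` mirrors the sync `_endpoint_url` shown in the diff, reading the connection through `a_databricks_conn()` and the host through `a_host()`. The real method body is not shown in this hunk. `StubConnection` and `StubHook` are hypothetical stand-ins so the snippet runs on its own.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


@dataclass
class StubConnection:
    """Hypothetical stand-in for an Airflow Connection (only the fields used here)."""

    host: str
    port: int | None = None
    schema: str | None = None


class StubHook:
    def __init__(self, conn: StubConnection) -> None:
        self._conn = conn

    async def a_databricks_conn(self) -> StubConnection:
        return self._conn

    async def a_host(self) -> str:
        return self._conn.host

    async def _a_endpoint_url(self, endpoint: str) -> str:
        """Async mirror of the sync `_endpoint_url`, with explicit annotations."""
        conn = await self.a_databricks_conn()
        host = await self.a_host()
        port = f":{conn.port}" if conn.port else ""
        schema = conn.schema or "https"
        return f"{schema}://{host}{port}/{endpoint}"
```

Annotating `endpoint: str` as well as the return type lets mypy check both call sites and the f-string result.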



-- 
This is an automated message from the Apache Git Service.