moomindani commented on code in PR #66895:
URL: https://github.com/apache/airflow/pull/66895#discussion_r3263752278


##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_sql.py:
##########
@@ -172,17 +202,27 @@ def get_conn(self) -> AirflowConnection:
         if not self._sql_conn or prev_token != new_token:
             if self._sql_conn:  # close already existing connection
                 self._sql_conn.close()
+            session_config: dict[str, str] = dict(self.session_config) if self.session_config else {}
+            if self.query_tags:
+                tags_str = _format_query_tags(self.query_tags)
+                existing = session_config.get("QUERY_TAGS", "")
+                session_config["QUERY_TAGS"] = f"{existing},{tags_str}" if existing else tags_str
+
+            connect_kwargs = {

Review Comment:
   Minor fragility: `get_conn()` only rebuilds `_sql_conn` when `not 
self._sql_conn or prev_token != new_token`. If anything triggers `get_conn()` 
on this hook *before* `execute()` sets `hook.query_tags`, the cached connection 
(without the new `QUERY_TAGS` session config) is returned and the new tags 
never take effect.
   
   For the current operator flow (cached_property `_hook` per operator 
instance, single `execute()` per task) this is safe. But it's quietly 
load-bearing on "nothing else calls `get_conn()` first" — a one-line comment 
near the `prev_token != new_token` check noting that session-config changes 
require a reconnect would protect against a future refactor regressing it. 
Alternatively, take `query_tags` at hook construction time so it can never 
change after `_sql_conn` is created.
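
   A minimal sketch of a third option, folding the effective session config into the reconnect check so a late `query_tags` assignment forces a new connection. `FakeConnection`, `SketchHook`, `_conn_key` and the simplified tag formatting are hypothetical stand-ins for illustration, not the hook's actual code:

```python
from __future__ import annotations


class FakeConnection:
    """Hypothetical stand-in for a SQL connection; remembers its session config."""

    def __init__(self, session_configuration: dict[str, str]) -> None:
        self.session_configuration = dict(session_configuration)
        self.closed = False

    def close(self) -> None:
        self.closed = True


class SketchHook:
    """Hypothetical hook that reconnects when the token OR the session config changes."""

    def __init__(self, query_tags: dict[str, str] | None = None) -> None:
        self.query_tags = query_tags
        self._token = "token-1"  # placeholder for the real token lookup
        self._sql_conn: FakeConnection | None = None
        self._conn_key: tuple | None = None

    def _session_config(self) -> dict[str, str]:
        if not self.query_tags:
            return {}
        # Simplified formatting (assumption): no escaping or truncation here.
        pairs = sorted(self.query_tags.items())
        return {"QUERY_TAGS": ",".join(f"{k}:{v}" for k, v in pairs)}

    def get_conn(self) -> FakeConnection:
        session_config = self._session_config()
        # The cache key covers everything that is fixed at connect time.
        key = (self._token, tuple(sorted(session_config.items())))
        if self._sql_conn is None or key != self._conn_key:
            if self._sql_conn is not None:  # close already existing connection
                self._sql_conn.close()
            self._sql_conn = FakeConnection(session_config)
            self._conn_key = key
        return self._sql_conn
```

   With this shape, calling `get_conn()` before `query_tags` is set no longer pins a tagless connection: the next call sees a different key and reconnects.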



##########
providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py:
##########
@@ -46,6 +46,22 @@
 _DISALLOWED_SQL_TOKENS = (";", "--", "/*", "*/")
 
 
+def _get_airflow_query_tags(context: Context) -> dict[str, str | None]:
+    """Return Airflow context metadata as a query-tags dict."""
+    task_instance = context["ti"]

Review Comment:
   `context["ti"]` will raise `KeyError` if a caller passes a context dict 
that doesn't contain `"ti"`. In normal Airflow runtime the key is always 
there, but the operator-side `_get_query_tags` already guards `context is not 
None`, and the test suite passes a `mock_context = {"ti": object()}` to 
exercise this path. It would be consistent to also handle the no-`ti` case 
gracefully (`context.get("ti")` and bail out if `None`). Then this helper 
becomes safe to call from anywhere.
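
   Roughly what I have in mind, as a sketch. The function name without the leading underscore and the three tag keys (`dag_id`, `task_id`, `run_id`) are assumptions for illustration; the PR's actual key set may differ:

```python
from __future__ import annotations

from collections.abc import Mapping
from typing import Any


def get_airflow_query_tags(context: Mapping[str, Any] | None) -> dict[str, str | None]:
    """Return Airflow context metadata as a query-tags dict, or {} without a task instance."""
    if context is None:
        return {}
    task_instance = context.get("ti")
    if task_instance is None:
        # No task instance available: skip tagging instead of raising KeyError.
        return {}
    # Attribute names are assumptions; getattr keeps bare mock objects working.
    return {
        "dag_id": getattr(task_instance, "dag_id", None),
        "task_id": getattr(task_instance, "task_id", None),
        "run_id": getattr(task_instance, "run_id", None),
    }
```

   An empty dict for the missing-`ti` case lets the caller treat "no context" and "no task instance" the same way.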



##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_sql.py:
##########
@@ -71,6 +71,30 @@ def _cancel():
     return timer, timeout_event
 
 
+def _format_query_tag_value(value: str) -> str:
+    """
+    Escape special characters and truncate a single query tag value.
+
+    Databricks ``QUERY_TAGS`` uses ``key:value`` pairs delimited by commas, so
+    backslash, comma and colon inside *values* must be escaped.  Values are also
+    capped at 128 characters before escaping to keep the overall tag string
+    within reasonable bounds.
+    """
+    value = str(value)[:128]

Review Comment:
   Two small issues with the truncation here:
   
   1. **Silent.** Long Airflow values (some `run_id` formats are ~60 chars; 
custom run_ids / user-supplied tags can be longer) get cut off without any log 
message, which is hard to debug from the analytics side. A 
`self.log.warning(...)` at the call site, or a `log.warning(...)` from a module 
logger here when truncation actually fires, would be very cheap to add.
   2. **Order vs. final length.** Truncation runs *before* escaping, so the 
escaped value can exceed 128 chars. If the goal is "cap the source string at 
128 chars" the current order is fine; if the goal is "keep the final escaped 
value ≤ 128 chars on the wire" the order should flip. Worth a brief comment 
stating which contract this function is keeping.
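
   A sketch covering both points, assuming backslash is the escape character. The names `escape_tag_value`, `format_query_tag_value` and the `cap_escaped` flag are hypothetical and exist only to show the two contracts side by side:

```python
from __future__ import annotations

import logging

log = logging.getLogger(__name__)

MAX_TAG_VALUE_LENGTH = 128


def escape_tag_value(value: str) -> str:
    # Backslash first, so the backslashes added for "," and ":" are not doubled.
    return value.replace("\\", "\\\\").replace(",", "\\,").replace(":", "\\:")


def format_query_tag_value(value: object, *, cap_escaped: bool = False) -> str:
    """Escape a tag value and cap it, logging whenever truncation fires."""
    raw = str(value)
    if not cap_escaped:
        # Contract A: cap the SOURCE string; the escaped result may be longer.
        if len(raw) > MAX_TAG_VALUE_LENGTH:
            log.warning(
                "Query tag value truncated from %d to %d characters",
                len(raw),
                MAX_TAG_VALUE_LENGTH,
            )
        return escape_tag_value(raw[:MAX_TAG_VALUE_LENGTH])

    # Contract B: cap the ESCAPED string, never splitting an escape sequence.
    pieces: list[str] = []
    length = 0
    truncated = False
    for char in raw:
        escaped = escape_tag_value(char)
        if length + len(escaped) > MAX_TAG_VALUE_LENGTH:
            truncated = True
            break
        pieces.append(escaped)
        length += len(escaped)
    if truncated:
        log.warning(
            "Escaped query tag value capped at %d characters", MAX_TAG_VALUE_LENGTH
        )
    return "".join(pieces)
```

   Contract B builds the output character by character so a value never ends on a dangling backslash, which a plain slice of the escaped string could produce.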



##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_sql.py:
##########
@@ -103,6 +131,7 @@ def __init__(
         http_headers: list[tuple[str, str]] | None = None,
         catalog: str | None = None,
         schema: str | None = None,
+        query_tags: dict[str, str | None] | None = None,

Review Comment:
   Inserting `query_tags` *before* `caller` is a backwards-incompatible change 
to the public hook constructor signature — anything passing `caller` 
positionally now silently reinterprets that string as `query_tags`. The PR 
already had to fix this for the in-repo sensors (the `caller=self.caller` 
keyword edits below), but third-party code calling `DatabricksSqlHook(..., 
"MyHook")` positionally will break with a confusing type/behaviour error rather 
than a clean failure.
   
   Moving `query_tags` to *after* `caller` (or making it kwarg-only) removes 
the hazard entirely, and the sensor `caller=self.caller` change becomes 
optional rather than required. Worth doing either way.
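
   For illustration, a trimmed-down sketch of the kwarg-only variant. `SketchSqlHook` and its shortened parameter list are hypothetical; the real constructor has more parameters:

```python
from __future__ import annotations


class SketchSqlHook:
    """Hypothetical, trimmed-down constructor showing a keyword-only `query_tags`."""

    def __init__(
        self,
        databricks_conn_id: str = "databricks_default",
        catalog: str | None = None,
        schema: str | None = None,
        caller: str = "SketchSqlHook",
        *,  # everything after this marker must be passed by keyword
        query_tags: dict[str, str | None] | None = None,
    ) -> None:
        self.databricks_conn_id = databricks_conn_id
        self.catalog = catalog
        self.schema = schema
        self.caller = caller
        self.query_tags = query_tags


# Existing positional callers keep their meaning: the fourth argument is still `caller`.
legacy = SketchSqlHook("my_conn", "main", "default", "MyHook")

# New callers opt in explicitly by keyword.
tagged = SketchSqlHook("my_conn", query_tags={"team": "data"})
```

   Passing a fifth positional argument raises `TypeError` at the call site, which is the clean failure mode rather than a silent reinterpretation.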



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
