goingforstudying-ctrl commented on code in PR #68246:
URL: https://github.com/apache/airflow/pull/68246#discussion_r3427799502
##########
providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py:
##########
@@ -182,7 +183,10 @@ def proc(
labels[LABEL_TRY_NUMBER] = str(try_number)
if map_index := event.get("map_index"):
labels["map_index"] = str(map_index)
-
+ # In AF3 supervisor context record.task_instance is not set.
+ # Parse labels from the structured log path as additional fallback.
+ path_labels = _labels_from_path(str(relative))
+ labels.update(path_labels)
Review Comment:
Fixed. Path labels now fill in only the keys that are still missing, instead of
overwriting existing labels via update() on the whole dict.
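
For reference, a minimal sketch of the fill-in-missing-keys merge described above. The helper name `merge_missing_labels` is hypothetical and used only for illustration; the actual change in the PR may be written inline or differently.

```python
def merge_missing_labels(
    labels: dict[str, str], path_labels: dict[str, str]
) -> dict[str, str]:
    """Add path-derived labels without overwriting labels that are already set.

    Hypothetical helper for illustration only; not taken from the PR.
    """
    for key, value in path_labels.items():
        # setdefault() writes the value only when the key is absent, so
        # labels taken from the task instance or the event keep priority.
        labels.setdefault(key, value)
    return labels


labels = {"task_id": "from_record"}
merge_missing_labels(labels, {"task_id": "from_path", "dag_id": "example_dag"})
# "task_id" keeps its original value; "dag_id" is filled in from the path.
```

Unlike `labels.update(path_labels)`, this keeps the path strictly as a fallback source.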
##########
providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py:
##########
@@ -278,6 +282,39 @@ def _read_single_logs_page(self, log_filter: str, page_token: str | None = None)
return "\n".join(messages), page.next_page_token
+def _labels_from_path(relative_path: str) -> dict[str, str]:
Review Comment:
Renamed to _extract_labels_from_path as requested.
##########
providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py:
##########
@@ -278,6 +282,39 @@ def _read_single_logs_page(self, log_filter: str, page_token: str | None = None)
return "\n".join(messages), page.next_page_token
+def _labels_from_path(relative_path: str) -> dict[str, str]:
+ """Parse AF3 log path into Stackdriver labels.
+
+ AF3's log path template is::
+
+ dag_id=<x>/run_id=<x>/task_id=<x>/attempt=<N>.log
+
+ All four label fields are extracted with zero DB access. When the path
+ does not match the expected format the function returns an empty dict
+ so callers can fall back to other label sources.
+ """
+ # Strip the trailing file extension (.log) and split into segments
+ stem = relative_path.rsplit(".", 1)[0] if "." in relative_path else relative_path
+ segments = stem.split("/")
+ labels: dict[str, str] = {}
+ for segment in segments:
+ if "=" not in segment:
+ continue
+ key, _, value = segment.partition("=")
+ if key == "dag_id":
+ labels[LABEL_DAG_ID] = value
+ elif key == "task_id":
+ labels[LABEL_TASK_ID] = value
+ elif key == "attempt":
+ labels[LABEL_TRY_NUMBER] = value
+ elif key == "run_id":
+ # run_id is NOT a standard Stackdriver label yet, but it is used
+ # on the write side via the log path template. Store it so the
+ # read path can filter on it (Bug 2 will wire this up).
+ pass
Review Comment:
Removed the no-op pass branch. run_id is now stored in the labels dict like
the other keys.
--
This is an automated message from the Apache Git Service.