leestro opened a new issue, #68240:
URL: https://github.com/apache/airflow/issues/68240

   ## Summary
   
   Three bugs found in `StackdriverRemoteLogIO` (introduced in #65198) when running
   against Airflow 3.2.2 with `apache-airflow-providers-google==22.1.0rc1` on
   CeleryExecutor. All three stem from the same root assumption: the implementation
   was written as if it runs in the **task subprocess**, but in AF3's supervisor
   model `REMOTE_TASK_LOG` runs in the **supervisor process** instead.
   
   Reproduced on APC 1.1.2 / GKE / CeleryExecutor / Airflow 3.2.2.
   
   ---
   
   ## Bug 1 — `proc()` ships empty labels to Cloud Logging
   
   **Location:** `StackdriverRemoteLogIO.processors` → the `proc` closure.
   
   **What happens:** Every log entry arrives in Cloud Logging with an empty `labels`
   dict. The `proc` function reads `record.task_instance` to populate `dag_id`,
   `task_id`, etc., but `record.task_instance` is never set in supervisor context
   because it is a task-subprocess concept. The attribute is `None` (or absent) on
   every record the supervisor emits.
   
   **Fix:** Parse labels from the structured log path that
   `relative_path_from_logger()` returns in supervisor context. AF3's log path
   template is:
   
   ```
   dag_id=<x>/run_id=<x>/task_id=<x>/attempt=<N>.log
   ```
   
   All four label fields (`dag_id`, `run_id`, `task_id`, and `try_number`, the last
   taken from the `attempt=<N>` segment) can be extracted from this path with zero DB
   access and zero dependency on `record.task_instance`.
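
A minimal sketch of that parsing, assuming the path has the shape shown above. The function name `labels_from_path` is illustrative (it is not part of the provider), and any `key=value` segment other than the four of interest is simply ignored:

```python
from __future__ import annotations

# Label keys we want to ship to Cloud Logging.
_WANTED = ("dag_id", "run_id", "task_id", "try_number")


def labels_from_path(rel_path) -> dict[str, str]:
    """Extract labels from 'dag_id=<x>/run_id=<x>/task_id=<x>/attempt=<N>.log'.

    Hypothetical helper: accepts a str or Path-like object and returns only
    the keys it could find, so a non-matching path yields an empty dict.
    """
    found: dict[str, str] = {}
    for segment in str(rel_path).split("/"):
        key, sep, value = segment.partition("=")
        if not sep:
            continue  # not a key=value segment
        if key == "attempt":
            # The path calls it 'attempt'; the label is 'try_number'.
            key, value = "try_number", value.removesuffix(".log")
        found[key] = value
    return {k: found[k] for k in _WANTED if k in found}
```

Only string operations are involved, so this can run in the supervisor without a DB session or a `task_instance` attribute on the record.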
   
   ---
   
   ## Bug 2 — `read()` filters on `logical_date`, which supervisors cannot derive
   
   **Location:** `StackdriverRemoteLogIO.read()` → call to `prepare_log_filter`.
   
   **What happens:** `read()` constructs the Cloud Logging filter using `logical_date`
   (derived from `ti.execution_date`). But the supervisor has no DB connection to
   convert `run_id` → `logical_date`, so `read()` either crashes while building the
   filter or gets back no results.
   
   **Fix:** Filter on `run_id` instead. Both the write path (log path template, see
   Bug 1) and the read path (`ti.run_id`) expose `run_id` without any DB lookup. The
   labels passed to `prepare_log_filter` become:
   
   ```python
   {
       "dag_id": ti.dag_id,
       "task_id": ti.task_id,
       "run_id": ti.run_id,
       "try_number": str(ti.try_number),
   }
   ```
   
   This is consistent with how `run_id` is used on the write side via path-based
   labeling.
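
For illustration, the label dict above corresponds to a Cloud Logging query along the following lines. This is a hand-rolled sketch, not the provider's `prepare_log_filter`; `build_label_filter` is a hypothetical name, and clauses such as resource type or log name are deliberately left out:

```python
from __future__ import annotations


def build_label_filter(labels: dict[str, str]) -> str:
    """Join label equality clauses in Cloud Logging query syntax.

    Hypothetical helper: each entry becomes labels.<key>="<value>", with
    backslashes and double quotes in the value escaped.
    """

    def quote(value: str) -> str:
        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    return " AND ".join(f"labels.{key}={quote(value)}" for key, value in labels.items())


# Example with made-up values for the four labels.
example = build_label_filter(
    {"dag_id": "etl", "task_id": "load", "run_id": "manual__1", "try_number": "2"}
)
```

Every value here is available on `ti` directly, so no `run_id` to `logical_date` conversion is needed.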
   
   ---
   
   ## Bug 3 — `transport.send()` is unguarded, crashes supervised components on IAM/gRPC errors
   
   **Location:** `StackdriverRemoteLogIO.processors` → `proc` closure → `transport.send()`.
   
   **What happens:** Any IAM permission error, gRPC connectivity failure, or quota
   exception from `transport.send()` propagates uncaught. Because `REMOTE_TASK_LOG`
   applies to **all** supervised components (scheduler, dag-processor, triggerer,
   workers), a single IAM misconfiguration crashes the entire process. Observed: the
   dag-processor pod enters `CrashLoopBackOff` on every log emit when the Kubernetes
   Service Account lacks the `logging.logEntries.create` IAM permission. This is a
   startup-time misconfiguration that is easy to hit and should degrade gracefully.
   
   **Fix:** Wrap `transport.send()` in a `try/except Exception` and emit a
   `logging.warning` instead of propagating. Log delivery is best-effort; a Cloud
   Logging (GCL) error should never kill a supervised process.
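
The guard can be sketched in isolation as below. `safe_send` is a hypothetical wrapper, not provider code; it takes any callable so the pattern can be exercised without a real Cloud Logging transport:

```python
import logging

_log = logging.getLogger(__name__)


def safe_send(send, *args, **kwargs) -> bool:
    """Call send(*args, **kwargs); log a warning instead of raising.

    Returns True if the call succeeded and False if it raised. Only
    Exception subclasses are swallowed, so KeyboardInterrupt and
    SystemExit still propagate.
    """
    try:
        send(*args, **kwargs)
    except Exception as exc:
        _log.warning("GCL send failed (%s): %s", type(exc).__name__, exc)
        return False
    return True
```

With this shape a missing IAM permission shows up as a warning per failed entry rather than as a crashed pod.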
   
   ---
   
   ## Workaround
   
   A subclass of `StackdriverRemoteLogIO` patching all three issues:
   
   ```python
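   import logging
   from functools import cached_property
   from logging import getLogRecordFactory

   # StackdriverRemoteLogIO and relative_path_from_logger are assumed to be
   # imported from the Google provider and Airflow; those imports are omitted.
   _log = logging.getLogger(__name__)


   class _GCLRemoteLogIO(StackdriverRemoteLogIO):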
       @cached_property
       def processors(self):
           transport = self.transport
           resource = self.resource
           log_record_factory = getLogRecordFactory()
   
           def proc(logger, method_name, event):
               rel = relative_path_from_logger(logger) if logger else None
               if not rel:
                   return event
               # parse dag_id/run_id/task_id/try_number from path
               labels = _labels_from_path(rel)
               ...
               try:
                   transport.send(
                       record, str(msg.get("event", "")), resource=resource, labels=labels
                   )
               except Exception as exc:
                   _log.warning("GCL send failed (%s): %s", type(exc).__name__, exc)
               return event
   
           return (proc,)
   
       def read(self, relative_path, ti):
           log_filter = self.prepare_log_filter({
               "dag_id": ti.dag_id,
               "task_id": ti.task_id,
               "run_id": ti.run_id,
               "try_number": str(ti.try_number),
           })
           messages, _, _ = self.read_logs(log_filter, next_page_token=None, all_pages=True)
           return (
               [f"Reading remote log from Stackdriver for {relative_path}"],
               [messages] if messages else [],
           )
   ```
   
   End-to-end validated on Airflow 3.2.2 / CeleryExecutor / GKE. GCL entries carry all
   four labels; `read()` returns correct log lines; no supervisor crashes on IAM
   errors.

