leestro opened a new issue, #68240:
URL: https://github.com/apache/airflow/issues/68240
## Summary
Three bugs were found in `StackdriverRemoteLogIO` (introduced in #65198) when running against Airflow 3.2.2 with `apache-airflow-providers-google==22.1.0rc1` on CeleryExecutor. All three stem from the same root assumption: the implementation was written as if it ran in the **task subprocess**, but in AF3's supervisor model `REMOTE_TASK_LOG` runs in the **supervisor process** instead.
Reproduced on APC 1.1.2 / GKE / CeleryExecutor / Airflow 3.2.2.
---
## Bug 1 — `proc()` ships empty labels to Cloud Logging
**Location:** `StackdriverRemoteLogIO.processors` → the `proc` closure.
**What happens:** Every log entry arrives in Cloud Logging with an empty `labels` dict. The `proc` function reads `record.task_instance` to populate `dag_id`, `task_id`, etc., but `record.task_instance` is never set in supervisor context because it is a task-subprocess concept. The attribute is `None` (or absent) on every record the supervisor emits.
**Fix:** Parse the labels from the structured log path that `relative_path_from_logger()` returns in supervisor context. AF3's log path template is:
```
dag_id=<x>/run_id=<x>/task_id=<x>/attempt=<N>.log
```
All four label fields (`dag_id`, `run_id`, `task_id`, `try_number`) can be extracted from this path with zero DB access and no dependency on `record.task_instance`. The path segment is named `attempt` and maps to the `try_number` label.
---
## Bug 2 — `read()` filters on `logical_date`, which supervisors cannot derive
**Location:** `StackdriverRemoteLogIO.read()` → call to `prepare_log_filter`.
**What happens:** `read()` builds the Cloud Logging filter from `logical_date` (derived from `ti.execution_date`), but the supervisor has no DB connection with which to map `run_id` → `logical_date`. As a result, the filter either crashes or matches nothing.
**Fix:** Filter on `run_id` instead. Both the write path (the log path template; see Bug 1) and the read path (`ti.run_id`) expose `run_id` without any DB lookup. The filter becomes:
```python
{
    "dag_id": ti.dag_id,
    "task_id": ti.task_id,
    "run_id": ti.run_id,
    "try_number": str(ti.try_number),
}
```
This is consistent with how `run_id` is used on the write side via path-based labeling.
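For illustration, a label dict like the one above could be turned into a Cloud Logging query by ANDing one `labels.<key>="<value>"` clause per field. The sketch below is hypothetical: `build_label_filter` is not a provider function. The real `prepare_log_filter` in the Google provider most likely adds resource-type and log-name clauses too, so check its source before relying on the exact format. What the sketch does show is that the read side only needs values already on `ti`. It also shows that `run_id` must be quoted and escaped, because run IDs such as `manual__2024-01-01T00:00:00+00:00` contain characters like `:` and `+`.

```python
from __future__ import annotations


def _quote(value: str) -> str:
    """Render a value as a double-quoted Cloud Logging string literal."""
    # Escape backslashes first so the quote escapes added next are not doubled.
    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def build_label_filter(labels: dict[str, str]) -> str:
    """Hypothetical: AND together one labels.<key>="<value>" clause per label."""
    return " AND ".join(f"labels.{key}={_quote(value)}" for key, value in labels.items())


# Read side: every value comes straight off the task instance, no DB lookup.
print(build_label_filter({
    "dag_id": "etl",
    "task_id": "load",
    "run_id": "manual__2024-01-01T00:00:00+00:00",
    "try_number": str(2),
}))
```

`run_id` is matched as an exact string. The write side must therefore emit exactly the `run_id` text it parsed from the path. If either side normalized it differently, `read()` would silently miss entries.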
---
## Bug 3 — `transport.send()` is unguarded, crashes supervised components on IAM/gRPC errors
**Location:** `StackdriverRemoteLogIO.processors` → `proc` closure → `transport.send()`.
**What happens:** Any IAM permission error, gRPC connectivity failure, or quota exception raised by `transport.send()` propagates uncaught. `REMOTE_TASK_LOG` applies to **all** supervised components (scheduler, dag-processor, triggerer, workers). A single IAM misconfiguration therefore crashes the entire process. Observed: the dag-processor pod enters `CrashLoopBackOff`, because every log emit raises when the Kubernetes Service Account lacks an IAM binding that grants `logging.logEntries.create`. This misconfiguration is easy to hit at deployment time and should degrade gracefully rather than take the component down.
**Fix:** Wrap `transport.send()` in a `try/except Exception` and emit a `logging.warning` instead of propagating. Log delivery is best-effort, so a GCL error should never kill a supervised process.
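Below is a minimal sketch of the guard. It is pulled out into a hypothetical standalone helper (`safe_send` is not part of the provider), so any callable can stand in for `transport.send()` when exercising it. The helper returns whether delivery succeeded, which makes it easy to count or rate-limit failures. Catching `Exception` rather than `BaseException` still lets `KeyboardInterrupt` and `SystemExit` through.

```python
from __future__ import annotations

import logging
from typing import Any, Callable

_log = logging.getLogger(__name__)


def safe_send(send: Callable[..., Any], *args: Any, **kwargs: Any) -> bool:
    """Call ``send`` best-effort: log a warning instead of raising on failure.

    Hypothetical helper; in the workaround the same try/except wraps
    ``transport.send(record, message, resource=..., labels=...)`` inline.
    """
    try:
        send(*args, **kwargs)
    except Exception as exc:  # e.g. IAM, gRPC, or quota errors from the transport
        _log.warning("GCL send failed (%s): %s", type(exc).__name__, exc)
        return False
    return True


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)

    def denied_send(*args: Any, **kwargs: Any) -> None:
        # Stand-in for a transport whose service account lacks the permission.
        raise PermissionError("logging.logEntries.create denied")

    ok = safe_send(denied_send, "record", "message", labels={"dag_id": "etl"})
    print("delivered" if ok else "dropped, process still running")
```

One thing worth checking in a real deployment: stdlib logging in the supervised process might itself be routed through the same remote handler. In that case the warning emitted on failure could re-enter the failing send path. Sending that warning somewhere other than Cloud Logging, or rate-limiting it, avoids a feedback loop.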
---
## Workaround
A subclass of `StackdriverRemoteLogIO` patching all three issues:
```python
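import logging
from functools import cached_property
from logging import getLogRecordFactory

# Not shown here: imports for StackdriverRemoteLogIO and
# relative_path_from_logger, and the _labels_from_path helper.
_log = logging.getLogger(__name__)


class _GCLRemoteLogIO(StackdriverRemoteLogIO):
    @cached_property
    def processors(self):
        transport = self.transport
        resource = self.resource
        log_record_factory = getLogRecordFactory()

        def proc(logger, method_name, event):
            # Supervisor context: derive labels from the relative log path
            # instead of record.task_instance (Bug 1).
            rel = relative_path_from_logger(logger) if logger else None
            if not rel:
                return event
            labels = _labels_from_path(rel)  # parse dag_id/run_id/task_id/try_number from path
            ...  # (elided in the original; builds `record` and `msg` used below)
            # Best-effort delivery: never let a GCL error crash the process (Bug 3).
            try:
                transport.send(record, str(msg.get("event", "")), resource=resource, labels=labels)
            except Exception as exc:
                _log.warning("GCL send failed (%s): %s", type(exc).__name__, exc)
            return event

        return (proc,)

    def read(self, relative_path, ti):
        # Filter on run_id rather than logical_date, so no DB lookup is needed (Bug 2).
        log_filter = self.prepare_log_filter({
            "dag_id": ti.dag_id,
            "task_id": ti.task_id,
            "run_id": ti.run_id,
            "try_number": str(ti.try_number),
        })
        messages, _, _ = self.read_logs(log_filter, next_page_token=None, all_pages=True)
        return (
            [f"Reading remote log from Stackdriver for {relative_path}"],
            [messages] if messages else [],
        )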
```
End-to-end validated on Airflow 3.2.2 / CeleryExecutor / GKE. GCL entries carry all four labels, `read()` returns the correct log lines, and supervisors no longer crash on IAM errors.