evgeniy-b opened a new issue, #68279:
URL: https://github.com/apache/airflow/issues/68279
### Under which category would you file this issue?
Providers
### Apache Airflow version
3.1.7
### What happened and how to reproduce it?
### Issue description
A `BeamRunPythonPipelineOperator` (the Java/Go variants share the code path)
with `deferrable=True` and `runner="DataflowRunner"` fails with:
```
airflow.exceptions.AirflowException: 400 Request must contain a job and project id.
```
whenever the Beam launcher's stdout does **not** contain a line matching
`JOB_ID_PATTERN` — even though the Dataflow job itself launches and runs to
completion. The **identical** task with `deferrable=False` succeeds against the
same job. So enabling deferrable mode turns a passing task into a failing one,
with no other change.
### Root cause
The operator only learns its `dataflow_job_id` by scanning the launcher
subprocess's stdout with this regex
(`providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py`):
```python
import re

JOB_ID_PATTERN = re.compile(
    r"Submitted job: (?P<job_id_java>[^\"\n\s]*)|Created job with id: \[(?P<job_id_python>[^\"\n\s]*)\]"
)
```
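To make the matching behaviour concrete, here is a small, self-contained check of `JOB_ID_PATTERN` against a few sample lines. The pattern is copied from the snippet above. `extract_job_id` is a hypothetical helper that assumes the line is scanned with `re.search`, and the sample lines (job IDs and the reworded variant included) are made up for illustration, not real SDK output.

```python
import re
from typing import Optional

# Pattern as quoted above from the google provider's dataflow hook.
JOB_ID_PATTERN = re.compile(
    r"Submitted job: (?P<job_id_java>[^\"\n\s]*)|Created job with id: \[(?P<job_id_python>[^\"\n\s]*)\]"
)


def extract_job_id(line: str) -> Optional[str]:
    """Hypothetical helper: return the job id found in one output line, or None."""
    match = JOB_ID_PATTERN.search(line)
    if not match:
        return None
    # Only one of the two named groups takes part in any successful match.
    return match.group("job_id_java") or match.group("job_id_python")


# Made-up sample lines, for illustration only.
python_line = (
    "INFO:apache_beam.runners.dataflow.internal.apiclient:"
    "Created job with id: [2026-01-01_00_00_00-123]"
)
java_line = "Submitted job: 2026-01-01_00_00_00-456"
reworded_line = "Job 2026-01-01_00_00_00-789 created"  # any rewording misses

print(extract_job_id(python_line))    # 2026-01-01_00_00_00-123
print(extract_job_id(java_line))      # 2026-01-01_00_00_00-456
print(extract_job_id(reworded_line))  # None
```

Only the two exact phrasings are recognised; a line carrying the same information in different words yields no id at all.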
If that line never appears, `on_new_job_id_callback` never fires and
`self.dataflow_job_id` stays `None`. There is a second consequence that
matters: the launcher's stdout-reading loop (`run_beam_command` / `process_fd`
in `providers/apache/beam/.../hooks/beam.py`) only short-circuits when
`is_dataflow_job_id_exist_callback()` returns truthy. With no job id ever
captured that callback stays `False`, so the loop **never returns early** — it
blocks reading stdout until the subprocess exits, i.e. until the Dataflow job
itself finishes. In the normal case the scan returns as soon as the id line is
seen and the remaining wait is delegated to the Dataflow API; here the entire
job runs inline before either wait mode is reached.
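A simplified model of the early-return behaviour described above. `LauncherState` and `read_launcher_output` are hypothetical stand-ins for the `process_fd` / `run_beam_command` loop and its callbacks, not the provider's code: lines are fed to a callback, and the loop returns early only once a job ID has been recorded. When no line matches, it consumes the whole stream, which in the real hook means waiting until the subprocess exits.

```python
import re
from typing import Iterable, Optional

# Simplified: only the Python half of JOB_ID_PATTERN.
ID_LINE = re.compile(r"Created job with id: \[(?P<job_id>[^\"\n\s]*)\]")


class LauncherState:
    """Hypothetical holder for the id captured by the on-new-job-id callback."""

    def __init__(self) -> None:
        self.dataflow_job_id: Optional[str] = None

    def process_line(self, line: str) -> None:
        match = ID_LINE.search(line)
        if match:
            self.dataflow_job_id = match.group("job_id")

    def job_id_exists(self) -> bool:
        return self.dataflow_job_id is not None


def read_launcher_output(lines: Iterable[str], state: LauncherState) -> int:
    """Feed lines to the callback; return early once an id is known.

    Returns the number of lines consumed before the loop returned.
    """
    consumed = 0
    for line in lines:
        consumed += 1
        state.process_line(line)
        if state.job_id_exists():
            return consumed  # normal case: the rest of the wait goes to the API
    return consumed  # no id seen: the whole stream (the whole job) was waited out


with_id = ["starting", "Created job with id: [job-1]", "step 1", "step 2", "done"]
without_id = ["starting", "step 1", "step 2", "done"]

s1 = LauncherState()
n1 = read_launcher_output(with_id, s1)     # 2, and s1.dataflow_job_id == "job-1"
s2 = LauncherState()
n2 = read_launcher_output(without_id, s2)  # 4, and s2.dataflow_job_id is None
```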
**Why the line is commonly missing.** The Beam SDK logs `Created job with
id: [...]` from `apache_beam.runners.dataflow.internal.apiclient` at **INFO**
level. Python's root logger defaults to **WARNING**, so a pipeline only emits
that line if its own script opts into INFO logging — which the canonical Beam
`wordcount` example does explicitly in its `__main__`:
```python
import logging

logging.getLogger().setLevel(logging.INFO)
```
Any `DataflowRunner` pipeline whose entrypoint does **not** raise the root
level to INFO (a common omission) suppresses the job-id line by default, with
no code change on the Airflow side and no unusual setup. Note that the INFO
lines that *do* show up in the Airflow task log (`Beam version: ...`, `Running
command: ...`, `Start waiting ...`) come from Airflow's own
`self.log.info(...)` in the worker process — not from the launcher subprocess —
which is why they appear while the subprocess's job-id line does not. A
newer/different Beam SDK that rewords or relevels the line is a second,
independent way to miss the regex.
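The level filtering can be checked without Beam installed. This sketch assumes the SDK logs through a module-level logger named after `apache_beam.runners.dataflow.internal.apiclient`; it only shows whether an INFO record passes the logger's level check, not where a handler would then write it.

```python
import logging

# Assumption: the Beam module logs through a logger named after the module.
beam_logger = logging.getLogger("apache_beam.runners.dataflow.internal.apiclient")


def info_passes_level_check() -> bool:
    """True if an INFO record on the Beam logger would pass the level check."""
    # The child logger has no level of its own (NOTSET), so its effective
    # level is inherited from the root logger.
    return beam_logger.isEnabledFor(logging.INFO)


root = logging.getLogger()

root.setLevel(logging.WARNING)  # Python's default root level
default_case = info_passes_level_check()  # False: the job-id line is filtered out

root.setLevel(logging.INFO)  # what the wordcount example's __main__ does
opted_in_case = info_passes_level_check()  # True: the line can be emitted
```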
By the time the scan returns, the job has already finished. The two wait
modes then diverge only in how they handle the (now-terminal) job with a `None`
id:
- **`deferrable=False`** — the operator calls
`DataflowHook.wait_for_done(job_id=None, job_name=...)`. With no id,
`_DataflowJobsController._get_current_jobs` resolves the job by **name prefix**
(`_fetch_jobs_by_prefix_name`); the job is already `JOB_STATE_DONE`, so
`wait_for_done` returns after a single lookup (no real polling — there is no
`Waiting for done. Sleep` iteration). The task **succeeds**.
- **`deferrable=True`** — the operator defers with
`job_id=self.dataflow_job_id` (i.e. `None`). The
`DataflowJobStateCompleteTrigger` (or `DataflowJobStatusTrigger` on older
google providers) then calls `get_job_status(job_id=None)`; the Dataflow API
rejects it with `400 Request must contain a job and project id.`; the trigger
yields an error `TriggerEvent`; and `execute_complete` re-raises it as
`AirflowException`. The task **fails**.
The deferrable path has no by-name fallback for a missing job id; the sync
path does. (Note the deferrable path is also strictly worse here: the job has
already completed before the task defers, so deferring buys nothing and then
fails on resume.)
Relevant code (paths on `main`):
- `JOB_ID_PATTERN` + `process_line_and_extract_dataflow_job_id_callback` —
`providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py`
- launcher loop that only records the id on a regex match (`process_fd` /
`run_beam_command`) —
`providers/apache/beam/src/airflow/providers/apache/beam/hooks/beam.py`
- `defer(... job_id=self.dataflow_job_id ...)` in `execute_on_dataflow` —
`providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py`
- `execute_complete` re-raising the error event —
`providers/apache/beam/.../operators/beam.py` (`raise
AirflowException(event["message"])`)
- sync fallback to name lookup — `DataflowHook.wait_for_done`
- trigger calling the API with `job_id=None` —
`providers/google/src/airflow/providers/google/cloud/triggers/dataflow.py`
### Steps to reproduce
Minimal, deterministic reproduction with a **real** Dataflow job (no mocking
of Airflow internals). Verified on Airflow 3.1.7 / apache-beam 2.71.0 /
apache-airflow-providers-google 20.0.0 / apache-airflow-providers-apache-beam
6.2.3.
1. A no-op Beam pipeline that runs on Dataflow and does **no logging
configuration** — it leaves the root logger at its default WARNING, so the
`Created job with id: [...]` INFO line never reaches stdout. The job still
submits and runs. (This mirrors any pipeline that doesn't opt into INFO
logging.)
<details><summary>repro_missing_jobid.py (staged in GCS)</summary>
```python
# No logging config on purpose: the SDK's "Created job with id: [...]" line
# is INFO, the root logger defaults to WARNING, so the line is filtered out.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as p:
    p | "Create" >> beam.Create([1, 2, 3]) | "Noop" >> beam.Map(lambda x: x)
```
</details>
2. A DAG with two tasks running that same pipeline on `DataflowRunner`,
differing only in `deferrable`:
```python
from datetime import datetime

from airflow.models import DAG
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

PROJECT = "<your-project>"
REGION = "<your-region>"  # e.g. europe-west1


def beam_task(task_id, deferrable):
    return BeamRunPythonPipelineOperator(
        task_id=task_id,
        runner="DataflowRunner",
        py_file="gs://<bucket>/repro_missing_jobid.py",
        pipeline_options={
            "project": PROJECT,
            "region": REGION,
            "temp_location": "gs://<bucket>/tmp/",
        },
        dataflow_config=DataflowConfiguration(project_id=PROJECT, location=REGION),
        deferrable=deferrable,
    )


with DAG(
    "repro_beam_missing_jobid",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
):
    beam_task("run_sync", deferrable=False)
    beam_task("run_deferrable", deferrable=True)
```
3. Trigger the DAG (a triggerer must be running for the deferrable task to
resume).
**Result:**
- `run_sync` → **success**.
- `run_deferrable` → **failed** with `400 Request must contain a job and
project id.`
### What you think should happen instead?
At minimum, a deferrable Beam/Dataflow task should not *fail* when the
launcher stdout lacks the job-id line: the synchronous path already handles
that exact condition (resolving the finished job by name) and the Dataflow job
itself runs fine. Flipping `deferrable` from `False` to `True` should never
turn a passing task into a failing one.
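One minimal shape for this "at minimum" behaviour, sketched as a pure decision function. `choose_wait_path` and its return values are hypothetical, not the operator's code: it defers only when there is an id a trigger could poll, and otherwise takes the synchronous path, which can resolve the finished job by name.

```python
from typing import Optional


def choose_wait_path(deferrable: bool, dataflow_job_id: Optional[str]) -> str:
    """Pick how to wait for the Dataflow job after the launcher returns.

    Hypothetical helper: "defer" hands the id to a trigger; "sync" uses the
    blocking wait, which can fall back to a by-name lookup.
    """
    if deferrable and dataflow_job_id:
        return "defer"
    # Either sync mode was requested, or there is no id a trigger could poll;
    # deferring with job_id=None is what produces the 400 error.
    return "sync"
```

Since the job has already finished whenever the id is missing, taking the sync branch in that case adds no extra worker time.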
But the deeper issue is that the whole job-id detection mechanism is
fragile, and it makes a *true* async path impossible whenever the id is absent
from stdout:
- **Job-id discovery depends on scraping a specific SDK log line.**
`JOB_ID_PATTERN` matches the exact strings `Created job with id: [...]`
(Python) / `Submitted job: ...` (Java). That line is emitted at INFO, so it
only appears when the pipeline opts the root logger into INFO; it also breaks
on any SDK rewording/relevelling or log reformatting. There is no API-based
fallback to discover the id of the job that was just launched — so the id is
silently lost through a perfectly ordinary configuration.
- **Without the id, there is no real async execution at all.** The
launcher's stdout loop only short-circuits once the id is seen; with no id it
blocks until the subprocess exits, i.e. the worker waits out the *entire*
Dataflow job synchronously. By the time the operator reaches the `deferrable`
branch the job has already finished, so deferring buys nothing (no worker slot
is freed during the job) and then fails on resume because it defers with
`job_id=None`. So the deferrable feature is not merely buggy in this case: it
can only defer *after* the job id has been captured, and capturing the id is
exactly what fails here.
A robust fix would decouple job-id resolution from stdout scraping (e.g.
resolve the just-launched job's id via the Dataflow API by name before
deferring, as the sync path effectively does), so that the deferrable path can
defer immediately and the operator no longer depends on a fragile log-line
match.
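A sketch of the name-based resolution step, assuming a list of job resources shaped like the Dataflow REST API's `Job` objects (`id`, `name`, `currentState`). `resolve_job_id_by_name` and `JobResolutionError` are hypothetical; fetching the list (for example via the `projects.locations.jobs.list` REST method) is left out, and the prefix match mirrors what the sync path is described as doing.

```python
from typing import Any, Dict, List


class JobResolutionError(Exception):
    """Raised when the launched job cannot be identified unambiguously."""


def resolve_job_id_by_name(jobs: List[Dict[str, Any]], job_name: str) -> str:
    """Return the id of the single job whose name starts with job_name.

    Intended to be called with the full launched name (which, with the default
    append_job_name=True, carries a unique suffix), so exactly one match is
    expected; anything else is reported rather than guessed.
    """
    matches = [job for job in jobs if str(job.get("name", "")).startswith(job_name)]
    if len(matches) != 1:
        raise JobResolutionError(
            f"expected exactly one job named {job_name!r}, found {len(matches)}"
        )
    return matches[0]["id"]


# Made-up list response, for illustration only.
jobs = [
    {"id": "2026-01-01_00_00_00-111", "name": "other-job-9f8e7d6c",
     "currentState": "JOB_STATE_RUNNING"},
    {"id": "2026-01-01_00_00_00-123", "name": "run-deferrable-1a2b3c4d",
     "currentState": "JOB_STATE_RUNNING"},
]
job_id = resolve_job_id_by_name(jobs, "run-deferrable-1a2b3c4d")
```

Raising on zero or multiple matches, instead of picking the newest job, keeps a misidentified job from being polled (or cancelled) by mistake; the resolved id could then be handed to the trigger before deferring.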
### Operating System
_No response_
### Deployment
Google Managed Service for Apache Airflow
### Apache Airflow Provider(s)
google
### Versions of Apache Airflow Providers
apache-airflow-providers-apache-beam==6.2.3
apache-airflow-providers-google==20.0.0
### Official Helm Chart version
Not Applicable
### Kubernetes Version
_No response_
### Helm Chart configuration
_No response_
### Docker Image customizations
_No response_
### Anything else?
- Occurs **every time** the launcher stdout omits the job-id line; never
when it contains it. Because emitting the line requires the pipeline to opt
into INFO logging (the root logger defaults to WARNING), any pipeline that
doesn't do so fails deterministically in deferrable mode — it is not flaky.
With the default `append_job_name=True` the job name is unique, so a name-based
resolution would yield a single job.
- Background and proposed fixes discussed on [PR
#67711](https://github.com/apache/airflow/pull/67711).
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)