evgeniy-b opened a new issue, #68279:
URL: https://github.com/apache/airflow/issues/68279

   ### Under which category would you file this issue?
   
   Providers
   
   ### Apache Airflow version
   
   3.1.7
   
   ### What happened and how to reproduce it?
   
   ### Issue description
   
   A `BeamRunPythonPipelineOperator` (the Java/Go variants share the code path) 
with `deferrable=True` and `runner="DataflowRunner"` fails with:
   
   ```
   airflow.exceptions.AirflowException: 400 Request must contain a job and project id.
   ```
   
   whenever the Beam launcher's stdout does **not** contain a line matching 
`JOB_ID_PATTERN` — even though the Dataflow job itself launches and runs to 
completion. The **identical** task with `deferrable=False` succeeds against the 
same job. So enabling deferrable mode turns a passing task into a failing one, 
with no other change.
   
   ### Root cause
   
   The operator only learns its `dataflow_job_id` by scanning the launcher 
subprocess's stdout with this regex 
(`providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py`):
   
   ```python
   JOB_ID_PATTERN = re.compile(
       r"Submitted job: (?P<job_id_java>[^\"\n\s]*)|Created job with id: \[(?P<job_id_python>[^\"\n\s]*)\]"
   )
   ```
   
   If that line never appears, `on_new_job_id_callback` never fires and 
`self.dataflow_job_id` stays `None`. There is a second consequence that 
matters: the launcher's stdout-reading loop (`run_beam_command` / `process_fd` 
in `providers/apache/beam/.../hooks/beam.py`) only short-circuits when 
`is_dataflow_job_id_exist_callback()` returns truthy. With no job id ever 
captured, that callback keeps returning `False`, so the loop **never returns 
early**: it blocks reading stdout until the subprocess exits, i.e. until the Dataflow job 
itself finishes. In the normal case the scan returns as soon as the id line is 
seen and the remaining wait is delegated to the Dataflow API; here the entire 
job runs inline before either wait mode is reached.
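
To make the early-return behaviour concrete, here is a minimal sketch. It is not the provider code: `extract_job_id` and `scan_launcher_output` are hypothetical names, the job id is made up, and only `JOB_ID_PATTERN` is taken from the source quoted above.

```python
import re

# Pattern as quoted above, written on a single line.
JOB_ID_PATTERN = re.compile(
    r"Submitted job: (?P<job_id_java>[^\"\n\s]*)|Created job with id: \[(?P<job_id_python>[^\"\n\s]*)\]"
)


def extract_job_id(line):
    """Return the Dataflow job id found in one log line, or None."""
    match = JOB_ID_PATTERN.search(line)
    if match is None:
        return None
    return match.group("job_id_java") or match.group("job_id_python")


def scan_launcher_output(lines):
    """Hypothetical stand-in for the stdout loop.

    Returns (job_id, lines_consumed). The loop stops as soon as an id is seen;
    without a match it consumes every line, i.e. it runs until the process exits.
    """
    consumed = 0
    for line in lines:
        consumed += 1
        job_id = extract_job_id(line)
        if job_id:
            return job_id, consumed
    return None, consumed


with_id_line = [
    "INFO:apiclient:Created job with id: [2026-01-01_00_00_00-123]",
    "job running",
    "job done",
]
without_id_line = ["job running", "job done"]
```

With `with_id_line` the scan stops after the first line; with `without_id_line` it reads everything and still returns `None` as the id.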
   
   **Why the line is commonly missing.** The Beam SDK logs `Created job with 
id: [...]` from `apache_beam.runners.dataflow.internal.apiclient` at **INFO** 
level. Python's root logger defaults to **WARNING**, so a pipeline only emits 
that line if its own script opts into INFO logging — which the canonical Beam 
`wordcount` example does explicitly in its `__main__`:
   
   ```python
   logging.getLogger().setLevel(logging.INFO)
   ```
   
   Any `DataflowRunner` pipeline whose entrypoint does **not** raise the root 
level to INFO (a common omission) suppresses the job-id line by default, with 
no code change on the Airflow side and no unusual setup. Note that the INFO 
lines that *do* show up in the Airflow task log (`Beam version: ...`, `Running 
command: ...`, `Start waiting ...`) come from Airflow's own 
`self.log.info(...)` in the worker process — not from the launcher subprocess — 
which is why they appear while the subprocess's job-id line does not. A 
newer/different Beam SDK that rewords or relevels the line is a second, 
independent way to miss the regex.
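
The level filtering can be reproduced with the standard library alone. This is a sketch under assumptions: the logger name `demo.apiclient` and the id `job-123` are placeholders, and the handler writes to an in-memory buffer rather than the launcher's real output stream.

```python
import io
import logging


def captured_output(root_level):
    """Log the job-id line at INFO and return what a root handler actually wrote."""
    root = logging.getLogger()
    saved_level, saved_handlers = root.level, root.handlers[:]
    stream = io.StringIO()
    root.handlers = [logging.StreamHandler(stream)]
    root.setLevel(root_level)
    try:
        # Placeholder logger name and job id; the real line comes from the Beam SDK.
        logging.getLogger("demo.apiclient").info("Created job with id: [%s]", "job-123")
    finally:
        # Restore the root logger so the demo has no side effects.
        root.handlers = saved_handlers
        root.setLevel(saved_level)
    return stream.getvalue()
```

`captured_output(logging.WARNING)` returns an empty string, while `captured_output(logging.INFO)` contains the job-id line, which is the difference a single `setLevel(logging.INFO)` in the pipeline's entrypoint makes.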
   
   By the time the scan returns, the job has already finished. The two wait 
modes then diverge only in how they handle the (now-terminal) job with a `None` 
id:
   
   - **`deferrable=False`** — the operator calls 
`DataflowHook.wait_for_done(job_id=None, job_name=...)`. With no id, 
`_DataflowJobsController._get_current_jobs` resolves the job by **name prefix** 
(`_fetch_jobs_by_prefix_name`); the job is already `JOB_STATE_DONE`, so 
`wait_for_done` returns after a single lookup (no real polling — there is no 
`Waiting for done. Sleep` iteration). The task **succeeds**.
   - **`deferrable=True`** — the operator defers with 
`job_id=self.dataflow_job_id` (i.e. `None`). The 
`DataflowJobStateCompleteTrigger` (or `DataflowJobStatusTrigger` on older 
google providers) then calls `get_job_status(job_id=None)`; the Dataflow API 
rejects it with `400 Request must contain a job and project id.`; the trigger 
yields an error `TriggerEvent`; and `execute_complete` re-raises it as 
`AirflowException`. The task **fails**.
   
   The deferrable path has no by-name fallback for a missing job id; the sync 
path does. (Note the deferrable path is also strictly worse here: the job has 
already completed before the task defers, so deferring buys nothing and then 
fails on resume.)
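
The divergence can be modelled with an in-memory stand-in for the Dataflow API. Everything here is hypothetical (`FakeDataflowApi`, `sync_wait`, `deferred_check`, the job data); it only mirrors the behaviour described above, not the actual hook or trigger code.

```python
class FakeDataflowApi:
    """Hypothetical in-memory stand-in for the Dataflow jobs API."""

    def __init__(self, jobs):
        self._jobs = jobs  # list of dicts with "id", "name", "currentState"

    def get_job(self, job_id):
        if not job_id:
            # Mirrors the error text quoted in this issue.
            raise ValueError("400 Request must contain a job and project id.")
        return next(job for job in self._jobs if job["id"] == job_id)

    def list_jobs_by_prefix(self, name_prefix):
        return [job for job in self._jobs if job["name"].startswith(name_prefix)]


def sync_wait(api, job_id, job_name):
    """Sync-style wait: falls back to a name-prefix lookup when the id is missing."""
    jobs = [api.get_job(job_id)] if job_id else api.list_jobs_by_prefix(job_name)
    return [job["currentState"] for job in jobs]


def deferred_check(api, job_id):
    """Trigger-style check: uses the id as given, with no by-name fallback."""
    return api.get_job(job_id)["currentState"]


api = FakeDataflowApi(
    [{"id": "job-123", "name": "repro-abc", "currentState": "JOB_STATE_DONE"}]
)
```

Calling `sync_wait(api, None, "repro")` finds the finished job by name, whereas `deferred_check(api, None)` raises with the 400 message.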
   
   Relevant code (paths on `main`):
   
   - `JOB_ID_PATTERN` + `process_line_and_extract_dataflow_job_id_callback` — 
`providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py`
   - launcher loop that only records the id on a regex match (`process_fd` / 
`run_beam_command`) — 
`providers/apache/beam/src/airflow/providers/apache/beam/hooks/beam.py`
   - `defer(... job_id=self.dataflow_job_id ...)` in `execute_on_dataflow` — 
`providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py`
   - `execute_complete` re-raising the error event — 
`providers/apache/beam/.../operators/beam.py` (`raise 
AirflowException(event["message"])`)
   - sync fallback to name lookup — `DataflowHook.wait_for_done`
   - trigger calling the API with `job_id=None` — 
`providers/google/src/airflow/providers/google/cloud/triggers/dataflow.py`
   
   ### Steps to reproduce
   
   Minimal, deterministic reproduction with a **real** Dataflow job (no mocking 
of Airflow internals). Verified on Airflow 3.1.7 / apache-beam 2.71.0 / 
apache-airflow-providers-google 20.0.0 / apache-airflow-providers-apache-beam 
6.2.3.
   
   1. A no-op Beam pipeline that runs on Dataflow and does **no logging 
configuration** — it leaves the root logger at its default WARNING, so the 
`Created job with id: [...]` INFO line never reaches stdout. The job still 
submits and runs. (This mirrors any pipeline that doesn't opt into INFO 
logging.)
   
      <details><summary>repro_missing_jobid.py (staged in GCS)</summary>
   
      ```python
      # No logging config on purpose: the SDK's "Created job with id: [...]" line
      # is INFO, the root logger defaults to WARNING, so the line is filtered out.
      import apache_beam as beam
      from apache_beam.options.pipeline_options import PipelineOptions
   
      with beam.Pipeline(options=PipelineOptions()) as p:
          p | "Create" >> beam.Create([1, 2, 3]) | "Noop" >> beam.Map(lambda x: x)
      ```
      </details>
   
   2. A DAG with two tasks running that same pipeline on `DataflowRunner`, 
differing only in `deferrable`:
   
      ```python
      from datetime import datetime
   
      from airflow.models import DAG
      from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
      from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
   
      PROJECT = "<your-project>"
      REGION = "<your-region>"  # e.g. europe-west1
   
      def beam_task(task_id, deferrable):
          return BeamRunPythonPipelineOperator(
              task_id=task_id,
              runner="DataflowRunner",
              py_file="gs://<bucket>/repro_missing_jobid.py",
              pipeline_options={
                  "project": PROJECT,
                  "region": REGION,
                  "temp_location": "gs://<bucket>/tmp/",
              },
              dataflow_config=DataflowConfiguration(project_id=PROJECT, location=REGION),
              deferrable=deferrable,
          )
   
      with DAG("repro_beam_missing_jobid", start_date=datetime(2026, 1, 1),
               schedule=None, catchup=False):
          beam_task("run_sync", deferrable=False)
          beam_task("run_deferrable", deferrable=True)
      ```
   
   3. Trigger the DAG (a triggerer must be running for the deferrable task to 
resume).
   
   **Result:**
   
   - `run_sync` → **success**.
   - `run_deferrable` → **failed** with `400 Request must contain a job and 
project id.`
   
   ### What you think should happen instead?
   
   At minimum, a deferrable Beam/Dataflow task should not *fail* when the 
launcher stdout lacks the job-id line: the synchronous path already handles 
that exact condition (resolving the finished job by name) and the Dataflow job 
itself runs fine. Flipping `deferrable` from `False` to `True` should never 
turn a passing task into a failing one.
   
   But the deeper issue is that the whole job-id detection mechanism is 
fragile, and it makes a *true* async path impossible whenever the id is absent 
from stdout:
   
   - **Job-id discovery depends on scraping a specific SDK log line.** 
`JOB_ID_PATTERN` matches the exact strings `Created job with id: [...]` 
(Python) / `Submitted job: ...` (Java). That line is emitted at INFO, so it 
only appears when the pipeline opts the root logger into INFO; it also breaks 
on any SDK rewording/relevelling or log reformatting. There is no API-based 
fallback to discover the id of the job that was just launched — so the id is 
silently lost through a perfectly ordinary configuration.
   
   - **Without the id, there is no real async execution at all.** The 
launcher's stdout loop only short-circuits once the id is seen; with no id it 
blocks until the subprocess exits — i.e. it runs the *entire* Dataflow job 
synchronously on the worker. By the time the operator reaches the `deferrable` 
branch the job has already finished, so deferring buys nothing (no worker slot 
is freed during the job) and then fails on resume because it defers with 
`job_id=None`. So the deferrable feature is not merely buggy in this case: it 
structurally cannot defer before the job id has been captured, and capturing 
the id is exactly the step that fails.
   
   A robust fix would decouple job-id resolution from stdout scraping (e.g. 
resolve the just-launched job's id via the Dataflow API by name before 
deferring, as the sync path effectively does), so that the deferrable path can 
defer immediately and the operator no longer depends on a fragile log-line 
match. 
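
As a rough illustration of that direction only: the helper below is hypothetical (`resolve_job_id` is not an existing provider function), takes an already-fetched job list instead of calling the API, and assumes the job name is unique, as it is with the default `append_job_name=True`.

```python
def resolve_job_id(captured_job_id, jobs, job_name):
    """Prefer the id scraped from stdout; otherwise resolve it by exact job name.

    `jobs` is a hypothetical list of dicts (with "id" and "name"), standing in
    for whatever a job-listing call would return.
    """
    if captured_job_id:
        return captured_job_id
    matches = [job["id"] for job in jobs if job["name"] == job_name]
    if len(matches) != 1:
        # Refuse to guess when the name is missing or ambiguous.
        raise LookupError(
            f"expected exactly one job named {job_name!r}, found {len(matches)}"
        )
    return matches[0]
```

Raising on zero or multiple matches keeps the fallback from silently attaching the task to the wrong job; whether an exact match or the sync path's prefix match is the better rule is left open here.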
   
   ### Operating System
   
   _No response_
   
   ### Deployment
   
   Google Managed Service for Apache Airflow
   
   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-apache-beam==6.2.3
   apache-airflow-providers-google==20.0.0
   
   ### Official Helm Chart version
   
   Not Applicable
   
   ### Kubernetes Version
   
   _No response_
   
   ### Helm Chart configuration
   
   _No response_
   
   ### Docker Image customizations
   
   _No response_
   
   ### Anything else?
   
   - Occurs **every time** the launcher stdout omits the job-id line; never 
when it contains it. Because emitting the line requires the pipeline to opt 
into INFO logging (the root logger defaults to WARNING), any pipeline that 
doesn't do so fails deterministically in deferrable mode — it is not flaky. 
With the default `append_job_name=True` the job name is unique, so a name-based 
resolution would yield a single job.
   - Background and proposed fixes discussed on [PR 
#67711](https://github.com/apache/airflow/pull/67711).
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

