dorotaao opened a new issue, #38007:
URL: https://github.com/apache/beam/issues/38007

   ### What happened?
   
   ## Summary
   
   When using `BigQueryIO.read().withMethod(Method.DIRECT_READ)` to read from an **empty BigQuery table**, or a table that returns zero rows after applying `RowRestriction`, the BigQuery Storage API server returns a `ReadSession` with **zero streams**. `BigQueryStorageSourceBase.split()` handles this by returning `ImmutableList.of()` (an empty list of sources).
   While this works correctly for purely bounded reads, it **breaks any pipeline that wraps this bounded source into an unbounded one** (via `UnboundedReadFromBoundedSource`).
   When `split()` returns an empty list, `UnboundedReadFromBoundedSource` falls back to wrapping the original unsplit source directly (`ImmutableList.of(boundedSource)`). However, `BigQueryStorageSourceBase.createReader()` is not implemented: it unconditionally throws `UnsupportedOperationException("BigQuery storage source must be split before reading")`, because the source is designed to be read only through its per-stream sub-sources (`BigQueryStorageStreamSource`). As a result, the pipeline gets stuck in a loop of exceptions.
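
   The failing interaction can be sketched with a plain-Java stand-in (no Beam dependency; `StubStorageSource` and `splitWithFallback` are illustrative names, not the real API, but they mirror the control flow described above):

   ```java
   import java.util.Collections;
   import java.util.List;

   public class EmptySplitDemo {
       // Simplified stand-in for BigQueryStorageSourceBase: split() mirrors the
       // zero-stream ReadSession case, createReader() mirrors the real guard.
       static class StubStorageSource {
           List<StubStorageSource> split() {
               // Server returned a ReadSession with zero streams -> no sub-sources.
               return Collections.emptyList();
           }
           void createReader() {
               throw new UnsupportedOperationException(
                   "BigQuery storage source must be split before reading");
           }
       }

       // Mirrors the UnboundedReadFromBoundedSource fallback: an empty split
       // result is replaced by a singleton list holding the unsplit source.
       static List<StubStorageSource> splitWithFallback(StubStorageSource source) {
           List<StubStorageSource> splits = source.split();
           return splits.isEmpty() ? Collections.singletonList(source) : splits;
       }

       public static void main(String[] args) {
           StubStorageSource source = new StubStorageSource();
           for (StubStorageSource sub : splitWithFallback(source)) {
               try {
                   sub.createReader();
               } catch (UnsupportedOperationException e) {
                   // This is the exception the job then keeps hitting:
                   System.out.println("Caused by: " + e);
               }
           }
       }
   }
   ```

   Running this prints the same `UnsupportedOperationException` message that appears repeatedly in the job logs.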
   
   ## Environment
   
   - **Apache Beam SDK version:** 2.70.0 (reproduced; likely affects all 
versions with DIRECT_READ support)
   - **Runner:** Dataflow Runner v1
   
   ## Steps to Reproduce
   
   1. Create a Beam pipeline that uses 
`BigQueryIO.read().withMethod(Method.DIRECT_READ)` to read from a BigQuery 
table.
   2. Use this read in a streaming pipeline.
   3. Ensure the target BigQuery table is **empty**, or returns zero rows after 
`RowRestriction` is applied.
   4. Run the pipeline.
   
   ## Observed Behavior
   
   - The pipeline **gets stuck** and the following error appears repeatedly in 
logs:
   
     ```
     Caused by: java.lang.UnsupportedOperationException: BigQuery storage source must be split before reading
         at org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageSourceBase.createReader(BigQueryStorageSourceBase.java:200)
         at org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageTableSource.createReader(BigQueryStorageTableSource.java:42)
         at org.apache.beam.sdk.util.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.advance(UnboundedReadFromBoundedSource.java:497)
     ```
   
   ## Expected Behavior
   
   The pipeline does not stall: downstream operations proceed normally, simply processing zero elements from this branch.
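
   To make the expectation concrete, here is a minimal plain-Java sketch (illustrative names only, not a proposed patch): a reader whose `start()` returns false yields zero elements, so a consumer drains the empty branch without error:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class EmptyBranchDemo {
       // Illustrative reader contract, loosely modeled on a bounded reader:
       // start() returning false means the source has no elements at all.
       interface Reader { boolean start(); String getCurrent(); boolean advance(); }

       // A reader over an empty table: no first element, no subsequent elements.
       static class EmptyReader implements Reader {
           public boolean start() { return false; }
           public String getCurrent() { throw new IllegalStateException("no element"); }
           public boolean advance() { return false; }
       }

       // Drain a reader into a list, the way a runner consumes a bounded reader.
       static List<String> readAll(Reader reader) {
           List<String> out = new ArrayList<>();
           for (boolean more = reader.start(); more; more = reader.advance()) {
               out.add(reader.getCurrent());
           }
           return out;
       }

       public static void main(String[] args) {
           // Downstream simply sees zero elements from this branch.
           List<String> rows = readAll(new EmptyReader());
           System.out.println("rows read: " + rows.size());
       }
   }
   ```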
   
   ## Additional Context
   
   ### Chain of events
   
   1. `BigQueryIO.read().withMethod(Method.DIRECT_READ)` uses 
`BigQueryStorageSourceBase` as the underlying `BoundedSource`.
   2. `split()` builds a `CreateReadSessionRequest` and sends it to the server.
   3. The server returns a `ReadSession` with `streams_count = 0` for an empty 
table.
   4. `split()` returns `ImmutableList.of()`.
   5. `UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter` receives an empty list of sub-sources and falls back to wrapping the original unsplit source directly (`ImmutableList.of(boundedSource)`). When it calls `createReader()` on this `BigQueryStorageSourceBase`, the method throws `UnsupportedOperationException("BigQuery storage source must be split before reading")`, because `BigQueryStorageSourceBase` does not implement `createReader()` and is designed to be read only through its per-stream sub-sources.
   6. The job gets stuck in a loop, repeatedly hitting this exception without 
making progress. The watermark never advances and the pipeline must be manually 
cancelled.
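
   Step 6 can be modeled in a few lines of plain Java (no Beam dependency; `tryAdvance` is an invented stand-in for `ResidualSource.advance()` reaching `createReader()` on the unsplit source): every retry fails identically, so no element is ever read and the watermark has no reason to advance.

   ```java
   public class StuckLoopDemo {
       // Stand-in for ResidualSource.advance() reaching createReader() on the
       // unsplit source: every call fails the same way.
       static boolean tryAdvance() {
           throw new UnsupportedOperationException(
               "BigQuery storage source must be split before reading");
       }

       // Retry a fixed number of times and report how many attempts failed.
       static int retry(int attempts) {
           int failures = 0;
           for (int i = 0; i < attempts; i++) {
               try {
                   tryAdvance();
               } catch (UnsupportedOperationException e) {
                   failures++; // no element read, so the watermark cannot move
               }
           }
           return failures;
       }

       public static void main(String[] args) {
           int failures = retry(5);
           System.out.println("attempts=5, failures=" + failures
               + ", watermark never advanced");
       }
   }
   ```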
   
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [x] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [x] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [x] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
