sumedhsakdeo opened a new pull request, #3046:
URL: https://github.com/apache/iceberg-python/pull/3046

   # Rationale for this change
   
   ## Summary
   
Addresses #3036: `ArrowScan.to_record_batches()` uses `executor.map` + `list()`, which eagerly materializes all record batches per file into memory and can cause OOM on large tables.
   
This PR adds three parameters to `to_arrow_batch_reader()` that give users control over memory usage and parallelism:

- `batch_size`: controls the number of rows per batch passed to PyArrow's `ds.Scanner` (see the sketch after this list). Defaults to PyArrow's built-in 131,072 rows.
- `streaming`: when `True`, batches are yielded as they are produced, without materializing entire files into memory. Uses a bounded queue with backpressure instead of `executor.map` + `list()`.
- `concurrent_files`: number of files to read concurrently when `streaming=True`. A semaphore limits active file readers, and a bounded queue (max 16 batches) provides backpressure to cap memory usage.
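For orientation, PyArrow's dataset scanner already exposes a `batch_size` knob; the sketch below shows, with plain PyArrow and a throwaway Parquet file, the kind of forwarding `batch_size` performs (illustrative only, not the PR's internals):

```python
import os
import tempfile

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# A throwaway Parquet file standing in for an Iceberg data file.
table = pa.table({"id": pa.array(range(1_000_000), type=pa.int64())})
tmpdir = tempfile.mkdtemp()
pq.write_table(table, os.path.join(tmpdir, "part-0.parquet"))

dataset = ds.dataset(tmpdir, format="parquet")

# batch_size caps rows per RecordBatch; PyArrow's default is 131,072 (2**17).
scanner = dataset.scanner(batch_size=10_000)
for batch in scanner.to_batches():
    assert batch.num_rows <= 10_000
```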
   
   ## Problem
   
The current implementation materializes all batches from each file via `list()` inside `executor.map`, which runs up to `min(32, cpu_count + 4)` files in parallel. For large files this means all batches from ~20 files are held in memory simultaneously before any are yielded to the consumer.
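The difference between the two shapes can be seen in a small, self-contained sketch (generic stand-ins, not the PR's actual code):

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Iterator, List

def read_file(path: str) -> Iterator[List[int]]:
    """Stand-in for reading one data file as a stream of record batches."""
    for start in range(0, 100_000, 10_000):
        yield list(range(start, start + 10_000))  # one "batch"

paths = [f"file-{i}" for i in range(20)]

# Eager shape (current behavior): list() drains every file's batches inside the
# worker, so the batches of all in-flight files are resident at the same time.
with ThreadPoolExecutor() as pool:
    per_file = pool.map(lambda p: list(read_file(p)), paths)
    all_batches = [batch for batches in per_file for batch in batches]

# Lazy shape (what streaming=True aims for): batches are yielded one at a time,
# so only a bounded number of them exist in memory at any moment.
def stream(paths: List[str]) -> Iterator[List[int]]:
    for p in paths:  # multi-file concurrency is bounded separately in the PR
        yield from read_file(p)

for batch in stream(paths):
    pass  # consume incrementally
```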
   
   ## Solution
   
### Before: OOM on large tables

```python
batches = table.scan().to_arrow_batch_reader()
```

### After: bounded memory, tunable parallelism

```python
batches = table.scan().to_arrow_batch_reader(
    streaming=True,
    concurrent_files=4,
    batch_size=10000,
)
```
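The returned reader is then consumed incrementally, for example:

```python
total_rows = 0
for batch in batches:              # pyarrow.RecordBatch instances, one at a time
    total_rows += batch.num_rows   # per-batch work goes here
print(total_rows)
```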
   
Default behavior is unchanged: `streaming=False` preserves the existing `executor.map` + `list()` path for backwards compatibility.
   
   ## Architecture
   
When `streaming=True`, batches flow through `_bounded_concurrent_batches` (sketched after this list):

1. All file tasks are submitted to the shared thread pool.
2. A `Semaphore(concurrent_files)` limits how many files are read simultaneously.
3. Workers push batches into a bounded `Queue(maxsize=16)`; when it is full, workers block (backpressure).
4. The consumer yields batches from the queue via a blocking `queue.get()`.
5. A sentinel value signals completion, so there is no timeout-based polling.
6. On early termination (the consumer stops), extra semaphore permits are released to unblock waiting workers, and the queue is drained.
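A condensed, standalone sketch of this producer/consumer shape (names, signatures, and error handling are simplified and do not match the PR's `_bounded_concurrent_batches`):

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, List

_SENTINEL = object()  # signals "all producers are finished"

def bounded_concurrent_batches(
    tasks: List[Callable[[], Iterable[object]]],
    concurrent_files: int = 1,
    max_queued_batches: int = 16,
) -> Iterator[object]:
    """Yield batches from many file-reading tasks with bounded memory.

    Error propagation and early-termination cleanup are omitted for brevity;
    the PR handles both.
    """
    if not tasks:
        return
    out = queue.Queue(maxsize=max_queued_batches)   # bounded queue: backpressure
    gate = threading.Semaphore(concurrent_files)    # limits files read at once
    remaining = len(tasks)
    lock = threading.Lock()

    def worker(task: Callable[[], Iterable[object]]) -> None:
        nonlocal remaining
        with gate:  # wait for a free "file slot"
            try:
                for batch in task():
                    out.put(batch)  # blocks while the queue is full
            finally:
                with lock:
                    remaining -= 1
                    last = remaining == 0
                if last:
                    out.put(_SENTINEL)  # the last worker signals completion

    with ThreadPoolExecutor(max_workers=len(tasks)) as pool:
        for task in tasks:
            pool.submit(worker, task)
        while True:
            item = out.get()  # blocking get: no timeout-based polling
            if item is _SENTINEL:
                break
            yield item

# Example: three "files", each producing ten batches, at most two read at once.
batches = bounded_concurrent_batches(
    [lambda i=i: (f"file-{i}-batch-{j}" for j in range(10)) for i in range(3)],
    concurrent_files=2,
)
print(sum(1 for _ in batches))  # 30
```

In this shape, peak memory is roughly `max_queued_batches` queued batches plus one in-flight batch per active worker, independent of file size.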
   
## Ordering semantics

| Configuration | File ordering | Within-file ordering |
|---|---|---|
| Default (`streaming=False`) | Batches grouped by file, in task submission order | Row order |
| `streaming=True` | Interleaved across files (no grouping guarantee) | Row order within each file |
   
## PR Stack

Breakdown of this large PR into smaller PRs:

1. **[PR 0](https://github.com/sumedhsakdeo/iceberg-python/pull/3)**: `batch_size` forwarding
2. **[PR 1](https://github.com/sumedhsakdeo/iceberg-python/pull/1)**: `streaming` flag, stop materializing entire files
3. **[PR 2](https://github.com/sumedhsakdeo/iceberg-python/pull/2)**: `concurrent_files`, bounded concurrent streaming
4. **[PR 3](https://github.com/sumedhsakdeo/iceberg-python/pull/4)**: `benchmark`
   
   ## Benchmark results
   
32 files x 500K rows, 5 columns (int64, float64, string, bool, timestamp), `batch_size` = 131,072 (the PyArrow default):

| Config | Throughput | Peak Arrow memory |
|---|---|---|
| Default (`executor.map` + `list()`) | 212M rows/s | 635 MB |
| `streaming=True, concurrent_files=1` | 61M rows/s | 10 MB |
| `streaming=True, concurrent_files=2` | 111M rows/s | 42 MB |
| `streaming=True, concurrent_files=4` | 182M rows/s | 111 MB |
| `streaming=True, concurrent_files=8` | 227M rows/s | 251 MB |
| `streaming=True, concurrent_files=16` | 218M rows/s | 457 MB |

Positional deletes, row filters, and limit are handled correctly in all modes.
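For context, peak Arrow memory in this kind of micro-benchmark can be sampled with `pyarrow.total_allocated_bytes()`; below is a rough sketch of such a measurement loop (an assumption about methodology, not the PR's benchmark code):

```python
import time
from typing import Iterable, Tuple

import pyarrow as pa

def measure(batches: Iterable[pa.RecordBatch]) -> Tuple[float, int]:
    """Consume a stream of RecordBatches, returning (rows/sec, peak Arrow bytes)."""
    rows = 0
    peak = pa.total_allocated_bytes()
    start = time.perf_counter()
    for batch in batches:
        rows += batch.num_rows
        peak = max(peak, pa.total_allocated_bytes())  # sample the default memory pool
    elapsed = time.perf_counter() - start
    return rows / max(elapsed, 1e-9), peak

# e.g. measure(table.scan().to_arrow_batch_reader(streaming=True, concurrent_files=4))
```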
   
   ## Are these changes tested?
   
   Yes. 23 new unit tests across two test files, plus a micro-benchmark:
   
- `tests/io/test_pyarrow.py` (14 tests): `batch_size` controls rows per batch, streaming yields all rows correctly, streaming respects `limit`, within-file ordering is preserved, positional deletes are applied correctly in all three modes (default, streaming, concurrent), positional deletes with `limit`, `concurrent_files < 1` raises `ValueError`
- `tests/io/test_bounded_concurrent_batches.py` (9 tests): single/multi-file correctness, incremental streaming, backpressure blocks producers when the queue is full (the property is sketched below), errors propagate from workers to the consumer, early termination cancels workers cleanly, the concurrency limit is enforced, empty task list, `ArrowScan` integration with `limit`
- `tests/benchmark/test_read_benchmark.py`: read-throughput micro-benchmark across 6 configurations, measuring rows/sec and peak Arrow memory
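The backpressure behavior those tests exercise boils down to the following property of a bounded queue; the snippet is a generic pytest-style illustration, not the PR's actual test:

```python
import queue
import threading
import time

def test_bounded_queue_backpressure():
    """A producer writing to a bounded queue stalls once it is full, and
    resumes only when the consumer drains it."""
    q = queue.Queue(maxsize=2)
    produced = []

    def producer():
        for i in range(4):
            q.put(i)           # blocks while the queue already holds 2 items
            produced.append(i)

    t = threading.Thread(target=producer, daemon=True)
    t.start()

    time.sleep(0.2)
    assert produced == [0, 1]  # the third put is blocked: backpressure

    drained = [q.get() for _ in range(4)]  # draining unblocks the producer
    t.join(timeout=2)
    assert drained == [0, 1, 2, 3]
    assert produced == [0, 1, 2, 3]
```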
   
   ## Are there any user-facing changes?
   
Yes. Three new optional parameters on `DataScan.to_arrow_batch_reader()`:

- `batch_size` (`int | None`): number of rows per batch (default: PyArrow's 131,072)
- `streaming` (`bool`): yield batches without materializing entire files (default: `False`)
- `concurrent_files` (`int`): number of files to read concurrently when streaming (default: 1)

All parameters are optional with backwards-compatible defaults; existing code is unaffected.

Documentation is updated in `mkdocs/docs/api.md` with usage examples and ordering semantics.

