sumedhsakdeo commented on code in PR #3046:
URL: https://github.com/apache/iceberg-python/pull/3046#discussion_r2867118778


##########
pyiceberg/io/pyarrow.py:
##########
@@ -1789,54 +1844,115 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> 
pa.Table:
 
         return result
 
-    def to_record_batches(self, tasks: Iterable[FileScanTask]) -> 
Iterator[pa.RecordBatch]:
+    def to_record_batches(
+        self,
+        tasks: Iterable[FileScanTask],
+        order: ScanOrder = _DEFAULT_SCAN_ORDER,
+    ) -> Iterator[pa.RecordBatch]:
         """Scan the Iceberg table and return an Iterator[pa.RecordBatch].
 
         Returns an Iterator of pa.RecordBatch with data from the Iceberg table
         by resolving the right columns that match the current table schema.
         Only data that matches the provided row_filter expression is returned.
 
+        Ordering semantics:
+            - TaskOrder() (default): Yields batches one file at a time in task 
submission order.
+            - ArrivalOrder(): Batches may be interleaved across files as they 
arrive.
+              Within each file, batch ordering follows row order.
+
         Args:
             tasks: FileScanTasks representing the data files and delete files 
to read from.
+            order: Controls the order in which record batches are returned.
+                TaskOrder() (default) yields batches one file at a time in 
task order.
+                ArrivalOrder(concurrent_streams=N, batch_size=B, 
max_buffered_batches=M)
+                yields batches as they are produced without materializing 
entire files
+                into memory. Peak memory ≈ concurrent_streams × batch_size × 
max_buffered_batches
+                × (average row size in bytes). batch_size is the number of 
rows per batch.
+                For example (if average row size ≈ 32 bytes):
+                - ArrivalOrder(concurrent_streams=4, batch_size=32768, 
max_buffered_batches=8)
+                - Peak memory ≈ 4 × 32768 rows × 8 × 32 bytes ≈ ~32 MB (plus 
Arrow overhead)
 
         Returns:
             An Iterator of PyArrow RecordBatches.
             Total number of rows will be capped if specified.
 
         Raises:
             ResolveError: When a required field cannot be found in the file
-            ValueError: When a field type in the file cannot be projected to 
the schema type
+            ValueError: When a field type in the file cannot be projected to 
the schema type,
+                or when an invalid order value is provided, or when 
concurrent_streams < 1.
         """
-        deletes_per_file = _read_all_delete_files(self._io, tasks)
+        if not isinstance(order, ScanOrder):
+            raise ValueError(f"Invalid order: {order!r}. Must be a ScanOrder 
instance (TaskOrder() or ArrivalOrder()).")
 
-        total_row_count = 0
+        task_list, deletes_per_file = self._prepare_tasks_and_deletes(tasks)
+
+        if isinstance(order, ArrivalOrder):
+            if order.concurrent_streams < 1:
+                raise ValueError(f"concurrent_streams must be >= 1, got 
{order.concurrent_streams}")
+            return self._apply_limit(
+                self._iter_batches_arrival(
+                    task_list, deletes_per_file, order.batch_size, 
order.concurrent_streams, order.max_buffered_batches
+                )
+            )
+
+        return self._apply_limit(self._iter_batches_materialized(task_list, 
deletes_per_file))
+
+    def _prepare_tasks_and_deletes(
+        self, tasks: Iterable[FileScanTask]
+    ) -> tuple[list[FileScanTask], dict[str, list[ChunkedArray]]]:
+        """Resolve delete files and return tasks as a list."""
+        task_list = list(tasks)
+        deletes_per_file = _read_all_delete_files(self._io, task_list)
+        return task_list, deletes_per_file
+
+    def _iter_batches_arrival(
+        self,
+        task_list: list[FileScanTask],
+        deletes_per_file: dict[str, list[ChunkedArray]],
+        batch_size: int | None,
+        concurrent_streams: int,
+        max_buffered_batches: int = 16,
+    ) -> Iterator[pa.RecordBatch]:
+        """Yield batches using bounded concurrent streaming in arrival 
order."""
+
+        def batch_fn(task: FileScanTask) -> Iterator[pa.RecordBatch]:
+            return self._record_batches_from_scan_tasks_and_deletes([task], 
deletes_per_file, batch_size)
+
+        yield from _bounded_concurrent_batches(task_list, batch_fn, 
concurrent_streams, max_buffered_batches)
+
+    def _iter_batches_materialized(
+        self,
+        task_list: list[FileScanTask],
+        deletes_per_file: dict[str, list[ChunkedArray]],
+    ) -> Iterator[pa.RecordBatch]:
+        """Yield batches using executor.map with full file materialization."""
         executor = ExecutorFactory.get_or_create()
 
         def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]:
-            # Materialize the iterator here to ensure execution happens within 
the executor.
-            # Otherwise, the iterator would be lazily consumed later (in the 
main thread),
-            # defeating the purpose of using executor.map.
             return 
list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
 
-        limit_reached = False
-        for batches in executor.map(batches_for_task, tasks):
-            for batch in batches:
-                current_batch_size = len(batch)
-                if self._limit is not None and total_row_count + 
current_batch_size >= self._limit:
-                    yield batch.slice(0, self._limit - total_row_count)
+        for batches in executor.map(batches_for_task, task_list):
+            yield from batches
 
-                    limit_reached = True
-                    break
-                else:
-                    yield batch
-                    total_row_count += current_batch_size
+    def _apply_limit(self, batches: Iterator[pa.RecordBatch]) -> 
Iterator[pa.RecordBatch]:

Review Comment:
   Good catch. Correctness is already guaranteed — `_apply_limit` is the final 
gate before rows reach the caller, so users never see more than `limit` rows, 
regardless of how many rows the workers produce upstream.
   
   The only cost is wasted upstream work: each worker applies `self._limit` 
against its own local `total_row_count`, which starts from 0, so with 
`concurrent_streams=N` and `limit=L`, workers can read up to `N × L` rows from 
disk before cancellation propagates. In practice this is bounded quickly by the 
queue backpressure (`max_buffered_batches=16`) — workers that produce more than 
the consumer can buffer block on `put()` and are cancelled once `_apply_limit` 
stops consuming.
   
   Given the correctness guarantee and the natural backpressure bound on wasted 
work, I'd like to defer the shared counter to a follow-up PR to keep this PR 
contained. Happy to track it as a follow-up if that works for you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to