zhuqi-lucas commented on code in PR #8146:
URL: https://github.com/apache/arrow-rs/pull/8146#discussion_r2280889409
##########
arrow-select/src/coalesce.rs:
##########
@@ -236,6 +262,13 @@ impl BatchCoalescer {
/// assert_eq!(completed_batch, expected_batch);
/// ```
     pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
+ if let Some(limit) = self.biggest_coalesce_batch_size {
Review Comment:
Current behaviour for this PR:
```rust
// Large batch bypass optimization:
// When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
// we can avoid expensive split-and-merge operations by passing it through directly.
//
// IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
// is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
// If not set (None), ALL batches follow normal coalescing behavior regardless of size.
// =============================================================================
// CASE 1: No buffer + large batch → Direct bypass
// =============================================================================
// Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
// Input sequence: [600, 1200, 300]
//
// With biggest_coalesce_batch_size=Some(500) (optimization enabled):
// 600 → large batch detected! buffered_rows=0 → Case 1: direct bypass
// → output: [600] (bypass, preserves large batch)
// 1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
// → output: [1200] (bypass, preserves large batch)
// 300 → normal batch, buffer: [300]
// Result: [600], [1200], [300] - large batches preserved, mixed sizes
// =============================================================================
// CASE 2: Buffer too large + large batch → Flush first, then bypass
// =============================================================================
// This case prevents creating extremely large merged batches that would
// significantly exceed both target_batch_size and biggest_coalesce_batch_size.
//
// Example 1: Buffer exceeds limit before large batch arrives
// target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
// Input: [350, 200, 800]
//
// Step 1: push_batch([350])
// → batch_size=350 <= 400, normal path
// → buffer: [350], buffered_rows=350
//
// Step 2: push_batch([200])
// → batch_size=200 <= 400, normal path
// → buffer: [350, 200], buffered_rows=550
//
// Step 3: push_batch([800])
// → batch_size=800 > 400, large batch path
// → buffered_rows=550 > 400 → Case 2: flush first
// → flush: output [550] (combined [350, 200])
// → then bypass: output [800]
// Result: [550], [800] - buffer flushed to prevent oversized merge
//
// Example 2: Multiple small batches accumulate before large batch
// target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
// Input: [150, 100, 80, 900]
//
// Step 1-3: Accumulate small batches
// 150 → buffer: [150], buffered_rows=150
// 100 → buffer: [150, 100], buffered_rows=250
// 80 → buffer: [150, 100, 80], buffered_rows=330
//
// Step 4: push_batch([900])
// → batch_size=900 > 300, large batch path
// → buffered_rows=330 > 300 → Case 2: flush first
// → flush: output [330] (combined [150, 100, 80])
// → then bypass: output [900]
// Result: [330], [900] - prevents merge into [1230] which would be too large
// =============================================================================
// CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
// =============================================================================
// When buffer is small enough, we still merge to maintain efficiency
// Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
// Input: [300, 1200]
//
// Step 1: push_batch([300])
// → batch_size=300 <= 500, normal path
// → buffer: [300], buffered_rows=300
//
// Step 2: push_batch([1200])
// → batch_size=1200 > 500, large batch path
// → buffered_rows=300 <= 500 → Case 3: normal merge
// → buffer: [300, 1200] (1500 total)
// → 1500 > target_batch_size → split: output [1000], buffer [500]
// Result: [1000], [500] - normal split/merge behavior maintained
// =============================================================================
// Comparison: Default vs Optimized Behavior
// =============================================================================
// target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
// Input: [600, 1200, 300]
//
// DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
// 600 → buffer: [600]
// 1200 → buffer: [600, 1200] (1800 rows total)
// → split: output [1000 rows], buffer [800 rows remaining]
// 300 → buffer: [800, 300] (1100 rows total)
// → split: output [1000 rows], buffer [100 rows remaining]
// Result: [1000], [1000], [100] - all outputs respect target_batch_size
//
// OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
// 600 → Case 1: direct bypass → output: [600]
// 1200 → Case 1: direct bypass → output: [1200]
// 300 → normal path → buffer: [300]
// Result: [600], [1200], [300] - large batches preserved
// =============================================================================
// Benefits and Trade-offs
// =============================================================================
// Benefits of the optimization:
// - Large batches stay intact (better for downstream vectorized processing)
// - Fewer split/merge operations (better CPU performance)
// - More predictable memory usage patterns
// - Maintains streaming efficiency while preserving batch boundaries
//
// Trade-offs:
// - Output batch sizes become variable (not always target_batch_size)
// - May produce smaller partial batches when flushing before large batches
// - Requires tuning biggest_coalesce_batch_size parameter for optimal performance
// TODO: for unsorted batches, we may be able to filter out all large batches and
// coalesce all small batches together?
```
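
To sanity-check the cases above, here is a minimal sketch that models them on row counts only. It is not the code in this PR: `RowCountCoalescer`, `finish`, and `simulate` are hypothetical names used for illustration, batches are reduced to `usize` row counts instead of `RecordBatch`, and the normal path is assumed to emit one output each time the buffer reaches `target_batch_size`.

```rust
/// Toy model of the coalescer that tracks row counts only (no Arrow arrays).
/// Hypothetical type for illustration; the real BatchCoalescer buffers RecordBatches.
struct RowCountCoalescer {
    target_batch_size: usize,
    biggest_coalesce_batch_size: Option<usize>,
    buffered_rows: usize,
    completed: Vec<usize>,
}

impl RowCountCoalescer {
    fn new(target_batch_size: usize, biggest_coalesce_batch_size: Option<usize>) -> Self {
        Self {
            target_batch_size,
            biggest_coalesce_batch_size,
            buffered_rows: 0,
            completed: Vec::new(),
        }
    }

    fn push_batch(&mut self, rows: usize) {
        if let Some(limit) = self.biggest_coalesce_batch_size {
            if rows > limit {
                if self.buffered_rows == 0 {
                    // Case 1: nothing buffered, pass the large batch through as-is.
                    self.completed.push(rows);
                    return;
                }
                if self.buffered_rows > limit {
                    // Case 2: flush the buffer first, then bypass the large batch.
                    self.completed.push(self.buffered_rows);
                    self.buffered_rows = 0;
                    self.completed.push(rows);
                    return;
                }
                // Case 3: small buffer, fall through to normal coalescing.
            }
        }
        // Normal path (assumed): buffer the rows and emit full target-sized outputs.
        self.buffered_rows += rows;
        while self.buffered_rows >= self.target_batch_size {
            self.completed.push(self.target_batch_size);
            self.buffered_rows -= self.target_batch_size;
        }
    }

    /// Emit whatever is still buffered as a final, possibly smaller, output.
    fn finish(&mut self) {
        if self.buffered_rows > 0 {
            self.completed.push(self.buffered_rows);
            self.buffered_rows = 0;
        }
    }
}

/// Push every input row count, flush the remainder, and return the output sizes.
fn simulate(target: usize, limit: Option<usize>, input: &[usize]) -> Vec<usize> {
    let mut coalescer = RowCountCoalescer::new(target, limit);
    for &rows in input {
        coalescer.push_batch(rows);
    }
    coalescer.finish();
    coalescer.completed
}
```

Under these assumptions, `simulate(1000, Some(500), &[600, 1200, 300])` yields `[600, 1200, 300]`, while `simulate(1000, None, &[600, 1200, 300])` yields `[1000, 1000, 100]`, matching the comparison in the comment.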