kosiew commented on code in PR #20494:
URL: https://github.com/apache/datafusion/pull/20494#discussion_r2945072328


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -884,6 +924,56 @@ impl TopKHeap {
     }
 }
 
+enum InterleaveError {
+    Overflow(String),
+    DataFusion(DataFusionError),
+}
+
+fn try_interleave_record_batch(

Review Comment:
   The new retry path decides whether an `interleave_record_batch` failure is recoverable by checking whether the panic or error text contains `"overflow"`. With a generic `catch_unwind` wrapped around every Arrow interleave call, that substring match is too broad.
   
   It would also catch unrelated failures, such as other overflow-style panics (`view buffer index overflow`, future internal overflow checks, and similar cases). Those would then be misclassified as large-string conditions and retried with smaller chunks, instead of surfacing the real bug.
   
   Please narrow the match to the exact offset-overflow condition we expect here. Better yet, avoid panic-message matching entirely and choose chunk sizes from explicit byte accounting over the variable-width arrays.
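   To make that concrete, here is a sketch of what explicit byte accounting could look like. `chunk_rows_by_bytes` and its signature are illustrative, not this PR's API; the idea is that per-row byte lengths can be read cheaply from the source arrays' offset buffers, so chunk boundaries are chosen up front instead of catching a panic after the fact:
   
   ```rust
   /// Illustrative sketch, not the PR's API: split `row_bytes.len()` rows into
   /// chunks such that the accumulated variable-width byte length of each chunk
   /// stays within `max_bytes` (e.g. the i32 offset budget of a Utf8 array).
   ///
   /// `row_bytes[i]` is the total byte length of row i's variable-width values,
   /// which can be computed cheaply from the source arrays' offset buffers.
   fn chunk_rows_by_bytes(row_bytes: &[usize], max_bytes: usize) -> Vec<std::ops::Range<usize>> {
       let mut chunks = Vec::new();
       let mut start = 0;
       let mut acc = 0usize;
       for (i, &b) in row_bytes.iter().enumerate() {
           // Close the current chunk before it would exceed the budget; a single
           // oversized row still forms its own chunk (a real implementation would
           // need LargeUtf8 handling for that case).
           if acc + b > max_bytes && i > start {
               chunks.push(start..i);
               start = i;
               acc = 0;
           }
           acc += b;
       }
       if start < row_bytes.len() {
           chunks.push(start..row_bytes.len());
       }
       chunks
   }
   ```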
   



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -748,47 +735,92 @@ impl TopKHeap {
     }
 
     /// Returns the values stored in this heap, from values low to
-    /// high, as a single [`RecordBatch`], resetting the inner heap
-    pub fn emit(&mut self) -> Result<Option<RecordBatch>> {
+    /// high, as [`RecordBatch`]es, resetting the inner heap
+    pub fn emit(&mut self) -> Result<Vec<RecordBatch>> {
         Ok(self.emit_with_state()?.0)
     }
 
     /// Returns the values stored in this heap, from values low to
-    /// high, as a single [`RecordBatch`], and a sorted vec of the
+    /// high, as [`RecordBatch`]es, and a sorted vec of the
     /// current heap's contents
-    pub fn emit_with_state(&mut self) -> Result<(Option<RecordBatch>, Vec<TopKRow>)> {
+    pub fn emit_with_state(&mut self) -> Result<(Vec<RecordBatch>, Vec<TopKRow>)> {
         // generate sorted rows
         let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec();
 
         if self.store.is_empty() {
-            return Ok((None, topk_rows));
+            return Ok((Vec::new(), topk_rows));
         }
 
-        // Collect the batches into a vec and store the "batch_id -> array_pos" mapping, to then
-        // build the `indices` vec below. This is needed since the batch ids are not continuous.
+        let batches = self.interleave_topk_rows(&topk_rows, self.batch_size)?;
+
+        Ok((batches, topk_rows))
+    }
+
+    fn interleave_topk_rows(

Review Comment:
    `interleave_topk_rows` is now doing three things: collecting batch/index metadata, choosing chunk sizes, and translating Arrow failures into retry behavior.
   
   Splitting those into small helpers would make the control flow easier to scan and would also let the retry policy be tested independently.
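   For example, the retry policy alone could be factored out roughly like this (names are hypothetical, not the PR's code); once isolated, it can be unit-tested without constructing any Arrow arrays:
   
   ```rust
   /// Illustrative sketch (names are not the PR's code): the chunk-size retry
   /// policy, pulled out of `interleave_topk_rows` so it can be tested on its own.
   #[derive(Debug, PartialEq)]
   enum RetryDecision {
       /// Retry the interleave with a smaller chunk size.
       Retry { chunk_size: usize },
       /// Propagate the failure; it is not the recoverable offset overflow.
       GiveUp,
   }
   
   fn next_chunk_size(current: usize, is_offset_overflow: bool) -> RetryDecision {
       // Only the specific offset-overflow condition is recoverable, and only
       // while the chunk can still shrink; everything else is a real bug.
       if is_offset_overflow && current > 1 {
           RetryDecision::Retry { chunk_size: current / 2 }
       } else {
           RetryDecision::GiveUp
       }
   }
   ```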



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -1110,6 +1200,56 @@ mod tests {
         assert_eq!(record_batch_store.batches_size, 0);
     }
 
+    #[test]
+    fn test_topk_heap_emit_with_state_respects_batch_size() -> Result<()> {

Review Comment:
   The added test verifies the new output batching shape, which is useful, but it does not exercise the actual overflow-retry path.
   
   Since the fix depends on `catch_unwind` plus retry, a focused regression test for that control flow would be good to add. One practical approach is to extract the interleave attempt behind a small helper, so the test can inject a synthetic `"offset overflow"` failure without allocating multi-gigabyte strings.
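   One way to sketch that (hypothetical `emit_chunks_with` helper, not this PR's API; `String` errors and `usize` chunk offsets stand in for the real `DataFusionError` and `RecordBatch` types): make the interleave attempt a closure parameter, so a test can fail it synthetically:
   
   ```rust
   /// Hypothetical helper, not the PR's API: drive the retry loop through an
   /// injected `attempt` closure so tests can trigger the overflow path cheaply.
   fn emit_chunks_with<F>(
       total_rows: usize,
       mut chunk_size: usize,
       mut attempt: F,
   ) -> Result<Vec<usize>, String>
   where
       F: FnMut(usize) -> Result<(), String>,
   {
       loop {
           match attempt(chunk_size) {
               // Success: return the chunk start offsets (stand-in for batches).
               Ok(()) => return Ok((0..total_rows).step_by(chunk_size).collect()),
               // Only the expected offset overflow is retried, with halved chunks.
               Err(e) if e.contains("offset overflow") && chunk_size > 1 => chunk_size /= 2,
               // Anything else surfaces as-is.
               Err(e) => return Err(e),
           }
       }
   }
   ```
   
   A regression test can then assert both that an injected `"offset overflow"` shrinks the chunk size until the attempt succeeds, and that unrelated errors propagate unchanged.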
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

