Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2809508850
So, to change it in your diff (I didn't change the documentation):
I would like to keep the original `StreamingMergeBuilder` case and the
`if self.reservation.size() < self.sort_in_place_threshold_bytes` expression,
so that the only optimization in place is "avoid concat for non-sort columns",
and then see whether this improves all sort queries.
```diff
- // If less than sort_in_place_threshold_bytes, concatenate and sort
in place
- if self.reservation.size() < self.sort_in_place_threshold_bytes {
- // Concatenate memory batches together and sort
- let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
- self.in_mem_batches.clear();
- self.reservation
- .try_resize(get_reserved_byte_for_record_batch(&batch))
- .map_err(Self::err_with_oom_context)?;
- let reservation = self.reservation.take();
- return self.sort_batch_stream(batch, metrics, reservation);
+ let mut columns_by_expr: Vec<Vec<ArrayRef>> = vec![vec![];
self.expr.len()];
+ for batch in &self.in_mem_batches {
+ for (i, expr) in self.expr.iter().enumerate() {
+ let col = expr.evaluate_to_sort_column(batch)?.values;
+ columns_by_expr[i].push(col);
+ }
}
- let streams = std::mem::take(&mut self.in_mem_batches)
- .into_iter()
- .map(|batch| {
- let metrics = self.metrics.baseline.intermediate();
- let reservation = self
- .reservation
- .split(get_reserved_byte_for_record_batch(&batch));
- let input = self.sort_batch_stream(batch, metrics,
reservation)?;
- Ok(spawn_buffered(input, 1))
- })
- .collect::<Result<_>>()?;
+ // For each sort expression, concatenate arrays from all batches
into one global array
+ let mut sort_columns = Vec::with_capacity(self.expr.len());
+ for (arrays, expr) in
columns_by_expr.into_iter().zip(self.expr.iter()) {
+ let array = concat(
+ &arrays
+ .iter()
+ .map(|a| a.as_ref())
+ .collect::<Vec<&dyn Array>>(),
+ )?;
+ sort_columns.push(SortColumn {
+ values: array,
+ options: expr.options.into(),
+ });
+ }
- let expressions: LexOrdering = self.expr.iter().cloned().collect();
+ // ===== Phase 2: Compute global sorted indices =====
+ // Use `lexsort_to_indices` to get global row indices in sorted
order (as if all batches were concatenated)
+ let indices = lexsort_to_indices(&sort_columns, None)?;
- StreamingMergeBuilder::new()
- .with_streams(streams)
- .with_schema(Arc::clone(&self.schema))
- .with_expressions(expressions.as_ref())
- .with_metrics(metrics)
- .with_batch_size(self.batch_size)
- .with_fetch(None)
- .with_reservation(self.merge_reservation.new_empty())
- .build()
+ // ===== Phase 3: Reorder each column using the global sorted
indices =====
+ let num_columns = self.schema.fields().len();
+
+ let batch_indices: Vec<(usize, usize)> = self
+ .in_mem_batches
+ .iter()
+ .enumerate()
+ .map(|(batch_id, batch)| (0..batch.num_rows()).map(move |i|
(batch_id, i)))
+ .flatten()
+ .collect();
+
+ // For each column:
+ // 1. Concatenate all batch arrays for this column (in the same
order as assumed by `lexsort_to_indices`)
+ // 2. Use Arrow's `take` function to reorder the column by sorted
indices
+ let interleave_indices: Vec<(usize, usize)> = indices
+ .values()
+ .iter()
+ .map(|x| batch_indices[*x as usize])
+ .collect();
+ // Build a RecordBatch from the sorted columns
+
+ let batches: Vec<&RecordBatch> =
self.in_mem_batches.iter().collect();
+
+ let sorted_batch =
+ interleave_record_batch(batches.as_ref(), &interleave_indices)?;
+ // Clear in-memory batches and update reservation
+ self.in_mem_batches.clear();
+ self.reservation
+ .try_resize(get_reserved_byte_for_record_batch(&sorted_batch))?;
+ let reservation = self.reservation.take();
+
+ // ===== Phase 4: Construct the resulting stream =====
+ let stream = futures::stream::once(async move {
+ let _timer = metrics.elapsed_compute().timer();
+ metrics.record_output(sorted_batch.num_rows());
+ drop(reservation);
+ Ok(sorted_batch)
+ });
+
+ Ok(Box::pin(RecordBatchStreamAdapter::new(
+ self.schema.clone(),
+ stream,
+ )))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]