zhuqi-lucas commented on code in PR #19064:
URL: https://github.com/apache/datafusion/pull/19064#discussion_r2616869240
##########
datafusion/physical-plan/src/coalesce_partitions.rs:
##########
@@ -284,6 +286,47 @@ impl ExecutionPlan for CoalescePartitionsExec {
) -> Result<FilterDescription> {
FilterDescription::from_children(parent_filters, &self.children())
}
+
+ fn try_pushdown_sort(
+ &self,
+ order: &[PhysicalSortExpr],
+ ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
+ // CoalescePartitionsExec merges multiple partitions into one, which
loses
+ // global ordering. However, we can still push the sort requirement
down
+ // to optimize individual partitions - the Sort operator above will
handle
+ // the global ordering.
+ //
+ // Note: The result will always be at most Inexact (never Exact) when
there
+ // are multiple partitions, because merging destroys global ordering.
+ let result = self.input.try_pushdown_sort(order)?;
+
+ // If we have multiple partitions, we can't return Exact even if the
+ // underlying source claims Exact - merging destroys global ordering
+ let has_multiple_partitions =
+ self.input.output_partitioning().partition_count() > 1;
+
+ result
+ .try_map(|new_input| {
+ Ok(
+ Arc::new(
+
CoalescePartitionsExec::new(new_input).with_fetch(self.fetch),
+ ) as Arc<dyn ExecutionPlan>,
+ )
+ })
+ .map(|r| {
+ if has_multiple_partitions {
+ // Downgrade Exact to Inexact when merging multiple
partitions
Review Comment:
Great suggestion, @alamb! I will address this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]