zhuqi-lucas commented on code in PR #14418:
URL: https://github.com/apache/datafusion/pull/14418#discussion_r1940594604


##########
datafusion/physical-optimizer/src/limit_pushdown.rs:
##########
@@ -146,6 +146,15 @@ pub fn pushdown_limit_helper(
         global_state.skip = skip;
         global_state.fetch = fetch;
 
+        if limit_exec.input().as_any().is::<CoalescePartitionsExec>() {

Review Comment:
   Thank you @mertak-synnada for the review:
   
   > While I agree with checking via API suggestion, please also check with the combines_input_partitions() helper function so that SortPreservingMerge can be affected as well.
   
   I agree. I already checked SortPreservingMergeExec: it supports with_fetch() and fetch(), so I don't think it's affected:
   
   ```rust
   impl SortPreservingMergeExec {
       /// Create a new sort execution plan
       pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
           let cache = Self::compute_properties(&input, expr.clone());
           Self {
               input,
               expr,
               metrics: ExecutionPlanMetricsSet::new(),
               fetch: None,
               cache,
               enable_round_robin_repartition: true,
           }
       }
   
       /// Sets the number of rows to fetch
       pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
           self.fetch = fetch;
           self
       }
   
       /// Sets the selection strategy of tied winners of the loser tree algorithm
       ///
       /// If true (the default) equal output rows are placed in the merged stream
       /// in round robin fashion. This approach consumes input streams at more
       /// even rates when there are many rows with the same sort key.
       ///
       /// If false, equal output rows are always placed in the merged stream in
       /// the order of the inputs, resulting in potentially slower execution but a
       /// stable output order.
       pub fn with_round_robin_repartition(
           mut self,
           enable_round_robin_repartition: bool,
       ) -> Self {
           self.enable_round_robin_repartition = enable_round_robin_repartition;
           self
       }
   
       /// Input schema
       pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
           &self.input
       }
   
       /// Sort expressions
       pub fn expr(&self) -> &LexOrdering {
           self.expr.as_ref()
       }
   
       /// Fetch
       pub fn fetch(&self) -> Option<usize> {
           self.fetch
       }
   
       /// Creates the cache object that stores the plan properties
       /// such as schema, equivalence properties, ordering, partitioning, etc.
       fn compute_properties(
           input: &Arc<dyn ExecutionPlan>,
           ordering: LexOrdering,
       ) -> PlanProperties {
           let mut eq_properties = input.equivalence_properties().clone();
           eq_properties.clear_per_partition_constants();
           eq_properties.add_new_orderings(vec![ordering]);
           PlanProperties::new(
               eq_properties,                        // Equivalence Properties
               Partitioning::UnknownPartitioning(1), // Output Partitioning
               input.pipeline_behavior(),            // Pipeline Behavior
               input.boundedness(),                  // Boundedness
           )
       }
   }
   ```
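   Since SortPreservingMergeExec exposes with_fetch(), the optimizer can hand the fetch to the merge node itself rather than wrapping it in a separate limit. A minimal, self-contained sketch of that idea (toy PlanNode trait and Merge struct are hypothetical, not the DataFusion API):

   ```rust
   // Hypothetical sketch: a node that merges partitions can absorb a fetch
   // itself instead of needing a separate global-limit wrapper above it.
   trait PlanNode {
       /// Returns Some(new node) if this node can absorb the fetch itself.
       fn with_fetch(&self, fetch: Option<usize>) -> Option<Box<dyn PlanNode>>;
       fn name(&self) -> &'static str;
       fn fetch(&self) -> Option<usize>;
   }

   struct Merge {
       fetch: Option<usize>,
   }

   impl PlanNode for Merge {
       fn with_fetch(&self, fetch: Option<usize>) -> Option<Box<dyn PlanNode>> {
           // Like SortPreservingMergeExec, a merge can stop after `fetch` rows.
           Some(Box::new(Merge { fetch }))
       }
       fn name(&self) -> &'static str {
           "Merge"
       }
       fn fetch(&self) -> Option<usize> {
           self.fetch
       }
   }

   fn main() {
       let merge = Merge { fetch: None };
       // The optimizer pushes the fetch into the merge instead of adding a
       // global-limit node on top of it.
       let pushed = merge.with_fetch(Some(10)).expect("merge absorbs the fetch");
       println!("{} fetch={:?}", pushed.name(), pushed.fetch());
   }
   ```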
   
   > But in the end, I think rather than adding a global limit, we should be able to limit in the CoalescePartitionsExec or in SortPreservingMerge so that it won't unnecessarily push more data.
   
   I totally agree with this! So I created a follow-up issue https://github.com/apache/datafusion/issues/14446 to support a limit in CoalescePartitionsExec; SortPreservingMergeExec already supports this, per the code above.
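   The idea behind that follow-up could look something like this sketch (not the DataFusion implementation; `coalesce_with_fetch` is a hypothetical name): coalesce partitions into one stream but stop as soon as `fetch` rows have been emitted, so the remaining partitions are not drained unnecessarily.

   ```rust
   // Hypothetical sketch: concatenate partitions into one output, stopping
   // early once `fetch` rows have been produced.
   fn coalesce_with_fetch(partitions: Vec<Vec<i32>>, fetch: Option<usize>) -> Vec<i32> {
       let mut out = Vec::new();
       'outer: for part in partitions {
           for row in part {
               if let Some(f) = fetch {
                   // Stop pulling rows once the fetch is satisfied.
                   if out.len() >= f {
                       break 'outer;
                   }
               }
               out.push(row);
           }
       }
       out
   }

   fn main() {
       let parts = vec![vec![1, 2, 3], vec![4, 5], vec![6]];
       // With fetch = 4, only the first four rows are produced; the last
       // partition is never touched.
       assert_eq!(coalesce_with_fetch(parts.clone(), Some(4)), vec![1, 2, 3, 4]);
       // Without a fetch, all rows are concatenated.
       assert_eq!(coalesce_with_fetch(parts, None), vec![1, 2, 3, 4, 5, 6]);
       println!("ok");
   }
   ```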
   
   
   > So if the plan is combining input partitions, we're only adding a global limit if skip information is there, maybe we can identify if the local limits are enough or not and then decide to add the global limit at there.
   
   This is a good point; we can create another issue to try to improve this!
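   As I understand the suggestion, the decision could be sketched like this (hypothetical helper names, not the optimizer's actual API): local limits must keep `skip + fetch` rows per partition, but the skip itself can only be applied after partitions are combined, so a global limit is only required when there is a skip.

   ```rust
   // Hypothetical sketch of the decision: when a node combines input
   // partitions, is a global limit still needed on top of per-partition
   // (local) limits?
   fn needs_global_limit(combines_partitions: bool, skip: usize) -> bool {
       // The skip can only be applied after the merge, so a global limit is
       // required exactly when partitions are combined and rows are skipped.
       combines_partitions && skip > 0
   }

   fn local_fetch(skip: usize, fetch: usize) -> usize {
       // Each partition must keep enough rows to survive the global skip.
       skip + fetch
   }

   fn main() {
       // skip = 5, fetch = 10 over merged partitions: keep 15 rows per
       // partition, then skip 5 and fetch 10 globally.
       assert!(needs_global_limit(true, 5));
       assert_eq!(local_fetch(5, 10), 15);
       // No skip: the merging node itself can enforce the fetch, so no
       // extra global limit is needed.
       assert!(!needs_global_limit(true, 0));
       println!("ok");
   }
   ```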
   
   


