zhuqi-lucas commented on PR #14418:
URL: https://github.com/apache/datafusion/pull/14418#issuecomment-2631276433

   @adriangb This only happen when sort with limit, because:
   
   The following logic:
   ```
    // If we have a non-limit operator with fetch capability, update global
       // state as necessary:
       if pushdown_plan.fetch().is_some() {
           if global_state.fetch.is_none() {
               global_state.satisfied = true;
           }
           (global_state.skip, global_state.fetch) = combine_limit(
               global_state.skip,
               global_state.fetch,
               0,
               pushdown_plan.fetch(),
           );
       }
   ```
   
   When sort with limit, the following steps cause the bug: 
   1. global_state.satisfied = true
   2. We will remove the global limit from the original logic without this PR.
   3. when we go to the final decision, we will not add back the global limit 
before CoalescePartitionsExec because the global_state.satisfied  already 
setting to true.
   
   ```rust
         if pushdown_plan.supports_limit_pushdown() {
           if !combines_input_partitions(&pushdown_plan) {
               // We have information in the global state and the plan pushes 
down,
               // continue:
               Ok((Transformed::no(pushdown_plan), global_state))
           } else if let Some(plan_with_fetch) = 
pushdown_plan.with_fetch(skip_and_fetch) {
               // This plan is combining input partitions, so we need to add the
               // fetch info to plan if possible. If not, we must add a 
`LimitExec`
               // with the information from the global state.
               let mut new_plan = plan_with_fetch;
               // Execution plans can't (yet) handle skip, so if we have one,
               // we still need to add a global limit
               if global_state.skip > 0 {
                   new_plan =
                       add_global_limit(new_plan, global_state.skip, 
global_state.fetch);
               }
               global_state.fetch = skip_and_fetch;
               global_state.skip = 0;
               global_state.satisfied = true;
               Ok((Transformed::yes(new_plan), global_state))
           } **else if global_state.satisfied {
               // If the plan is already satisfied, do not add a limit:
               Ok((Transformed::no(pushdown_plan), global_state))
           }** else {
               global_state.satisfied = true;
               Ok((
                   Transformed::yes(add_limit(
                       pushdown_plan,
                       global_state.skip,
                       global_fetch,
                   )),
                   global_state,
               ))
           }
       }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to