[ 
https://issues.apache.org/jira/browse/DRILL-1457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14908869#comment-14908869
 ] 

ASF GitHub Bot commented on DRILL-1457:
---------------------------------------

Github user jinfengni commented on a diff in the pull request:

    https://github.com/apache/drill/pull/169#discussion_r40484523
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java ---
    @@ -26,4 +30,34 @@
       public void testLimitWithExchanges() throws Exception{
         testPhysicalFromFile("limit/limit_exchanges.json");
       }
    +
    +  @Test
    +  public void testPushLimitPastUnionExchange() throws Exception {
    +    // Push limit past through UnionExchange.
    +    final String WORKING_PATH = TestTools.getWorkingPath();
    +    final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
    +
    +    try {
    +      // case 1. single table query.
    +      final String sql = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 1 offset 2", TEST_RES_PATH);
    +      test("alter session set `planner.slice_target` = 1");
    +      // test(sql);
    +
    +      // Validate the plan
    +      final String[] expectedPlan = {"(?s)Limit.*UnionExchange.*Limit.*Scan"};
    +      final String[] excludedPatterns = {};
    +      PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPatterns);
    +
    +      // case 2: join query.
    +      final String sql2 = String.format("select * from dfs_test.`%s/tpchmulti/region` r,  dfs_test.`%s/tpchmulti/nation` n where r.r_regionkey = n.n_regionkey limit 1 offset 2", TEST_RES_PATH, TEST_RES_PATH );
    +      // Validate the plan
    +      final String[] expectedPlan2 = {"(?s)Limit.*UnionExchange.*Limit.*Join"};
    +      final String[] excludedPatterns2 = {};
    +      PlanTestBase.testPlanMatchingPatterns(sql2, expectedPlan2, excludedPatterns2);
    +    } finally {
    +      test("alter session set `planner.slice_target` = " + ExecConstants.SLICE_TARGET_OPTION.getDefault().getValue());
    +    }
    +    // test(sql2);
    --- End diff --
    
    Removed that. 
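
    For readers unfamiliar with the expected-plan patterns in the diff above,
    here is a minimal sketch of what such a pattern checks, using
    java.util.regex directly. It does not call PlanTestBase; the class name,
    the helper names, and the two plan strings are made up for illustration
    and are not actual Drill output. The point is that the (?s) flag lets
    ".*" span line breaks, so the pattern only matches when a second Limit
    appears below the UnionExchange.

```java
import java.util.regex.Pattern;

// Hypothetical illustration; not part of the Drill test suite.
public class PlanPatternSketch {
    // Same pattern string as expectedPlan in the diff above.
    static final Pattern WITH_DOTALL =
        Pattern.compile("(?s)Limit.*UnionExchange.*Limit.*Scan");
    // Same pattern without (?s): '.' no longer matches line terminators.
    static final Pattern WITHOUT_DOTALL =
        Pattern.compile("Limit.*UnionExchange.*Limit.*Scan");

    /** True if the plan text shows Limit, UnionExchange, Limit, Scan in that order. */
    public static boolean limitPushedPastExchange(String planText) {
        return WITH_DOTALL.matcher(planText).find();
    }

    /** Same check without DOTALL; fails on plans printed one operator per line. */
    public static boolean matchesWithoutDotAll(String planText) {
        return WITHOUT_DOTALL.matcher(planText).find();
    }

    public static void main(String[] args) {
        // Made-up plan text: Limit only above the exchange.
        String notPushed = "Limit(fetch=[1])\nUnionExchange\nScan\n";
        // Made-up plan text: an extra Limit below the exchange.
        String pushed = "Limit(fetch=[1])\nUnionExchange\nLimit(fetch=[1])\nScan\n";

        System.out.println("not pushed matches: " + limitPushedPastExchange(notPushed));
        System.out.println("pushed matches:     " + limitPushedPastExchange(pushed));
        System.out.println("pushed, no (?s):    " + matchesWithoutDotAll(pushed));
    }
}
```

    Because the pattern is unanchored apart from operator order, it tolerates
    any operators between the named ones, which keeps the test robust against
    unrelated plan changes.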


> Limit operator optimization : push limit operator past exchange operator; 
> disable parallel plan if no order is required.
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: DRILL-1457
>                 URL: https://issues.apache.org/jira/browse/DRILL-1457
>             Project: Apache Drill
>          Issue Type: Bug
>            Reporter: Jinfeng Ni
>            Assignee: Jinfeng Ni
>            Priority: Critical
>             Fix For: 1.2.0
>
>         Attachments: 0001-DRILL-1457-Push-Limit-past-through-UnionExchange.patch
>
>
> When there is a LIMIT clause in a query, we want to push the LIMIT
> operator down as far as possible, so that the upstream operators stop
> execution once the desired number of rows has been fetched.
> Within one execution fragment, Drill applies a pull model. In many cases,
> there is no performance impact if the LIMIT operator is not pushed down,
> since LIMIT informs the upstream operators to stop. Across multiple
> fragments, however, Drill uses a push model. If LIMIT is not pushed past
> the exchange operator, the upstream fragment continues executing until it
> receives a notice from the downstream fragment, even if the LIMIT operator
> has already received the required number of rows.
> For instance:
> explain plan for select * from dfs.`/Users/jni/work/tpch-data/tpch-sf10/lineitem` limit 1;
> +------------+------------+
> | 00-00    Screen
> 00-01      SelectionVectorRemover
> 00-02        Limit(fetch=[1])
> 00-03          UnionExchange
> 01-01            Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/Users/jni/work/tpch-data/tpch-sf10/lineitem]], selectionRoot=/Users/jni/work/tpch-data/tpch-sf10/lineitem, columns=[SchemaPath [`*`]]]])
> The query profile shows that the Scan operator fetches many more records than desired:
> Minor Fragment  Start   End     Total Time  Max Records  Max Batches
> 01-00-xx        0.507   1.059   0.552       43688        8
> 01-01-xx        0.570   1.054   0.484       27305        5
> 01-02-xx        0.617   1.038   0.421       16383        3
> 01-03-xx        0.668   1.056   0.388       10922        2
> 01-04-xx        0.740   1.055   0.315       10922        2
> 01-05-xx        0.813   1.057   0.244       5461         1
> In the above plan, there are two choices for performance optimization:
> 1) Push the LIMIT operator past the EXCHANGE operator, ideally into the SCAN
> operator.
> 2) Disable the parallel plan by removing the EXCHANGE operator.
>  
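
The first choice above can be illustrated with a small, self-contained sketch. This is not Drill code: fragments are modelled as plain lists, the exchange as in-order concatenation, and all names are hypothetical. It also assumes that, when an OFFSET is present, the limit pushed below the exchange must keep offset + fetch rows per fragment, while the original LIMIT stays above the exchange to apply the offset. That is the usual reasoning for this kind of rewrite, not something stated in the issue text.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of pushing LIMIT past an exchange; not Drill code.
public class LimitPushdownSketch {
    /** Keeps rows [offset, offset + fetch), like LIMIT fetch OFFSET offset. */
    public static List<Integer> limit(List<Integer> rows, int offset, int fetch) {
        int from = Math.min(offset, rows.size());
        int to = Math.min(offset + fetch, rows.size());
        return new ArrayList<>(rows.subList(from, to));
    }

    /** Stand-in for the exchange: concatenates fragment outputs in order. */
    public static List<Integer> union(List<List<Integer>> fragments) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> fragment : fragments) {
            out.addAll(fragment);
        }
        return out;
    }

    /** Applies a local limit of (offset + fetch) rows, offset 0, to every fragment. */
    public static List<List<Integer>> pushDown(List<List<Integer>> fragments,
                                               int offset, int fetch) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> fragment : fragments) {
            out.add(limit(fragment, 0, offset + fetch));
        }
        return out;
    }

    /** Plan shape: Limit over Exchange over Scan. */
    public static List<Integer> withoutPushdown(List<List<Integer>> fragments,
                                                int offset, int fetch) {
        return limit(union(fragments), offset, fetch);
    }

    /** Plan shape: Limit over Exchange over Limit over Scan. */
    public static List<Integer> withPushdown(List<List<Integer>> fragments,
                                             int offset, int fetch) {
        return limit(union(pushDown(fragments, offset, fetch)), offset, fetch);
    }

    public static void main(String[] args) {
        // Made-up data: three fragments of five rows each.
        List<List<Integer>> fragments = Arrays.asList(
            Arrays.asList(10, 11, 12, 13, 14),
            Arrays.asList(20, 21, 22, 23, 24),
            Arrays.asList(30, 31, 32, 33, 34));
        int offset = 2;
        int fetch = 1;

        System.out.println("rows sent without pushdown: " + union(fragments).size());
        System.out.println("rows sent with pushdown:    "
            + union(pushDown(fragments, offset, fetch)).size());
        System.out.println("same result: "
            + withoutPushdown(fragments, offset, fetch)
                .equals(withPushdown(fragments, offset, fetch)));
    }
}
```

In this model the two plan shapes return the same rows because the exchange is deterministic. In a real engine the arrival order across an exchange is not fixed, which is acceptable for a LIMIT without ORDER BY since any qualifying rows are a valid answer.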



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
