[ https://issues.apache.org/jira/browse/DRILL-1457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14907249#comment-14907249 ]
Jinfeng Ni commented on DRILL-1457:
-----------------------------------

This patch only solves the case where we push the limit past UnionExchange. We'll address pushing the limit past other relational operators (join, project, union, etc.) in a separate patch. Part of the reason is that we may be able to leverage CALCITE-831 once it's ready.

Further, with the limit pushed past UnionExchange, we did not disable the parallel plan. Currently, FragmentExecutor initializes/sets up a record reader for every file/partition that the table has, which could require a lot of heap memory and a long running time. If the parallel plan were disabled, the setup of all record readers would be processed in a single minor fragment, which could slow down query execution.

With the limit pushed past UnionExchange, it can be in the same fragment as the scan operator. Before, the scan operator had to rely on the query cancel logic to stop reading data. Now, the minor fragment can stop reading data once it reaches the limit. I confirmed this by running a query with limit 1 against the TPC-H sample data. Without limit pushdown, each scan could read more data than necessary (sometimes up to 10 batches). With limit pushdown, each scan reads at most 1 batch for the limit 1 query. This seems to indicate that we save the I/O cost of the scan operator, plus the network cost of sending the data over the wire.

One possible further enhancement is to push Limit into the scan operator directly, and prune the read entries based on row count. This is possible for parquet, as the parquet group scan would know the row count for each row group. This would be addressed in a separate JIRA.

> Limit operator optimization : push limit operator past exchange operator; disable parallel plan if no order is required.
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: DRILL-1457
>                 URL: https://issues.apache.org/jira/browse/DRILL-1457
>             Project: Apache Drill
>          Issue Type: Bug
>            Reporter: Jinfeng Ni
>            Assignee: Jinfeng Ni
>            Priority: Critical
>             Fix For: 1.2.0
>
>         Attachments: 0001-DRILL-1457-Push-Limit-past-through-UnionExchange.patch
>
>
> When there is a LIMIT clause in a query, we want to push the LIMIT operator down as far as possible, so that the upstream operator will stop execution once the desired number of rows has been fetched.
> Within one execution fragment, Drill applies a pull model. In many cases, there is no performance impact if the LIMIT operator is not pushed down, since LIMIT informs the upstream operators to stop. However, across multiple fragments, Drill uses a push model. If LIMIT is not pushed past the exchange operator, the upstream fragment continues executing until it receives a notice from the downstream fragment, even if the LIMIT operator has already received the required number of rows.
> For instance:
> explain plan for select * from dfs.`/Users/jni/work/tpch-data/tpch-sf10/lineitem` limit 1;
> +------------+------------+
> | 00-00    Screen
> 00-01      SelectionVectorRemover
> 00-02        Limit(fetch=[1])
> 00-03          UnionExchange
> 01-01            Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/Users/jni/work/tpch-data/tpch-sf10/lineitem]], selectionRoot=/Users/jni/work/tpch-data/tpch-sf10/lineitem, columns=[SchemaPath [`*`]]]])
> The query profile shows that the Scan operator fetches many more records than desired:
> Minor Fragment    Start    End      Total Time    Max Records    Max Batches
> 01-00-xx          0.507    1.059    0.552         43688          8
> 01-01-xx          0.570    1.054    0.484         27305          5
> 01-02-xx          0.617    1.038    0.421         16383          3
> 01-03-xx          0.668    1.056    0.388         10922          2
> 01-04-xx          0.740    1.055    0.315         10922          2
> 01-05-xx          0.813    1.057    0.244         5461           1
> In the above plan, there are two choices for performance optimization:
> 1) Push the LIMIT operator past the EXCHANGE operator, ideally into the SCAN operator.
> 2) Disable the parallel plan by removing the EXCHANGE operator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
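
The difference between the two LIMIT placements discussed in this issue can be illustrated with a toy model. This is only a sketch and not Drill code: the function names and the `cancel_delay_batches` parameter are hypothetical, and the batch size of 5461 rows is taken from the profile numbers quoted in the issue (5461 records in 1 batch). The model assumes that a scan in the same fragment as LIMIT stops as soon as the limit is met, while a scan below an exchange keeps sending batches until a cancel notice arrives some number of batches later.

```python
from typing import Iterator, List, Optional

BATCH_SIZE = 5461  # rows per batch, as seen in the quoted query profile


def scan(total_batches: int, counter: List[int]) -> Iterator[int]:
    """Yield one row count per batch and record how many batches were read."""
    for _ in range(total_batches):
        counter[0] += 1
        yield BATCH_SIZE


def limit_in_scan_fragment(fetch: int, total_batches: int) -> int:
    """LIMIT in the same fragment as the scan (pull model).

    Returns the number of batches the scan had to read.
    """
    counter = [0]
    rows = 0
    for batch_rows in scan(total_batches, counter):
        rows += batch_rows
        if rows >= fetch:
            break  # LIMIT is satisfied, so the scan is not pulled again
    return counter[0]


def limit_above_exchange(fetch: int, total_batches: int,
                         cancel_delay_batches: int) -> int:
    """LIMIT above the exchange (push model).

    cancel_delay_batches is a hypothetical stand-in for how many extra
    batches the scan sends before the cancel notice reaches it.
    """
    counter = [0]
    rows = 0
    remaining: Optional[int] = None  # extra batches still sent after LIMIT is met
    for batch_rows in scan(total_batches, counter):
        rows += batch_rows
        if remaining is None and rows >= fetch:
            remaining = cancel_delay_batches  # cancel notice is now in flight
        if remaining is not None:
            if remaining == 0:
                break  # cancel notice has arrived
            remaining -= 1
    return counter[0]


if __name__ == "__main__":
    print(limit_in_scan_fragment(1, 100))    # batches read with LIMIT next to the scan
    print(limit_above_exchange(1, 100, 7))   # batches read with LIMIT above the exchange
```

Under these assumptions, the scan reads one batch when LIMIT sits next to it and `1 + cancel_delay_batches` batches otherwise, which is the same shape as the per-fragment batch counts in the profile.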