[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16026023#comment-16026023 ]
liyunzhang_intel commented on HIVE-16600: ----------------------------------------- [~lirui]: thanks for your algorithm. I change the algorithm by following {noformat} if MultiInsert jointOperator=getJointOperator #jointOperator is the operator where the branches start. orderByLimit(isMultiInsert, jointOperator) /**Judge orderByLimit case: * non multi-insert: If there is a Limit in the path which is from reduceSink to next RS/FS, return true otherwise false * multi-insert: If there is a Limit in the path which is from reduceSink to joinOperator, return true otherwise false */ isOrderByLimit(isMultiInsert, jointOperator){} {noformat} the bug you found in HIVE-16600.8.patch is like {noformat} TS[0]-SEL[1]-RS[2]-SEL[3]-LIM[4]-RS[6]-FOR[7]-GBY[8]-SEL[9]-FS[11] -GBY[12]-SEL[13]-FS[15] it is a multi-insert case and an order by case(LIMIT[4] after RS[2]) and should not enable parallel order by. Let me explain it in above algorithm. first calculate whether it is a multi-insert case, if yes, calculate the jointOperator(FOR[7]). In orderByLimit, return true because there is LIMIT from the path from RS(RS[2]) to jointOperator(FOR[7]). {noformat} > Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel > order by in multi_insert cases > -------------------------------------------------------------------------------------------------------- > > Key: HIVE-16600 > URL: https://issues.apache.org/jira/browse/HIVE-16600 > Project: Hive > Issue Type: Sub-task > Reporter: liyunzhang_intel > Assignee: liyunzhang_intel > Attachments: HIVE-16600.1.patch, HIVE-16600.2.patch, > HIVE-16600.3.patch, HIVE-16600.4.patch, HIVE-16600.5.patch, > HIVE-16600.6.patch, HIVE-16600.7.patch, HIVE-16600.8.patch, > HIVE-16600.9.patch, mr.explain, mr.explain.log.HIVE-16600 > > > multi_insert_gby.case.q > {code} > set hive.exec.reducers.bytes.per.reducer=256; > set hive.optimize.sampling.orderby=true; > drop table if exists e1; > drop table if exists e2; > create table e1 (key string, value string); > create table e2 (key string); > FROM (select key, cast(key as double) as keyD, value from src order by key) a > INSERT OVERWRITE TABLE e1 > SELECT key, value > INSERT OVERWRITE TABLE e2 > SELECT key; > select * from e1; > select * from e2; > {code} > the parallelism of Sort is 1 even we enable parallel order > by("hive.optimize.sampling.orderby" is set as "true"). This is not > reasonable because the parallelism should be calcuated by > [Utilities.estimateReducers|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L170] > this is because SetSparkReducerParallelism#needSetParallelism returns false > when [children size of > RS|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L207] > is greater than 1. > in this case, the children size of {{RS[2]}} is two. > the logical plan of the case > {code} > TS[0]-SEL[1]-RS[2]-SEL[3]-SEL[4]-FS[5] > -SEL[6]-FS[7] > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)