Repository: hive Updated Branches: refs/heads/spark 74ac99fa3 -> a318d5b00
HIVE-10458: Enable parallel order by for spark [Spark Branch] (Rui reviewed by Xuefu) Signed-off-by: Rui Li <rui...@intel.com> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a318d5b0 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a318d5b0 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a318d5b0 Branch: refs/heads/spark Commit: a318d5b00f92dd2130de6a45cc44f268fded2d9c Parents: 74ac99f Author: Rui Li <rui...@intel.com> Authored: Thu May 21 10:13:17 2015 +0800 Committer: Rui Li <rui...@intel.com> Committed: Thu May 21 10:13:17 2015 +0800 ---------------------------------------------------------------------- .../correlation/ReduceSinkDeDuplication.java | 12 ++++----- .../spark/SetSparkReducerParallelism.java | 28 +++++++++++++++++++- .../hive/ql/parse/spark/GenSparkUtils.java | 2 +- .../hadoop/hive/ql/plan/ReduceSinkDesc.java | 15 +++++------ .../clientpositive/spark/parallel_orderby.q.out | 6 ++--- 5 files changed, 44 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a318d5b0/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java index 404b759..8ac9ca7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java @@ -489,7 +489,7 @@ public class ReduceSinkDeDuplication implements Transform { if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) { CorrelationUtilities.replaceReduceSinkWithSelectOperator( cRS, dedupCtx.getPctx(), dedupCtx); - pRS.getConf().setEnforceSort(true); + pRS.getConf().setDeduplicated(true); return true; } return false; @@ -512,7 +512,7 @@ public class ReduceSinkDeDuplication implements Transform { if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) { CorrelationUtilities.removeReduceSinkForGroupBy( cRS, cGBY, dedupCtx.getPctx(), dedupCtx); - pRS.getConf().setEnforceSort(true); + pRS.getConf().setDeduplicated(true); return true; } return false; @@ -535,7 +535,7 @@ public class ReduceSinkDeDuplication implements Transform { CorrelationUtilities.findPossibleParent( pJoin, ReduceSinkOperator.class, dedupCtx.trustScript()); if (pRS != null) { - pRS.getConf().setEnforceSort(true); + pRS.getConf().setDeduplicated(true); } return true; } @@ -559,7 +559,7 @@ public class ReduceSinkDeDuplication implements Transform { CorrelationUtilities.findPossibleParent( pJoin, ReduceSinkOperator.class, dedupCtx.trustScript()); if (pRS != null) { - pRS.getConf().setEnforceSort(true); + pRS.getConf().setDeduplicated(true); } return true; } @@ -579,7 +579,7 @@ public class ReduceSinkDeDuplication implements Transform { if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) { CorrelationUtilities.replaceReduceSinkWithSelectOperator( cRS, dedupCtx.getPctx(), dedupCtx); - pRS.getConf().setEnforceSort(true); + pRS.getConf().setDeduplicated(true); return true; } return false; @@ -596,7 +596,7 @@ public class ReduceSinkDeDuplication implements Transform { start, ReduceSinkOperator.class, dedupCtx.trustScript()); if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) { CorrelationUtilities.removeReduceSinkForGroupBy(cRS, cGBY, dedupCtx.getPctx(), dedupCtx); - pRS.getConf().setEnforceSort(true); + pRS.getConf().setDeduplicated(true); return true; } return false; http://git-wip-us.apache.org/repos/asf/hive/blob/a318d5b0/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java index f9ef474..5f9225c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.optimizer.spark; +import java.util.List; import java.util.Stack; import org.apache.commons.logging.Log; @@ -26,6 +27,7 @@ import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.LimitOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -75,7 +77,7 @@ public class SetSparkReducerParallelism implements NodeProcessor { context.getVisitedReduceSinks().add(sink); - if (desc.getNumReducers() <= 0) { + if (needSetParallelism(sink, context.getConf())) { if (constantReducers > 0) { LOG.info("Parallelism for reduce sink " + sink + " set by user to " + constantReducers); desc.setNumReducers(constantReducers); @@ -158,4 +160,28 @@ public class SetSparkReducerParallelism implements NodeProcessor { return false; } + // tests whether the RS needs automatic setting parallelism + private boolean needSetParallelism(ReduceSinkOperator reduceSink, HiveConf hiveConf) { + ReduceSinkDesc desc = reduceSink.getConf(); + if (desc.getNumReducers() <= 0) { + return true; + } + if (desc.getNumReducers() == 1 && desc.hasOrderBy() && + hiveConf.getBoolVar(HiveConf.ConfVars.HIVESAMPLINGFORORDERBY) && !desc.isDeduplicated()) { + List<Operator<? extends OperatorDesc>> children = reduceSink.getChildOperators(); + while (children != null && children.size() > 0) { + if (children.size() != 1 || children.get(0) instanceof LimitOperator) { + return false; + } + if (children.get(0) instanceof ReduceSinkOperator || + children.get(0) instanceof FileSinkOperator) { + break; + } + children = children.get(0).getChildOperators(); + } + return true; + } + return false; + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/a318d5b0/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java index e27ce0d..7992c88 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java @@ -399,7 +399,7 @@ public class GenSparkUtils { */ private static boolean groupByNeedParLevelOrder(ReduceSinkOperator reduceSinkOperator) { // whether we have to enforce sort anyway, e.g. in case of RS deduplication - if (reduceSinkOperator.getConf().isEnforceSort()) { + if (reduceSinkOperator.getConf().isDeduplicated()) { return true; } List<Operator<? extends OperatorDesc>> children = reduceSinkOperator.getChildOperators(); http://git-wip-us.apache.org/repos/asf/hive/blob/a318d5b0/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java index 1891dff..b4316ec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.plan; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; -import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -114,8 +113,8 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { // Write type, since this needs to calculate buckets differently for updates and deletes private AcidUtils.Operation writeType; - // whether we'll enforce the sort order of the RS - private transient boolean enforceSort = false; + // whether this RS is deduplicated + private transient boolean isDeduplicated = false; // used by spark mode to decide whether global order is needed private transient boolean hasOrderBy = false; @@ -174,7 +173,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { desc.setStatistics(this.getStatistics()); desc.setSkipTag(skipTag); desc.reduceTraits = reduceTraits.clone(); - desc.setEnforceSort(enforceSort); + desc.setDeduplicated(isDeduplicated); desc.setHasOrderBy(hasOrderBy); return desc; } @@ -434,12 +433,12 @@ public class ReduceSinkDesc extends AbstractOperatorDesc { return writeType; } - public boolean isEnforceSort() { - return enforceSort; + public boolean isDeduplicated() { + return isDeduplicated; } - public void setEnforceSort(boolean isDeduplicated) { - this.enforceSort = isDeduplicated; + public void setDeduplicated(boolean isDeduplicated) { + this.isDeduplicated = isDeduplicated; } public boolean hasOrderBy() { http://git-wip-us.apache.org/repos/asf/hive/blob/a318d5b0/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out b/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out index 0194dbb..03314ea 100644 --- a/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out +++ b/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out @@ -38,7 +38,7 @@ STAGE PLANS: Stage: Stage-1 Spark Edges: - Reducer 2 <- Map 1 (SORT, 1) + Reducer 2 <- Map 1 (SORT, 4) #### A masked pattern was here #### Vertices: Map 1 @@ -117,7 +117,7 @@ Retention: 0 Table Type: MANAGED_TABLE Table Parameters: COLUMN_STATS_ACCURATE true - numFiles 1 + numFiles 4 numRows 48 rawDataSize 512 totalSize 560 @@ -231,7 +231,7 @@ Retention: 0 Table Type: MANAGED_TABLE Table Parameters: COLUMN_STATS_ACCURATE true - numFiles 1 + numFiles 4 numRows 48 rawDataSize 512 totalSize 560