Repository: hive Updated Branches: refs/heads/hive-14535 ccea0d6ff -> 187eb760d
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java index a694cf8..c81131e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java @@ -26,7 +26,6 @@ import static org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode. import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Deque; import java.util.EnumSet; @@ -51,7 +50,6 @@ import org.apache.hadoop.hive.ql.exec.SelectOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.tez.TezTask; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; @@ -106,12 +104,20 @@ public class LlapDecider implements PhysicalPlanResolver { } private LlapMode mode; + private final LlapClusterStateForCompile clusterState; + + public LlapDecider(LlapClusterStateForCompile clusterState) { + this.clusterState = clusterState; + } + class LlapDecisionDispatcher implements Dispatcher { private final HiveConf conf; private final boolean doSkipUdfCheck; private final boolean arePermanentFnsAllowed; private final boolean shouldUber; + private final float minReducersPerExec; + private final int executorsPerNode; private List<MapJoinOperator> mapJoinOpList; private final Map<Rule, NodeProcessor> rules; @@ -121,6 +127,9 @@ public class LlapDecider implements PhysicalPlanResolver { arePermanentFnsAllowed = HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOW_PERMANENT_FNS); // Don't user uber in "all" mode - everything can go into LLAP, which is better than uber. shouldUber = HiveConf.getBoolVar(conf, ConfVars.LLAP_AUTO_ALLOW_UBER) && (mode != all); + minReducersPerExec = HiveConf.getFloatVar( + conf, ConfVars.TEZ_LLAP_MIN_REDUCER_PER_EXECUTOR); + executorsPerNode = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS); // TODO# hmm mapJoinOpList = new ArrayList<MapJoinOperator>(); rules = getRules(); } @@ -139,22 +148,57 @@ public class LlapDecider implements PhysicalPlanResolver { return null; } - private void handleWork(TezWork tezWork, BaseWork work) - throws SemanticException { + private void handleWork(TezWork tezWork, BaseWork work) throws SemanticException { boolean workCanBeDoneInLlap = evaluateWork(tezWork, work); LOG.debug( "Work " + work + " " + (workCanBeDoneInLlap ? "can" : "cannot") + " be done in LLAP"); if (workCanBeDoneInLlap) { for (MapJoinOperator graceMapJoinOp : mapJoinOpList) { - LOG.debug( - "Disabling hybrid grace hash join in case of LLAP and non-dynamic partition hash join."); + LOG.debug("Disabling hybrid grace hash join in case of LLAP " + + "and non-dynamic partition hash join."); graceMapJoinOp.getConf().setHybridHashJoin(false); } + adjustAutoParallelism(work); + convertWork(tezWork, work); } mapJoinOpList.clear(); } + private void adjustAutoParallelism(BaseWork work) { + if (minReducersPerExec <= 0 || !(work instanceof ReduceWork)) return; + ReduceWork reduceWork = (ReduceWork)work; + if (reduceWork.isAutoReduceParallelism() == false && reduceWork.isUniformDistribution() == false) { + return; // Not based on ARP and cannot assume uniform distribution, bail. + } + clusterState.initClusterInfo(); + int targetCount = 0; + if (!clusterState.hasClusterInfo()) { + LOG.warn("Cannot determine LLAP cluster information"); + targetCount = (int)Math.ceil(minReducersPerExec * 1 * executorsPerNode); + } else { + targetCount = (int)Math.ceil(minReducersPerExec * (clusterState.getKnownExecutorCount() + + clusterState.getNodeCountWithUnknownExecutors() * executorsPerNode)); + } + // We only increase the targets here. + if (reduceWork.isAutoReduceParallelism()) { + int newMin = Math.max(reduceWork.getMinReduceTasks(), targetCount); + if (newMin < reduceWork.getMaxReduceTasks()) { + reduceWork.setMinReduceTasks(newMin); + reduceWork.getEdgePropRef().setAutoReduce(conf, true, newMin, + reduceWork.getMaxReduceTasks(), conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER)); + } else { + reduceWork.setAutoReduceParallelism(false); + reduceWork.setNumReduceTasks(newMin); + // TODO: is this correct? based on the same logic as HIVE-14200 + reduceWork.getEdgePropRef().setAutoReduce(null, false, 0, 0, 0); + } + } else { + // UNIFORM || AUTOPARALLEL (maxed out) + reduceWork.setNumReduceTasks(Math.max(reduceWork.getNumReduceTasks(), targetCount)); + } + } + private void convertWork(TezWork tezWork, BaseWork work) throws SemanticException {