Repository: hive Updated Branches: refs/heads/master f3d144854 -> 61a027af3
HIVE-20623: Shared work: Extend sharing of map-join cache entries in LLAP (Jesus Camacho Rodriguez, reviewed by Gopal V) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/61a027af Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/61a027af Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/61a027af Branch: refs/heads/master Commit: 61a027af3a652f3a324727ec5d603a3cbc32ce7d Parents: f3d1448 Author: Jesus Camacho Rodriguez <jcama...@apache.org> Authored: Sat Sep 22 19:42:24 2018 -0700 Committer: Jesus Camacho Rodriguez <jcama...@apache.org> Committed: Sun Oct 7 17:37:17 2018 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 4 ++ .../hadoop/hive/ql/exec/MapJoinOperator.java | 11 +++- .../hive/ql/optimizer/SharedWorkOptimizer.java | 62 ++++++++++++++++++++ .../apache/hadoop/hive/ql/plan/MapJoinDesc.java | 16 +++++ 4 files changed, 92 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/61a027af/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 27b1b73..d0adc35 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2285,6 +2285,10 @@ public class HiveConf extends Configuration { "Whether to enable shared work extended optimizer. The optimizer tries to merge equal operators\n" + "after a work boundary after shared work optimizer has been executed. Requires hive.optimize.shared.work\n" + "to be set to true. Tez only."), + HIVE_SHARED_WORK_REUSE_MAPJOIN_CACHE("hive.optimize.shared.work.mapjoin.cache.reuse", true, + "When shared work optimizer is enabled, whether we should reuse the cache for the broadcast side\n" + + "of mapjoin operators that share same broadcast input. Requires hive.optimize.shared.work\n" + + "to be set to true. Tez only."), HIVE_COMBINE_EQUIVALENT_WORK_OPTIMIZATION("hive.combine.equivalent.work.optimization", true, "Whether to " + "combine equivalent work objects during physical optimization.\n This optimization looks for equivalent " + "work objects and combines them if they meet certain preconditions. Spark only."), http://git-wip-us.apache.org/repos/asf/hive/blob/61a027af/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 114cea9..da1dd42 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -183,7 +183,16 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem // On Tez only: The hash map might already be cached in the container we run // the task in. On MR: The cache is a no-op. String queryId = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID); - cacheKey = "HASH_MAP_" + this.getOperatorId() + "_container"; + // The cacheKey may have already been defined in the MapJoin conf spec + // as part of the Shared Work Optimization if it can be reused among + // multiple mapjoin operators. In that case, we take that key from conf + // and append this.getClass().getName() to disambiguate between different + // classes that may be using the same source data, e.g. + // VectorMapJoinInnerGenerateResultOperator and VectorMapJoinLeftSemiLongOperator. + // If the cacheKey is not defined in the conf, then we generate it. + cacheKey = conf.getCacheKey() == null ? + MapJoinDesc.generateCacheKey(this.getOperatorId()) : + conf.getCacheKey() + "_" + this.getClass().getName(); cache = ObjectCacheFactory.getCache(hconf, queryId, false); loader = getHashTableLoader(hconf); http://git-wip-us.apache.org/repos/asf/hive/blob/61a027af/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java index 2573754..1e3887b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; @@ -508,6 +509,58 @@ public class SharedWorkOptimizer extends Transform { } } + if(pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_REUSE_MAPJOIN_CACHE)) { + // Try to reuse cache for broadcast side in mapjoin operators that + // share same input. + // First we group together all the mapjoin operators that share same + // reduce sink operator. + final Multimap<Operator<?>, MapJoinOperator> parentToMapJoinOperators = + ArrayListMultimap.create(); + final Set<Operator<?>> visitedOperators = new HashSet<>(); + for (Entry<Operator<?>, Collection<Operator<?>>> e : + optimizerCache.operatorToWorkOperators.asMap().entrySet()) { + if (visitedOperators.contains(e.getKey())) { + // Already visited this work, we move on + continue; + } + for (Operator<?> op : e.getValue()) { + if (op instanceof MapJoinOperator) { + MapJoinOperator mapJoinOp = (MapJoinOperator) op; + // Only allowed for mapjoin operator + if (!mapJoinOp.getConf().isBucketMapJoin() && + !mapJoinOp.getConf().isDynamicPartitionHashJoin()) { + parentToMapJoinOperators.put( + obtainBroadcastInput(mapJoinOp).getParentOperators().get(0), mapJoinOp); + } + } + visitedOperators.add(op); + } + } + // For each group, set the cache key accordingly if there is more than one operator + // and input RS operator are equal + for (Collection<MapJoinOperator> c : parentToMapJoinOperators.asMap().values()) { + Map<ReduceSinkOperator, String> rsOpToCacheKey = new HashMap<>(); + for (MapJoinOperator mapJoinOp : c) { + ReduceSinkOperator rsOp = obtainBroadcastInput(mapJoinOp); + String cacheKey = null; + for (Entry<ReduceSinkOperator, String> e: rsOpToCacheKey.entrySet()) { + if (compareOperator(pctx, rsOp, e.getKey())) { + cacheKey = e.getValue(); + break; + } + } + if (cacheKey == null) { + // Either it is the first map join operator or there was no equivalent RS, + // hence generate cache key + cacheKey = MapJoinDesc.generateCacheKey(mapJoinOp.getOperatorId()); + rsOpToCacheKey.put(rsOp, cacheKey); + } + // Set in the conf of the map join operator + mapJoinOp.getConf().setCacheKey(cacheKey); + } + } + } + // If we are running tests, we are going to verify that the contents of the cache // correspond with the contents of the plan, and otherwise we fail. // This check always run when we are running in test mode, independently on whether @@ -535,6 +588,15 @@ public class SharedWorkOptimizer extends Transform { } /** + * Obtain the RS input for a mapjoin operator. + */ + private static ReduceSinkOperator obtainBroadcastInput(MapJoinOperator mapJoinOp) { + return mapJoinOp.getParentOperators().get(0) instanceof ReduceSinkOperator ? + (ReduceSinkOperator) mapJoinOp.getParentOperators().get(0) : + (ReduceSinkOperator) mapJoinOp.getParentOperators().get(1); + } + + /** * This method gathers the TS operators with DPP from the context and * stores them into the input optimization cache. */ http://git-wip-us.apache.org/repos/asf/hive/blob/61a027af/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java index 8ba5101..507114b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java @@ -88,6 +88,8 @@ public class MapJoinDesc extends JoinDesc implements Serializable { private boolean isHybridHashJoin; private boolean isDynamicPartitionHashJoin = false; + private String cacheKey; + public MapJoinDesc() { bigTableBucketNumMapping = new LinkedHashMap<String, Integer>(); } @@ -111,6 +113,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable { this.parentDataSizes = clone.parentDataSizes; this.isBucketMapJoin = clone.isBucketMapJoin; this.isHybridHashJoin = clone.isHybridHashJoin; + this.cacheKey = clone.cacheKey; } public MapJoinDesc(final Map<Byte, List<ExprNodeDesc>> keys, @@ -128,6 +131,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable { this.bigTableBucketNumMapping = new LinkedHashMap<String, Integer>(); this.dumpFilePrefix = dumpFilePrefix; this.inMemoryDataSize = inMemoryDataSize; + this.cacheKey = null; initRetainExprList(); } @@ -478,6 +482,18 @@ public class MapJoinDesc extends JoinDesc implements Serializable { return Arrays.deepToString(fm); } + public String getCacheKey() { + return cacheKey; + } + + public void setCacheKey(String cacheKey) { + this.cacheKey = cacheKey; + } + + public static String generateCacheKey(String operatorId) { + return "HASH_MAP_" + operatorId + "_container"; + } + // Use LinkedHashSet to give predictable display order. private static final Set<String> vectorizableMapJoinNativeEngines = new LinkedHashSet<String>(Arrays.asList("tez", "spark"));