Repository: hive
Updated Branches:
  refs/heads/master f3d144854 -> 61a027af3


HIVE-20623: Shared work: Extend sharing of map-join cache entries in LLAP (Jesus Camacho Rodriguez, reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/61a027af
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/61a027af
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/61a027af

Branch: refs/heads/master
Commit: 61a027af3a652f3a324727ec5d603a3cbc32ce7d
Parents: f3d1448
Author: Jesus Camacho Rodriguez <jcama...@apache.org>
Authored: Sat Sep 22 19:42:24 2018 -0700
Committer: Jesus Camacho Rodriguez <jcama...@apache.org>
Committed: Sun Oct 7 17:37:17 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  4 ++
 .../hadoop/hive/ql/exec/MapJoinOperator.java    | 11 +++-
 .../hive/ql/optimizer/SharedWorkOptimizer.java  | 62 ++++++++++++++++++++
 .../apache/hadoop/hive/ql/plan/MapJoinDesc.java | 16 +++++
 4 files changed, 92 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/61a027af/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 27b1b73..d0adc35 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2285,6 +2285,10 @@ public class HiveConf extends Configuration {
        "Whether to enable shared work extended optimizer. The optimizer tries to merge equal operators\n" +
        "after a work boundary after shared work optimizer has been executed. Requires hive.optimize.shared.work\n" +
        "to be set to true. Tez only."),
+    HIVE_SHARED_WORK_REUSE_MAPJOIN_CACHE("hive.optimize.shared.work.mapjoin.cache.reuse", true,
+        "When shared work optimizer is enabled, whether we should reuse the cache for the broadcast side\n" +
+        "of mapjoin operators that share same broadcast input. Requires hive.optimize.shared.work\n" +
+        "to be set to true. Tez only."),
     HIVE_COMBINE_EQUIVALENT_WORK_OPTIMIZATION("hive.combine.equivalent.work.optimization", true, "Whether to " +
            "combine equivalent work objects during physical optimization.\n This optimization looks for equivalent " +
            "work objects and combines them if they meet certain preconditions. Spark only."),

http://git-wip-us.apache.org/repos/asf/hive/blob/61a027af/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 114cea9..da1dd42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -183,7 +183,16 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
     // On Tez only: The hash map might already be cached in the container we run
     // the task in. On MR: The cache is a no-op.
     String queryId = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID);
-    cacheKey = "HASH_MAP_" + this.getOperatorId() + "_container";
+    // The cacheKey may have already been defined in the MapJoin conf spec
+    // as part of the Shared Work Optimization if it can be reused among
+    // multiple mapjoin operators. In that case, we take that key from conf
+    // and append this.getClass().getName() to disambiguate between different
+    // classes that may be using the same source data, e.g.
+    // VectorMapJoinInnerGenerateResultOperator and VectorMapJoinLeftSemiLongOperator.
+    // If the cacheKey is not defined in the conf, then we generate it.
+    cacheKey = conf.getCacheKey() == null ?
+        MapJoinDesc.generateCacheKey(this.getOperatorId()) :
+        conf.getCacheKey() + "_" + this.getClass().getName();
     cache = ObjectCacheFactory.getCache(hconf, queryId, false);
     loader = getHashTableLoader(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/61a027af/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
index 2573754..1e3887b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -508,6 +509,58 @@ public class SharedWorkOptimizer extends Transform {
       }
     }
 
+    if(pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_REUSE_MAPJOIN_CACHE)) {
+      // Try to reuse cache for broadcast side in mapjoin operators that
+      // share same input.
+      // First we group together all the mapjoin operators that share same
+      // reduce sink operator.
+      final Multimap<Operator<?>, MapJoinOperator> parentToMapJoinOperators =
+          ArrayListMultimap.create();
+      final Set<Operator<?>> visitedOperators = new HashSet<>();
+      for (Entry<Operator<?>, Collection<Operator<?>>> e :
+          optimizerCache.operatorToWorkOperators.asMap().entrySet()) {
+        if (visitedOperators.contains(e.getKey())) {
+          // Already visited this work, we move on
+          continue;
+        }
+        for (Operator<?> op : e.getValue()) {
+          if (op instanceof MapJoinOperator) {
+            MapJoinOperator mapJoinOp = (MapJoinOperator) op;
+            // Only allowed for mapjoin operator
+            if (!mapJoinOp.getConf().isBucketMapJoin() &&
+                !mapJoinOp.getConf().isDynamicPartitionHashJoin()) {
+              parentToMapJoinOperators.put(
+                  obtainBroadcastInput(mapJoinOp).getParentOperators().get(0), mapJoinOp);
+            }
+          }
+          visitedOperators.add(op);
+        }
+      }
+      // For each group, set the cache key accordingly if there is more than one operator
+      // and input RS operator are equal
+      for (Collection<MapJoinOperator> c : parentToMapJoinOperators.asMap().values()) {
+        Map<ReduceSinkOperator, String> rsOpToCacheKey = new HashMap<>();
+        for (MapJoinOperator mapJoinOp : c) {
+          ReduceSinkOperator rsOp = obtainBroadcastInput(mapJoinOp);
+          String cacheKey = null;
+          for (Entry<ReduceSinkOperator, String> e: rsOpToCacheKey.entrySet()) {
+            if (compareOperator(pctx, rsOp, e.getKey())) {
+              cacheKey = e.getValue();
+              break;
+            }
+          }
+          if (cacheKey == null) {
+            // Either it is the first map join operator or there was no equivalent RS,
+            // hence generate cache key
+            cacheKey = MapJoinDesc.generateCacheKey(mapJoinOp.getOperatorId());
+            rsOpToCacheKey.put(rsOp, cacheKey);
+          }
+          // Set in the conf of the map join operator
+          mapJoinOp.getConf().setCacheKey(cacheKey);
+        }
+      }
+    }
+
     // If we are running tests, we are going to verify that the contents of the cache
     // correspond with the contents of the plan, and otherwise we fail.
     // This check always run when we are running in test mode, independently on whether
@@ -535,6 +588,15 @@ public class SharedWorkOptimizer extends Transform {
   }
 
   /**
+   * Obtain the RS input for a mapjoin operator.
+   */
+  private static ReduceSinkOperator obtainBroadcastInput(MapJoinOperator mapJoinOp) {
+    return mapJoinOp.getParentOperators().get(0) instanceof ReduceSinkOperator ?
+        (ReduceSinkOperator) mapJoinOp.getParentOperators().get(0) :
+        (ReduceSinkOperator) mapJoinOp.getParentOperators().get(1);
+  }
+
+  /**
    * This method gathers the TS operators with DPP from the context and
    * stores them into the input optimization cache.
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/61a027af/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
index 8ba5101..507114b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -88,6 +88,8 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
   private boolean isHybridHashJoin;
   private boolean isDynamicPartitionHashJoin = false;
 
+  private String cacheKey;
+
   public MapJoinDesc() {
     bigTableBucketNumMapping = new LinkedHashMap<String, Integer>();
   }
@@ -111,6 +113,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
     this.parentDataSizes = clone.parentDataSizes;
     this.isBucketMapJoin = clone.isBucketMapJoin;
     this.isHybridHashJoin = clone.isHybridHashJoin;
+    this.cacheKey = clone.cacheKey;
   }
 
   public MapJoinDesc(final Map<Byte, List<ExprNodeDesc>> keys,
@@ -128,6 +131,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
     this.bigTableBucketNumMapping = new LinkedHashMap<String, Integer>();
     this.dumpFilePrefix = dumpFilePrefix;
     this.inMemoryDataSize = inMemoryDataSize;
+    this.cacheKey = null;
     initRetainExprList();
   }
 
@@ -478,6 +482,18 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
     return Arrays.deepToString(fm);
   }
 
+  public String getCacheKey() {
+    return cacheKey;
+  }
+
+  public void setCacheKey(String cacheKey) {
+    this.cacheKey = cacheKey;
+  }
+
+  public static String generateCacheKey(String operatorId) {
+    return "HASH_MAP_" + operatorId + "_container";
+  }
+
   // Use LinkedHashSet to give predictable display order.
   private static final Set<String> vectorizableMapJoinNativeEngines =
       new LinkedHashSet<String>(Arrays.asList("tez", "spark"));

Reply via email to