Repository: hive
Updated Branches:
  refs/heads/master a3b7a2452 -> b7723e498


HIVE-20503: Use datastructure aware estimations during mapjoin selection 
(Zoltan Haindrich reviewed by Ashutosh Chauhan)

Signed-off-by: Zoltan Haindrich <k...@rxd.hu>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b7723e49
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b7723e49
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b7723e49

Branch: refs/heads/master
Commit: b7723e4987ee5efc8990c92866e76055754b7bf1
Parents: a3b7a24
Author: Zoltan Haindrich <k...@rxd.hu>
Authored: Thu Sep 13 09:21:42 2018 +0200
Committer: Zoltan Haindrich <k...@rxd.hu>
Committed: Thu Sep 13 09:21:42 2018 +0200

----------------------------------------------------------------------
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   | 63 ++++++++++++++++++--
 .../tez/TestVectorMapJoinFastHashTable.java     |  4 +-
 2 files changed, 61 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b7723e49/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index cd952a2..c733cb1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -68,6 +68,9 @@ import org.apache.hadoop.hive.ql.plan.OpTraits;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,6 +91,8 @@ public class ConvertJoinMapJoin implements NodeProcessor {
   private static final Logger LOG = LoggerFactory.getLogger(ConvertJoinMapJoin.class.getName());
   public float hashTableLoadFactor;
   private long maxJoinMemory;
+  private HashMapDataStructureType hashMapDataStructure;
+  private boolean fastHashTableAvailable;
 
   @Override
   /*
@@ -102,6 +107,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;
 
     hashTableLoadFactor = context.conf.getFloatVar(ConfVars.HIVEHASHTABLELOADFACTOR);
+    fastHashTableAvailable = context.conf.getBoolVar(ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED);
 
     JoinOperator joinOp = (JoinOperator) nd;
     // adjust noconditional task size threshold for LLAP
@@ -116,6 +122,9 @@ public class ConvertJoinMapJoin implements NodeProcessor {
 
     LOG.info("maxJoinMemory: {}", maxJoinMemory);
 
+    hashMapDataStructure = HashMapDataStructureType.of(joinOp.getConf());
+
+
     TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
     boolean hiveConvertJoin = context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) &
             !context.parseContext.getDisableMapJoin();
@@ -193,6 +202,32 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     return null;
   }
 
+  private enum HashMapDataStructureType {
+    COMPOSITE_KEYED, LONG_KEYED;
+
+    public static HashMapDataStructureType of(JoinDesc conf) {
+      ExprNodeDesc[][] keys = conf.getJoinKeys();
+      if (keys != null && keys[0].length == 1) {
+        TypeInfo typeInfo = keys[0][0].getTypeInfo();
+        if (typeInfo instanceof PrimitiveTypeInfo) {
+          PrimitiveTypeInfo pti = ((PrimitiveTypeInfo) typeInfo);
+          PrimitiveCategory pCat = pti.getPrimitiveCategory();
+          switch (pCat) {
+          case BOOLEAN:
+          case BYTE:
+          case SHORT:
+          case INT:
+          case LONG:
+            return HashMapDataStructureType.LONG_KEYED;
+          default:
+            break;
+          }
+        }
+      }
+      return HashMapDataStructureType.COMPOSITE_KEYED;
+    }
+  }
+
   private boolean selectJoinForLlap(OptimizeTezProcContext context, JoinOperator joinOp,
                           TezBucketJoinProcCtx tezBucketJoinProcCtx,
                           LlapClusterStateForCompile llapInfo,
@@ -239,6 +274,11 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     LOG.info("Cost of Bucket Map Join : numNodes = " + numNodes + " total small table size = "
     + totalSize + " networkCostMJ = " + networkCostMJ);
 
+    if (totalSize <= maxJoinMemory) {
+      // mapjoin is applicable; don't try the below algos..
+      return false;
+    }
+
     if (networkCostDPHJ < networkCostMJ) {
       LOG.info("Dynamically partitioned Hash Join chosen");
       return convertJoinDynamicPartitionedHashJoin(joinOp, context);
@@ -252,17 +292,32 @@ public class ConvertJoinMapJoin implements NodeProcessor {
   }
 
   public long computeOnlineDataSize(Statistics statistics) {
-    return computeOnlineDataSizeFast3(statistics);
+    if (fastHashTableAvailable) {
+      return computeOnlineDataSizeFast(statistics);
+    } else {
+      return computeOnlineDataSizeOptimized(statistics);
+    }
+  }
+
+  public long computeOnlineDataSizeFast(Statistics statistics) {
+    switch (hashMapDataStructure) {
+    case LONG_KEYED:
+      return computeOnlineDataSizeFastLongKeyed(statistics);
+    case COMPOSITE_KEYED:
+      return computeOnlineDataSizeFastCompositeKeyed(statistics);
+    default:
+      throw new RuntimeException("invalid mode");
+    }
   }
 
-  public long computeOnlineDataSizeFast2(Statistics statistics) {
+  public long computeOnlineDataSizeFastLongKeyed(Statistics statistics) {
     return computeOnlineDataSizeGeneric(statistics,
         -8, // the long key is stored in a slot
         2 * 8 // maintenance structure consists of 2 longs
     );
   }
 
-  public long computeOnlineDataSizeFast3(Statistics statistics) {
+  public long computeOnlineDataSizeFastCompositeKeyed(Statistics statistics) {
     return computeOnlineDataSizeGeneric(statistics,
         5 + 4, // list header ; value length stored as vint
         8 // maintenance structure consists of 1 long
@@ -1024,7 +1079,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     // We store the total memory that this MapJoin is going to use,
     // which is calculated as totalSize/buckets, with totalSize
     // equal to sum of small tables size.
-    joinOp.getConf().setInMemoryDataSize(totalSize/buckets);
+    joinOp.getConf().setInMemoryDataSize(totalSize / buckets);
 
     return bigTablePosition;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b7723e49/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestVectorMapJoinFastHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestVectorMapJoinFastHashTable.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestVectorMapJoinFastHashTable.java
index a01b34c..af60a47 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestVectorMapJoinFastHashTable.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestVectorMapJoinFastHashTable.java
@@ -101,10 +101,10 @@ public class TestVectorMapJoinFastHashTable {
     cjm.hashTableLoadFactor = .75f;
     switch (l) {
     case MULTI_KEY:
-      compilerEstimate = cjm.computeOnlineDataSizeFast3(stat);
+      compilerEstimate = cjm.computeOnlineDataSizeFastCompositeKeyed(stat);
       break;
     case LONG:
-      compilerEstimate = cjm.computeOnlineDataSizeFast2(stat);
+      compilerEstimate = cjm.computeOnlineDataSizeFastLongKeyed(stat);
       break;
     }
     LOG.info("stats: {}", stat);

Reply via email to