Repository: hive
Updated Branches:
  refs/heads/master a3b7a2452 -> b7723e498
HIVE-20503: Use datastructure aware estimations during mapjoin selection (Zoltan Haindrich reviewed by Ashutosh Chauhan) Signed-off-by: Zoltan Haindrich <k...@rxd.hu> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b7723e49 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b7723e49 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b7723e49 Branch: refs/heads/master Commit: b7723e4987ee5efc8990c92866e76055754b7bf1 Parents: a3b7a24 Author: Zoltan Haindrich <k...@rxd.hu> Authored: Thu Sep 13 09:21:42 2018 +0200 Committer: Zoltan Haindrich <k...@rxd.hu> Committed: Thu Sep 13 09:21:42 2018 +0200 ---------------------------------------------------------------------- .../hive/ql/optimizer/ConvertJoinMapJoin.java | 63 ++++++++++++++++++-- .../tez/TestVectorMapJoinFastHashTable.java | 4 +- 2 files changed, 61 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b7723e49/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index cd952a2..c733cb1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -68,6 +68,9 @@ import org.apache.hadoop.hive.ql.plan.OpTraits; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.Statistics; import org.apache.hadoop.hive.ql.stats.StatsUtils; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import 
org.apache.hadoop.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +91,8 @@ public class ConvertJoinMapJoin implements NodeProcessor { private static final Logger LOG = LoggerFactory.getLogger(ConvertJoinMapJoin.class.getName()); public float hashTableLoadFactor; private long maxJoinMemory; + private HashMapDataStructureType hashMapDataStructure; + private boolean fastHashTableAvailable; @Override /* @@ -102,6 +107,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx; hashTableLoadFactor = context.conf.getFloatVar(ConfVars.HIVEHASHTABLELOADFACTOR); + fastHashTableAvailable = context.conf.getBoolVar(ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED); JoinOperator joinOp = (JoinOperator) nd; // adjust noconditional task size threshold for LLAP @@ -116,6 +122,9 @@ public class ConvertJoinMapJoin implements NodeProcessor { LOG.info("maxJoinMemory: {}", maxJoinMemory); + hashMapDataStructure = HashMapDataStructureType.of(joinOp.getConf()); + + TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf); boolean hiveConvertJoin = context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) & !context.parseContext.getDisableMapJoin(); @@ -193,6 +202,32 @@ public class ConvertJoinMapJoin implements NodeProcessor { return null; } + private enum HashMapDataStructureType { + COMPOSITE_KEYED, LONG_KEYED; + + public static HashMapDataStructureType of(JoinDesc conf) { + ExprNodeDesc[][] keys = conf.getJoinKeys(); + if (keys != null && keys[0].length == 1) { + TypeInfo typeInfo = keys[0][0].getTypeInfo(); + if (typeInfo instanceof PrimitiveTypeInfo) { + PrimitiveTypeInfo pti = ((PrimitiveTypeInfo) typeInfo); + PrimitiveCategory pCat = pti.getPrimitiveCategory(); + switch (pCat) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + return HashMapDataStructureType.LONG_KEYED; + default: + break; + } + } + } + 
return HashMapDataStructureType.COMPOSITE_KEYED; + } + } + private boolean selectJoinForLlap(OptimizeTezProcContext context, JoinOperator joinOp, TezBucketJoinProcCtx tezBucketJoinProcCtx, LlapClusterStateForCompile llapInfo, @@ -239,6 +274,11 @@ public class ConvertJoinMapJoin implements NodeProcessor { LOG.info("Cost of Bucket Map Join : numNodes = " + numNodes + " total small table size = " + totalSize + " networkCostMJ = " + networkCostMJ); + if (totalSize <= maxJoinMemory) { + // mapjoin is applicable; don't try the below algos.. + return false; + } + if (networkCostDPHJ < networkCostMJ) { LOG.info("Dynamically partitioned Hash Join chosen"); return convertJoinDynamicPartitionedHashJoin(joinOp, context); @@ -252,17 +292,32 @@ public class ConvertJoinMapJoin implements NodeProcessor { } public long computeOnlineDataSize(Statistics statistics) { - return computeOnlineDataSizeFast3(statistics); + if (fastHashTableAvailable) { + return computeOnlineDataSizeFast(statistics); + } else { + return computeOnlineDataSizeOptimized(statistics); + } + } + + public long computeOnlineDataSizeFast(Statistics statistics) { + switch (hashMapDataStructure) { + case LONG_KEYED: + return computeOnlineDataSizeFastLongKeyed(statistics); + case COMPOSITE_KEYED: + return computeOnlineDataSizeFastCompositeKeyed(statistics); + default: + throw new RuntimeException("invalid mode"); + } } - public long computeOnlineDataSizeFast2(Statistics statistics) { + public long computeOnlineDataSizeFastLongKeyed(Statistics statistics) { return computeOnlineDataSizeGeneric(statistics, -8, // the long key is stored in a slot 2 * 8 // maintenance structure consists of 2 longs ); } - public long computeOnlineDataSizeFast3(Statistics statistics) { + public long computeOnlineDataSizeFastCompositeKeyed(Statistics statistics) { return computeOnlineDataSizeGeneric(statistics, 5 + 4, // list header ; value length stored as vint 8 // maintenance structure consists of 1 long @@ -1024,7 +1079,7 @@ public class 
ConvertJoinMapJoin implements NodeProcessor { // We store the total memory that this MapJoin is going to use, // which is calculated as totalSize/buckets, with totalSize // equal to sum of small tables size. - joinOp.getConf().setInMemoryDataSize(totalSize/buckets); + joinOp.getConf().setInMemoryDataSize(totalSize / buckets); return bigTablePosition; } http://git-wip-us.apache.org/repos/asf/hive/blob/b7723e49/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestVectorMapJoinFastHashTable.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestVectorMapJoinFastHashTable.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestVectorMapJoinFastHashTable.java index a01b34c..af60a47 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestVectorMapJoinFastHashTable.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestVectorMapJoinFastHashTable.java @@ -101,10 +101,10 @@ public class TestVectorMapJoinFastHashTable { cjm.hashTableLoadFactor = .75f; switch (l) { case MULTI_KEY: - compilerEstimate = cjm.computeOnlineDataSizeFast3(stat); + compilerEstimate = cjm.computeOnlineDataSizeFastCompositeKeyed(stat); break; case LONG: - compilerEstimate = cjm.computeOnlineDataSizeFast2(stat); + compilerEstimate = cjm.computeOnlineDataSizeFastLongKeyed(stat); break; } LOG.info("stats: {}", stat);