HIVE-20439: Use the inflated memory limit during join selection for llap (Zoltan Haindrich reviewed by Ashutosh Chauhan)
Signed-off-by: Zoltan Haindrich <k...@rxd.hu> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fb7a676b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fb7a676b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fb7a676b Branch: refs/heads/master Commit: fb7a676b3f6baa3d156b3e7b3d1961a83bb7d698 Parents: f5e62eb Author: Zoltan Haindrich <k...@rxd.hu> Authored: Mon Aug 27 11:04:07 2018 +0200 Committer: Zoltan Haindrich <k...@rxd.hu> Committed: Mon Aug 27 11:53:49 2018 +0200 ---------------------------------------------------------------------- .../hive/ql/optimizer/ConvertJoinMapJoin.java | 45 +- .../hadoop/hive/ql/exec/TestOperators.java | 17 +- .../clientpositive/bucket_map_join_tez2.q | 4 +- .../bucketsortoptimize_insert_6.q | 2 +- .../queries/clientpositive/join32_lessSize.q | 2 +- .../test/queries/clientpositive/tez_smb_main.q | 4 +- .../queries/clientpositive/unionDistinct_1.q | 2 +- .../results/clientpositive/llap/orc_llap.q.out | 59 +- .../clientpositive/spark/join32_lessSize.q.out | 1159 +++++++++--------- 9 files changed, 678 insertions(+), 616 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index 4145baf..52855e0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -87,6 +87,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { private static final Logger LOG = LoggerFactory.getLogger(ConvertJoinMapJoin.class.getName()); private float hashTableLoadFactor; + private long maxJoinMemory; @Override /* @@ -103,15 +104,17 @@ public class ConvertJoinMapJoin implements NodeProcessor { hashTableLoadFactor = context.conf.getFloatVar(ConfVars.HIVEHASHTABLELOADFACTOR); JoinOperator joinOp = (JoinOperator) nd; - long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); // adjust noconditional task size threshold for LLAP LlapClusterStateForCompile llapInfo = null; if ("llap".equalsIgnoreCase(context.conf.getVar(ConfVars.HIVE_EXECUTION_MODE))) { llapInfo = LlapClusterStateForCompile.getClusterInfo(context.conf); llapInfo.initClusterInfo(); } - MemoryMonitorInfo memoryMonitorInfo = getMemoryMonitorInfo(maxSize, context.conf, llapInfo); + MemoryMonitorInfo memoryMonitorInfo = getMemoryMonitorInfo(context.conf, llapInfo); joinOp.getConf().setMemoryMonitorInfo(memoryMonitorInfo); + maxJoinMemory = memoryMonitorInfo.getAdjustedNoConditionalTaskSize(); + + LOG.info("maxJoinMemory: {}", maxJoinMemory); TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf); boolean hiveConvertJoin = context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) & @@ -119,11 +122,11 @@ public class ConvertJoinMapJoin implements NodeProcessor { if (!hiveConvertJoin) { // we are just converting to a common merge join operator. The shuffle // join in map-reduce case. - Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize); + Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx); if (retval == null) { return retval; } else { - fallbackToReduceSideJoin(joinOp, context, maxSize); + fallbackToReduceSideJoin(joinOp, context); return null; } } @@ -138,15 +141,15 @@ public class ConvertJoinMapJoin implements NodeProcessor { numBuckets = 1; } LOG.info("Estimated number of buckets " + numBuckets); - int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize, true); + int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxJoinMemory, true); if (mapJoinConversionPos < 0) { - Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize); + Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx); if (retval == null) { return retval; } else { // only case is full outer join with SMB enabled which is not possible. Convert to regular // join. - fallbackToReduceSideJoin(joinOp, context, maxSize); + fallbackToReduceSideJoin(joinOp, context); return null; } } @@ -167,12 +170,12 @@ public class ConvertJoinMapJoin implements NodeProcessor { // check if we can convert to map join no bucket scaling. LOG.info("Convert to non-bucketed map join"); if (numBuckets != 1) { - mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1, false, maxSize, true); + mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1, false, maxJoinMemory, true); } if (mapJoinConversionPos < 0) { // we are just converting to a common merge join operator. The shuffle // join in map-reduce case. - fallbackToReduceSideJoin(joinOp, context, maxSize); + fallbackToReduceSideJoin(joinOp, context); return null; } @@ -238,8 +241,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { if (networkCostDPHJ < networkCostMJ) { LOG.info("Dynamically partitioned Hash Join chosen"); - long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); - return convertJoinDynamicPartitionedHashJoin(joinOp, context, maxSize); + return convertJoinDynamicPartitionedHashJoin(joinOp, context); } else if (numBuckets > 1) { LOG.info("Bucket Map Join chosen"); return convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx); @@ -271,9 +273,10 @@ public class ConvertJoinMapJoin implements NodeProcessor { } @VisibleForTesting - public MemoryMonitorInfo getMemoryMonitorInfo(final long maxSize, + public MemoryMonitorInfo getMemoryMonitorInfo( final HiveConf conf, LlapClusterStateForCompile llapInfo) { + long maxSize = conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); final double overSubscriptionFactor = conf.getFloatVar(ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR); final int maxSlotsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY); final long memoryCheckInterval = conf.getLongVar(ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL); @@ -315,13 +318,13 @@ public class ConvertJoinMapJoin implements NodeProcessor { @SuppressWarnings("unchecked") private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperator joinOp, - TezBucketJoinProcCtx tezBucketJoinProcCtx, final long maxSize) throws SemanticException { + TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { // we cannot convert to bucket map join, we cannot convert to // map join either based on the size. Check if we can convert to SMB join. if (!(HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN)) || ((!HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN_REDUCE)) && joinOp.getOpTraits().getNumReduceSinks() >= 2)) { - fallbackToReduceSideJoin(joinOp, context, maxSize); + fallbackToReduceSideJoin(joinOp, context); return null; } Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null; @@ -350,7 +353,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { // contains aliases from sub-query // we are just converting to a common merge join operator. The shuffle // join in map-reduce case. - fallbackToReduceSideJoin(joinOp, context, maxSize); + fallbackToReduceSideJoin(joinOp, context); return null; } @@ -360,7 +363,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { } else { // we are just converting to a common merge join operator. The shuffle // join in map-reduce case. - fallbackToReduceSideJoin(joinOp, context, maxSize); + fallbackToReduceSideJoin(joinOp, context); } return null; } @@ -893,6 +896,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { } long inputSize = computeOnlineDataSize(currInputStat); + LOG.info("Join input#{}; onlineDataSize: {}; Statistics: {}", pos, inputSize, currInputStat); boolean currentInputNotFittingInMemory = false; if ((bigInputStat == null) @@ -1271,14 +1275,13 @@ public class ConvertJoinMapJoin implements NodeProcessor { return numBuckets; } - private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context, - final long maxSize) + private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context) throws SemanticException { // Attempt dynamic partitioned hash join // Since we don't have big table index yet, must start with estimate of numReducers int numReducers = estimateNumBuckets(joinOp, false); LOG.info("Try dynamic partitioned hash join with estimated " + numReducers + " reducers"); - int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false, maxSize,false); + int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false, maxJoinMemory, false); if (bigTablePos >= 0) { // Now that we have the big table index, get real numReducers value based on big table RS ReduceSinkOperator bigTableParentRS = @@ -1314,11 +1317,11 @@ public class ConvertJoinMapJoin implements NodeProcessor { return false; } - private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context, final long maxSize) + private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context) throws SemanticException { if (context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) && context.conf.getBoolVar(HiveConf.ConfVars.HIVEDYNAMICPARTITIONHASHJOIN)) { - if (convertJoinDynamicPartitionedHashJoin(joinOp, context, maxSize)) { + if (convertJoinDynamicPartitionedHashJoin(joinOp, context)) { return; } } http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java index bbc2453..fe64bf5 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java @@ -26,8 +26,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import junit.framework.TestCase; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -66,6 +64,8 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.junit.Assert; import org.junit.Test; +import junit.framework.TestCase; + /** * TestOperators. * @@ -442,6 +442,7 @@ public class TestOperators extends TestCase { ConvertJoinMapJoin convertJoinMapJoin = new ConvertJoinMapJoin(); long defaultNoConditionalTaskSize = 1024L * 1024L * 1024L; HiveConf hiveConf = new HiveConf(); + hiveConf.setLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD, defaultNoConditionalTaskSize); LlapClusterStateForCompile llapInfo = null; if ("llap".equalsIgnoreCase(hiveConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_MODE))) { @@ -449,8 +450,8 @@ public class TestOperators extends TestCase { llapInfo.initClusterInfo(); } // execution mode not set, null is returned - assertEquals(defaultNoConditionalTaskSize, convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, - hiveConf, llapInfo).getAdjustedNoConditionalTaskSize()); + assertEquals(defaultNoConditionalTaskSize, + convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).getAdjustedNoConditionalTaskSize()); hiveConf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "llap"); if ("llap".equalsIgnoreCase(hiveConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_MODE))) { @@ -464,7 +465,7 @@ public class TestOperators extends TestCase { int maxSlots = 3; long expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * maxSlots)); assertEquals(expectedSize, - convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf, llapInfo) + convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo) .getAdjustedNoConditionalTaskSize()); // num executors is less than max executors per query (which is not expected case), default executors will be @@ -473,18 +474,18 @@ public class TestOperators extends TestCase { hiveConf.set(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname, "5"); expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * chosenSlots)); assertEquals(expectedSize, - convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf, llapInfo) + convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo) .getAdjustedNoConditionalTaskSize()); // disable memory checking hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL.varname, "0"); assertFalse( - convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf, llapInfo).doMemoryMonitoring()); + convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).doMemoryMonitoring()); // invalid inflation factor hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL.varname, "10000"); hiveConf.set(HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR.varname, "0.0f"); assertFalse( - convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf, llapInfo).doMemoryMonitoring()); + convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).doMemoryMonitoring()); } } http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q index ae1ec44..85f2f2b 100644 --- a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q +++ b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q @@ -47,7 +47,7 @@ select key,value from srcbucket_mapjoin_n18; analyze table tab1_n5 compute statistics for columns; -- A negative test as src is not bucketed. -set hive.auto.convert.join.noconditionaltask.size=20000; +set hive.auto.convert.join.noconditionaltask.size=12000; set hive.convert.join.bucket.mapjoin.tez = false; explain select a.key, a.value, b.value @@ -98,7 +98,7 @@ insert overwrite table tab_part1 partition (ds='2008-04-08') select key,value from srcbucket_mapjoin_part_n20; analyze table tab_part1 compute statistics for columns; -set hive.auto.convert.join.noconditionaltask.size=20000; +set hive.auto.convert.join.noconditionaltask.size=12000; set hive.convert.join.bucket.mapjoin.tez = false; explain select count(*) http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q b/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q index cd0a234..2d4907c 100644 --- a/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q +++ b/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q @@ -30,7 +30,7 @@ INSERT OVERWRITE TABLE test_table2_n3 PARTITION (ds = '1') SELECT key, key+1, va -- Insert data into the bucketed table by selecting from another bucketed table -- This should be a map-only operation, since the sort-order matches -set hive.auto.convert.join.noconditionaltask.size=800; +set hive.auto.convert.join.noconditionaltask.size=400; EXPLAIN INSERT OVERWRITE TABLE test_table3_n3 PARTITION (ds = '1') SELECT a.key, a.key2, concat(a.value, b.value) http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/queries/clientpositive/join32_lessSize.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/join32_lessSize.q b/ql/src/test/queries/clientpositive/join32_lessSize.q index 229ba56..fcadbe3 100644 --- a/ql/src/test/queries/clientpositive/join32_lessSize.q +++ b/ql/src/test/queries/clientpositive/join32_lessSize.q @@ -9,7 +9,7 @@ CREATE TABLE dest_j2_n1(key STRING, value STRING, val2 STRING) STORED AS TEXTFIL set hive.auto.convert.join=true; set hive.auto.convert.join.noconditionaltask=true; -set hive.auto.convert.join.noconditionaltask.size=6000; +set hive.auto.convert.join.noconditionaltask.size=4000; -- Since the inputs are small, it should be automatically converted to mapjoin http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/queries/clientpositive/tez_smb_main.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/tez_smb_main.q b/ql/src/test/queries/clientpositive/tez_smb_main.q index db8daa3..c7516b8 100644 --- a/ql/src/test/queries/clientpositive/tez_smb_main.q +++ b/ql/src/test/queries/clientpositive/tez_smb_main.q @@ -70,7 +70,7 @@ select count(*) from tab_n11 a join tab_part_n12 b on a.key = b.key; -set hive.auto.convert.join.noconditionaltask.size=2000; +set hive.auto.convert.join.noconditionaltask.size=1000; set hive.mapjoin.hybridgrace.minwbsize=125; set hive.mapjoin.hybridgrace.minnumpartitions=4; set hive.llap.memory.oversubscription.max.executors.per.query=0; @@ -111,7 +111,7 @@ UNION ALL select s2.key as key, s2.value as value from tab_n11 s2 ) a join tab_part_n12 b on (a.key = b.key); -set hive.auto.convert.join.noconditionaltask.size=10000; +set hive.auto.convert.join.noconditionaltask.size=5000; explain select count(*) from http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/queries/clientpositive/unionDistinct_1.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/unionDistinct_1.q b/ql/src/test/queries/clientpositive/unionDistinct_1.q index f966f42..75c66b0 100644 --- a/ql/src/test/queries/clientpositive/unionDistinct_1.q +++ b/ql/src/test/queries/clientpositive/unionDistinct_1.q @@ -158,7 +158,7 @@ set hive.merge.mapfiles=false; set hive.auto.convert.join=true; set hive.auto.convert.join.noconditionaltask=true; -set hive.auto.convert.join.noconditionaltask.size=15000; +set hive.auto.convert.join.noconditionaltask.size=8000; -- Since the inputs are small, it should be automatically converted to mapjoin http://git-wip-us.apache.org/repos/asf/hive/blob/fb7a676b/ql/src/test/results/clientpositive/llap/orc_llap.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/orc_llap.q.out b/ql/src/test/results/clientpositive/llap/orc_llap.q.out index a639b68..f4f8278 100644 --- a/ql/src/test/results/clientpositive/llap/orc_llap.q.out +++ b/ql/src/test/results/clientpositive/llap/orc_llap.q.out @@ -1021,8 +1021,8 @@ STAGE PLANS: Tez #### A masked pattern was here #### Edges: - Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) - Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + Map 2 <- Map 1 (BROADCAST_EDGE) + Reducer 3 <- Map 2 (CUSTOM_SIMPLE_EDGE) #### A masked pattern was here #### Vertices: Map 1 @@ -1046,7 +1046,7 @@ STAGE PLANS: value expressions: _col2 (type: string) Execution mode: vectorized, llap LLAP IO: all inputs - Map 4 + Map 2 Map Operator Tree: TableScan alias: o2 @@ -1059,38 +1059,31 @@ STAGE PLANS: expressions: csmallint (type: smallint), cstring2 (type: string) outputColumnNames: _col0, _col2 Statistics: Num rows: 136968 Data size: 11042828 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - key expressions: _col0 (type: smallint) - sort order: + - Map-reduce partition columns: _col0 (type: smallint) - Statistics: Num rows: 136968 Data size: 11042828 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col2 (type: string) + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: smallint) + 1 _col0 (type: smallint) + outputColumnNames: _col2, _col5 + input vertices: + 0 Map 1 + Statistics: Num rows: 636522 Data size: 114343414 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: hash(_col2,_col5) (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 636522 Data size: 114343414 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: sum(_col0) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) Execution mode: vectorized, llap LLAP IO: all inputs - Reducer 2 - Execution mode: llap - Reduce Operator Tree: - Merge Join Operator - condition map: - Inner Join 0 to 1 - keys: - 0 _col0 (type: smallint) - 1 _col0 (type: smallint) - outputColumnNames: _col2, _col5 - Statistics: Num rows: 636522 Data size: 114343414 Basic stats: COMPLETE Column stats: COMPLETE - Select Operator - expressions: hash(_col2,_col5) (type: int) - outputColumnNames: _col0 - Statistics: Num rows: 636522 Data size: 114343414 Basic stats: COMPLETE Column stats: COMPLETE - Group By Operator - aggregations: sum(_col0) - mode: hash - outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - sort order: - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col0 (type: bigint) Reducer 3 Execution mode: vectorized, llap Reduce Operator Tree: