HIVE-18361: Extend shared work optimizer to reuse computation beyond work boundaries (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/96a409e1 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/96a409e1 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/96a409e1 Branch: refs/heads/master Commit: 96a409e1c6ec846eeb6c72b50bed60790ccc1836 Parents: 3f5148d Author: Jesus Camacho Rodriguez <jcama...@apache.org> Authored: Thu Dec 21 17:08:07 2017 -0800 Committer: Jesus Camacho Rodriguez <jcama...@apache.org> Committed: Thu Jan 4 18:44:15 2018 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 4 + .../test/resources/testconfiguration.properties | 1 + .../hive/ql/optimizer/SharedWorkOptimizer.java | 407 ++++++- .../test/queries/clientpositive/sharedworkext.q | 53 + .../clientpositive/llap/explainuser_1.q.out | 47 +- .../clientpositive/llap/sharedworkext.q.out | 1076 ++++++++++++++++++ .../clientpositive/llap/subquery_multi.q.out | 185 +-- .../clientpositive/llap/subquery_notin.q.out | 752 +++++------- .../clientpositive/llap/subquery_scalar.q.out | 167 +-- .../clientpositive/llap/subquery_select.q.out | 110 +- .../clientpositive/llap/subquery_views.q.out | 83 +- .../llap/vector_groupby_grouping_id2.q.out | 138 +-- .../llap/vector_groupby_grouping_sets4.q.out | 71 +- .../clientpositive/perf/tez/query14.q.out | 264 ++--- .../clientpositive/perf/tez/query2.q.out | 103 +- .../clientpositive/perf/tez/query23.q.out | 360 ++---- .../clientpositive/perf/tez/query32.q.out | 30 +- .../clientpositive/perf/tez/query33.q.out | 232 ++-- .../clientpositive/perf/tez/query44.q.out | 69 +- .../clientpositive/perf/tez/query47.q.out | 224 ++-- .../clientpositive/perf/tez/query54.q.out | 34 +- .../clientpositive/perf/tez/query56.q.out | 232 ++-- .../clientpositive/perf/tez/query57.q.out | 224 ++-- .../clientpositive/perf/tez/query58.q.out | 200 +--- .../clientpositive/perf/tez/query59.q.out | 52 +- 
.../clientpositive/perf/tez/query60.q.out | 232 ++-- .../clientpositive/perf/tez/query61.q.out | 118 +- .../clientpositive/perf/tez/query64.q.out | 468 +++----- .../clientpositive/perf/tez/query70.q.out | 66 +- .../clientpositive/perf/tez/query83.q.out | 116 +- .../clientpositive/perf/tez/query90.q.out | 50 +- .../clientpositive/perf/tez/query92.q.out | 30 +- 32 files changed, 3056 insertions(+), 3142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 841d075..6529da6 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1695,6 +1695,10 @@ public class HiveConf extends Configuration { HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true, "Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" + "and follow-up operators in the query plan and merges them if they meet some preconditions. Tez only."), + HIVE_SHARED_WORK_EXTENDED_OPTIMIZATION("hive.optimize.shared.work.extended", true, + "Whether to enable shared work extended optimizer. The optimizer tries to merge equal operators\n" + + "after a work boundary after shared work optimizer has been executed. Requires hive.optimize.shared.work\n" + + "to be set to true. Tez only."), HIVE_COMBINE_EQUIVALENT_WORK_OPTIMIZATION("hive.combine.equivalent.work.optimization", true, "Whether to " + "combine equivalent work objects during physical optimization.\n This optimization looks for equivalent " + "work objects and combines them if they meet certain preconditions. 
Spark only."), http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 98e390c..ac81995 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -628,6 +628,7 @@ minillaplocal.query.files=\ semijoin6.q,\ semijoin7.q,\ semijoin_hint.q,\ + sharedworkext.q,\ smb_cache.q,\ special_character_in_tabnames_1.q,\ sqlmerge.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java index d4ddb75..08fec42 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hive.ql.optimizer; import java.util.ArrayList; @@ -36,6 +35,7 @@ import java.util.Set; import java.util.TreeMap; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; import org.apache.hadoop.hive.ql.exec.FilterOperator; @@ -79,6 +79,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import com.google.common.collect.Multiset; +import com.google.common.collect.Sets; import com.google.common.collect.TreeMultiset; /** @@ -102,8 +103,8 @@ import com.google.common.collect.TreeMultiset; * | | / \ * Op Op Op Op * - * <p>A limitation in the current implementation is that the optimizer does not - * go beyond a work boundary. + * <p>If the extended version of the optimizer is enabled, it can go beyond + * a work boundary to find reutilization opportunities. * * <p>The optimization only works with the Tez execution engine. 
*/ @@ -146,13 +147,13 @@ public class SharedWorkOptimizer extends Transform { String tableName = tablePair.getKey(); for (TableScanOperator discardableTsOp : tableNameToOps.get(tableName)) { if (removedOps.contains(discardableTsOp)) { - LOG.debug("Skip {} as it has been already removed", discardableTsOp); + LOG.debug("Skip {} as it has already been removed", discardableTsOp); continue; } Collection<TableScanOperator> prevTsOps = existingOps.get(tableName); for (TableScanOperator retainableTsOp : prevTsOps) { if (removedOps.contains(retainableTsOp)) { - LOG.debug("Skip {} as it has been already removed", retainableTsOp); + LOG.debug("Skip {} as it has already been removed", retainableTsOp); continue; } @@ -167,7 +168,7 @@ public class SharedWorkOptimizer extends Transform { // Secondly, we extract information about the part of the tree that can be merged // as well as some structural information (memory consumption) that needs to be // used to determine whether the merge can happen - SharedResult sr = extractSharedOptimizationInfo( + SharedResult sr = extractSharedOptimizationInfoForRoot( pctx, optimizerCache, retainableTsOp, discardableTsOp); // It seems these two operators can be merged. 
@@ -197,7 +198,8 @@ public class SharedWorkOptimizer extends Transform { } } - LOG.debug("Merging subtree starting at {} into subtree starting at {}", discardableTsOp, retainableTsOp); + LOG.debug("Merging subtree starting at {} into subtree starting at {}", + discardableTsOp, retainableTsOp); } else { // Only TS operator ExprNodeGenericFuncDesc exprNode = null; @@ -262,6 +264,7 @@ public class SharedWorkOptimizer extends Transform { !sr.discardableInputOps.contains(sjbi.getTsOp())) { GenTezUtils.removeSemiJoinOperator( pctx, (ReduceSinkOperator) op, sjbi.getTsOp()); + optimizerCache.tableScanToDPPSource.remove(sjbi.getTsOp(), op); } } else if (op instanceof AppMasterEventOperator) { DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf(); @@ -269,6 +272,7 @@ public class SharedWorkOptimizer extends Transform { !sr.discardableInputOps.contains(dped.getTableScan())) { GenTezUtils.removeSemiJoinOperator( pctx, (AppMasterEventOperator) op, dped.getTableScan()); + optimizerCache.tableScanToDPPSource.remove(dped.getTableScan(), op); } } LOG.debug("Input operator removed: {}", op); @@ -292,10 +296,12 @@ public class SharedWorkOptimizer extends Transform { GenTezUtils.removeSemiJoinOperator(pctx, (ReduceSinkOperator) dppSource, (TableScanOperator) sr.retainableOps.get(0)); + optimizerCache.tableScanToDPPSource.remove(sr.retainableOps.get(0), op); } else if (dppSource instanceof AppMasterEventOperator) { GenTezUtils.removeSemiJoinOperator(pctx, (AppMasterEventOperator) dppSource, (TableScanOperator) sr.retainableOps.get(0)); + optimizerCache.tableScanToDPPSource.remove(sr.retainableOps.get(0), op); } } } @@ -328,6 +334,203 @@ public class SharedWorkOptimizer extends Transform { LOG.debug("After SharedWorkOptimizer:\n" + Operator.toString(pctx.getTopOps().values())); } + if(pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_EXTENDED_OPTIMIZATION)) { + // Gather RS operators that 1) belong to root works, i.e., works containing TS operators, + // and 2) share 
the same input operator. + // These will be the first target for extended shared work optimization + Multimap<Operator<?>, ReduceSinkOperator> parentToRsOps = ArrayListMultimap.create(); + Set<Operator<?>> visited = new HashSet<>(); + for (Entry<String, TableScanOperator> e : topOps.entrySet()) { + gatherReduceSinkOpsByInput(parentToRsOps, visited, + findWorkOperators(optimizerCache, e.getValue())); + } + + while (!parentToRsOps.isEmpty()) { + // As above, we enforce a certain order when we do the reutilization. + // In particular, we use size of data in RS x number of uses. + List<Entry<Operator<?>, Long>> sortedRSGroups = + rankOpsByAccumulatedSize(parentToRsOps.keySet()); + LOG.debug("Sorted operators by size: {}", sortedRSGroups); + + // Execute extended optimization + // For each RS, check whether other RS in same work could be merged into this one. + // If they are merged, RS operators in the resulting work will be considered + // mergeable in next loop iteration. + Multimap<Operator<?>, ReduceSinkOperator> existingRsOps = ArrayListMultimap.create(); + for (Entry<Operator<?>, Long> rsGroupInfo : sortedRSGroups) { + Operator<?> rsParent = rsGroupInfo.getKey(); + for (ReduceSinkOperator discardableRsOp : parentToRsOps.get(rsParent)) { + if (removedOps.contains(discardableRsOp)) { + LOG.debug("Skip {} as it has already been removed", discardableRsOp); + continue; + } + Collection<ReduceSinkOperator> otherRsOps = existingRsOps.get(rsParent); + for (ReduceSinkOperator retainableRsOp : otherRsOps) { + if (removedOps.contains(retainableRsOp)) { + LOG.debug("Skip {} as it has already been removed", retainableRsOp); + continue; + } + + // First we quickly check if the two RS operators can actually be merged. + // We already know that these two RS operators have the same parent, but + // we need to check whether both RS are actually equal. Further, we check + // whether their child is also equal. 
If any of these conditions are not + // met, we are not going to try to merge. + boolean mergeable = compareOperator(pctx, retainableRsOp, discardableRsOp) && + compareOperator(pctx, retainableRsOp.getChildOperators().get(0), + discardableRsOp.getChildOperators().get(0)); + if (!mergeable) { + // Skip + LOG.debug("{} and {} cannot be merged", retainableRsOp, discardableRsOp); + continue; + } + + LOG.debug("Checking additional conditions for merging subtree starting at {}" + + " into subtree starting at {}", discardableRsOp, retainableRsOp); + + // Secondly, we extract information about the part of the tree that can be merged + // as well as some structural information (memory consumption) that needs to be + // used to determined whether the merge can happen + Operator<?> retainableRsOpChild = retainableRsOp.getChildOperators().get(0); + Operator<?> discardableRsOpChild = discardableRsOp.getChildOperators().get(0); + SharedResult sr = extractSharedOptimizationInfo( + pctx, optimizerCache, retainableRsOp, discardableRsOp, + retainableRsOpChild, discardableRsOpChild); + + // It seems these two operators can be merged. + // Check that plan meets some preconditions before doing it. + // In particular, in the presence of map joins in the upstream plan: + // - we cannot exceed the noconditional task size, and + // - if we already merged the big table, we cannot merge the broadcast + // tables. + if (sr.retainableOps.isEmpty() || !validPreConditions(pctx, optimizerCache, sr)) { + // Skip + LOG.debug("{} and {} do not meet preconditions", retainableRsOp, discardableRsOp); + continue; + } + + // We can merge + Operator<?> lastRetainableOp = sr.retainableOps.get(sr.retainableOps.size() - 1); + Operator<?> lastDiscardableOp = sr.discardableOps.get(sr.discardableOps.size() - 1); + if (lastDiscardableOp.getNumChild() != 0) { + List<Operator<? extends OperatorDesc>> allChildren = + Lists.newArrayList(lastDiscardableOp.getChildOperators()); + for (Operator<? 
extends OperatorDesc> op : allChildren) { + lastDiscardableOp.getChildOperators().remove(op); + op.replaceParent(lastDiscardableOp, lastRetainableOp); + lastRetainableOp.getChildOperators().add(op); + } + } + + LOG.debug("Merging subtree starting at {} into subtree starting at {}", + discardableRsOp, retainableRsOp); + + // First we remove the input operators of the expression that + // we are going to eliminate + for (Operator<?> op : sr.discardableInputOps) { + OperatorUtils.removeOperator(op); + optimizerCache.removeOp(op); + removedOps.add(op); + // Remove DPP predicates + if (op instanceof ReduceSinkOperator) { + SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op); + if (sjbi != null && !sr.discardableOps.contains(sjbi.getTsOp()) && + !sr.discardableInputOps.contains(sjbi.getTsOp())) { + GenTezUtils.removeSemiJoinOperator( + pctx, (ReduceSinkOperator) op, sjbi.getTsOp()); + optimizerCache.tableScanToDPPSource.remove(sjbi.getTsOp(), op); + } + } else if (op instanceof AppMasterEventOperator) { + DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf(); + if (!sr.discardableOps.contains(dped.getTableScan()) && + !sr.discardableInputOps.contains(dped.getTableScan())) { + GenTezUtils.removeSemiJoinOperator( + pctx, (AppMasterEventOperator) op, dped.getTableScan()); + optimizerCache.tableScanToDPPSource.remove(dped.getTableScan(), op); + } + } + LOG.debug("Input operator removed: {}", op); + } + // We remove the discardable RS operator + OperatorUtils.removeOperator(discardableRsOp); + optimizerCache.removeOp(discardableRsOp); + removedOps.add(discardableRsOp); + LOG.debug("Operator removed: {}", discardableRsOp); + // Then we merge the operators of the works we are going to merge + optimizerCache.removeOpAndCombineWork(discardableRsOpChild, retainableRsOpChild); + // Finally we remove the rest of the expression from the tree + for (Operator<?> op : sr.discardableOps) { + OperatorUtils.removeOperator(op); + optimizerCache.removeOp(op); + 
removedOps.add(op); + LOG.debug("Operator removed: {}", op); + } + + break; + } + + if (removedOps.contains(discardableRsOp)) { + // This operator has been removed, remove it from the list of existing operators + existingRsOps.remove(rsParent, discardableRsOp); + } else { + // This operator has not been removed, include it in the list of existing operators + existingRsOps.put(rsParent, discardableRsOp); + } + } + } + + // We gather the operators that will be used for next iteration of extended optimization + // (if any) + parentToRsOps = ArrayListMultimap.create(); + visited = new HashSet<>(); + for (Entry<Operator<?>, ReduceSinkOperator> e : existingRsOps.entries()) { + if (removedOps.contains(e.getValue()) || e.getValue().getNumChild() < 1) { + // If 1) RS has been removed, or 2) it does not have a child (for instance, it is a + // semijoin RS), we can quickly skip this one + continue; + } + gatherReduceSinkOpsByInput(parentToRsOps, visited, + findWorkOperators(optimizerCache, e.getValue().getChildOperators().get(0))); + } + } + + // Remove unused table scan operators + it = topOps.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, TableScanOperator> e = it.next(); + if (e.getValue().getNumChild() == 0) { + it.remove(); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("After SharedWorkExtendedOptimizer:\n" + + Operator.toString(pctx.getTopOps().values())); + } + } + + // If we are running tests, we are going to verify that the contents of the cache + // correspond with the contents of the plan, and otherwise we fail. + // This check always runs when we are running in test mode, independently of whether + // we use the basic or the extended version of the optimizer. 
+ if (pctx.getConf().getBoolVar(ConfVars.HIVE_IN_TEST)) { + Set<Operator<?>> visited = new HashSet<>(); + it = topOps.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, TableScanOperator> e = it.next(); + for (Operator<?> op : OperatorUtils.findOperators(e.getValue(), Operator.class)) { + if (!visited.contains(op)) { + if (!findWorkOperators(optimizerCache, op).equals( + findWorkOperators(op, new HashSet<Operator<?>>()))) { + throw new SemanticException("Error in shared work optimizer: operator cache contents" + + "and actual plan differ"); + } + visited.add(op); + } + } + } + } + return pctx; } @@ -392,16 +595,59 @@ public class SharedWorkOptimizer extends Transform { } } List<Entry<String, Long>> sortedTables = - new LinkedList<>(tableToTotalSize.entrySet()); + new LinkedList<>(tableToTotalSize.entrySet()); Collections.sort(sortedTables, Collections.reverseOrder( - new Comparator<Map.Entry<String, Long>>() { - public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) { - return (o1.getValue()).compareTo(o2.getValue()); - } - })); + new Comparator<Map.Entry<String, Long>>() { + public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) { + return (o1.getValue()).compareTo(o2.getValue()); + } + })); return sortedTables; } + private static void gatherReduceSinkOpsByInput(Multimap<Operator<?>, + ReduceSinkOperator> parentToRsOps, Set<Operator<?>> visited, Set<Operator<?>> ops) { + for (Operator<?> op : ops) { + // If the RS has other RS siblings, we will add it to be considered in next iteration + if (op instanceof ReduceSinkOperator && !visited.contains(op)) { + Operator<?> parent = op.getParentOperators().get(0); + Set<ReduceSinkOperator> s = new LinkedHashSet<>(); + for (Operator<?> c : parent.getChildOperators()) { + if (c instanceof ReduceSinkOperator) { + s.add((ReduceSinkOperator) c); + visited.add(c); + } + } + if (s.size() > 1) { + parentToRsOps.putAll(parent, s); + } + } + } + } + + private static 
List<Entry<Operator<?>, Long>> rankOpsByAccumulatedSize(Set<Operator<?>> opsSet) { + Map<Operator<?>, Long> opToTotalSize = new HashMap<>(); + for (Operator<?> op : opsSet) { + long size = op.getStatistics() != null ? + op.getStatistics().getDataSize() : 0L; + opToTotalSize.put(op, + StatsUtils.safeMult(op.getChildOperators().size(), size)); + } + List<Entry<Operator<?>, Long>> sortedOps = + new LinkedList<>(opToTotalSize.entrySet()); + Collections.sort(sortedOps, Collections.reverseOrder( + new Comparator<Map.Entry<Operator<?>, Long>>() { + public int compare(Map.Entry<Operator<?>, Long> o1, Map.Entry<Operator<?>, Long> o2) { + int valCmp = o1.getValue().compareTo(o2.getValue()); + if (valCmp == 0) { + return o1.getKey().toString().compareTo(o2.getKey().toString()); + } + return valCmp; + } + })); + return sortedOps; + } + private static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, TableScanOperator tsOp1, TableScanOperator tsOp2) throws SemanticException { // First we check if the two table scan operators can actually be merged @@ -486,15 +732,15 @@ public class SharedWorkOptimizer extends Transform { return true; } - private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, + private static SharedResult extractSharedOptimizationInfoForRoot(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, TableScanOperator retainableTsOp, TableScanOperator discardableTsOp) throws SemanticException { - Set<Operator<?>> retainableOps = new LinkedHashSet<>(); - Set<Operator<?>> discardableOps = new LinkedHashSet<>(); + LinkedHashSet<Operator<?>> retainableOps = new LinkedHashSet<>(); + LinkedHashSet<Operator<?>> discardableOps = new LinkedHashSet<>(); Set<Operator<?>> discardableInputOps = new HashSet<>(); - long dataSize = 0l; - long maxDataSize = 0l; + long dataSize = 0L; + long maxDataSize = 0L; retainableOps.add(retainableTsOp); discardableOps.add(discardableTsOp); @@ -503,7 +749,8 @@ public class 
SharedWorkOptimizer extends Transform { if (equalOp1.getNumChild() > 1 || equalOp2.getNumChild() > 1) { // TODO: Support checking multiple child operators to merge further. discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableOps)); - return new SharedResult(retainableOps, discardableOps, discardableInputOps, dataSize, maxDataSize); + return new SharedResult(retainableOps, discardableOps, discardableInputOps, + dataSize, maxDataSize); } Operator<?> currentOp1 = retainableTsOp.getChildOperators().get(0); Operator<?> currentOp2 = discardableTsOp.getChildOperators().get(0); @@ -532,19 +779,54 @@ public class SharedWorkOptimizer extends Transform { currentOp2.getChildOperators().size() > 1) { // TODO: Support checking multiple child operators to merge further. discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableOps)); - discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps, discardableInputOps)); - return new SharedResult(retainableOps, discardableOps, discardableInputOps, dataSize, maxDataSize); + discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps, + discardableInputOps)); + return new SharedResult(retainableOps, discardableOps, discardableInputOps, + dataSize, maxDataSize); } currentOp1 = currentOp1.getChildOperators().get(0); currentOp2 = currentOp2.getChildOperators().get(0); } else { // Bail out discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableOps)); - discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps, discardableInputOps)); - return new SharedResult(retainableOps, discardableOps, discardableInputOps, dataSize, maxDataSize); + discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps, + discardableInputOps)); + return new SharedResult(retainableOps, discardableOps, discardableInputOps, + dataSize, maxDataSize); } } + return extractSharedOptimizationInfo(pctx, 
optimizerCache, equalOp1, equalOp2, + currentOp1, currentOp2, retainableOps, discardableOps, discardableInputOps, false); + } + + private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, + SharedWorkOptimizerCache optimizerCache, + Operator<?> retainableOpEqualParent, + Operator<?> discardableOpEqualParent, + Operator<?> retainableOp, + Operator<?> discardableOp) throws SemanticException { + return extractSharedOptimizationInfo(pctx, optimizerCache, + retainableOpEqualParent, discardableOpEqualParent, retainableOp, discardableOp, + new LinkedHashSet<>(), new LinkedHashSet<>(), new HashSet<>(), true); + } + + private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, + SharedWorkOptimizerCache optimizerCache, + Operator<?> retainableOpEqualParent, + Operator<?> discardableOpEqualParent, + Operator<?> retainableOp, + Operator<?> discardableOp, + LinkedHashSet<Operator<?>> retainableOps, + LinkedHashSet<Operator<?>> discardableOps, + Set<Operator<?>> discardableInputOps, + boolean removeInputBranch) throws SemanticException { + Operator<?> equalOp1 = retainableOpEqualParent; + Operator<?> equalOp2 = discardableOpEqualParent; + Operator<?> currentOp1 = retainableOp; + Operator<?> currentOp2 = discardableOp; + long dataSize = 0L; + long maxDataSize = 0L; // Try to merge rest of operators while (!(currentOp1 instanceof ReduceSinkOperator)) { // Check whether current operators are equal @@ -563,7 +845,7 @@ public class SharedWorkOptimizer extends Transform { for (; idx < currentOp1.getParentOperators().size(); idx++) { Operator<?> parentOp1 = currentOp1.getParentOperators().get(idx); Operator<?> parentOp2 = currentOp2.getParentOperators().get(idx); - if (parentOp1 == equalOp1 && parentOp2 == equalOp2) { + if (parentOp1 == equalOp1 && parentOp2 == equalOp2 && !removeInputBranch) { continue; } if ((parentOp1 == equalOp1 && parentOp2 != equalOp2) || @@ -572,7 +854,8 @@ public class SharedWorkOptimizer extends Transform { break; } // 
Compare input - List<Operator<?>> removeOpsForCurrentInput = compareAndGatherOps(pctx, parentOp1, parentOp2); + List<Operator<?>> removeOpsForCurrentInput = + compareAndGatherOps(pctx, parentOp1, parentOp2); if (removeOpsForCurrentInput == null) { // Inputs are not the same, bail out break; @@ -626,8 +909,10 @@ public class SharedWorkOptimizer extends Transform { discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableInputOps)); discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableOps)); - discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps, discardableInputOps)); - return new SharedResult(retainableOps, discardableOps, discardableInputOps, dataSize, maxDataSize); + discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps, + discardableInputOps)); + return new SharedResult(retainableOps, discardableOps, discardableInputOps, + dataSize, maxDataSize); } private static Multiset<String> extractConjsIgnoringDPPPreds(ExprNodeDesc predicate) { @@ -656,7 +941,8 @@ public class SharedWorkOptimizer extends Transform { Set<Operator<?>> dppBranches = new HashSet<>(); for (Operator<?> op : ops) { if (op instanceof TableScanOperator) { - Collection<Operator<?>> c = optimizerCache.tableScanToDPPSource.get((TableScanOperator) op); + Collection<Operator<?>> c = optimizerCache.tableScanToDPPSource + .get((TableScanOperator) op); for (Operator<?> dppSource : c) { // Remove the branches Operator<?> currentOp = dppSource; @@ -676,7 +962,8 @@ public class SharedWorkOptimizer extends Transform { Set<Operator<?>> dppBranches = new HashSet<>(); for (Operator<?> op : ops) { if (op instanceof TableScanOperator) { - Collection<Operator<?>> c = optimizerCache.tableScanToDPPSource.get((TableScanOperator) op); + Collection<Operator<?>> c = optimizerCache.tableScanToDPPSource + .get((TableScanOperator) op); for (Operator<?> dppSource : c) { Set<Operator<?>> ascendants = 
findAscendantWorkOperators(pctx, optimizerCache, dppSource); @@ -711,6 +998,11 @@ public class SharedWorkOptimizer extends Transform { return false; } + if (gather && op2.getChildOperators().size() > 1) { + // If the second operator has more than one child, we stop gathering + gather = false; + } + if (gather) { result.add(op2); } @@ -724,12 +1016,8 @@ public class SharedWorkOptimizer extends Transform { for (int i = 0; i < op1ParentOperators.size(); i++) { Operator<?> op1ParentOp = op1ParentOperators.get(i); Operator<?> op2ParentOp = op2ParentOperators.get(i); - boolean mergeable; - if (gather && op2ParentOp.getChildOperators().size() < 2) { - mergeable = compareAndGatherOps(pctx, op1ParentOp, op2ParentOp, result, true); - } else { - mergeable = compareAndGatherOps(pctx, op1ParentOp, op2ParentOp, result, false); - } + boolean mergeable = + compareAndGatherOps(pctx, op1ParentOp, op2ParentOp, result, gather); if (!mergeable) { return false; } @@ -741,7 +1029,6 @@ public class SharedWorkOptimizer extends Transform { return true; } - @SuppressWarnings({ "rawtypes", "unchecked" }) private static boolean compareOperator(ParseContext pctx, Operator<?> op1, Operator<?> op2) throws SemanticException { if (!op1.getClass().getName().equals(op2.getClass().getName())) { @@ -809,21 +1096,21 @@ public class SharedWorkOptimizer extends Transform { return false; } - TableScanOperator tsOp1 = (TableScanOperator) sr.retainableOps.get(0); - TableScanOperator tsOp2 = (TableScanOperator) sr.discardableOps.get(0); + Operator<?> op1 = sr.retainableOps.get(0); + Operator<?> op2 = sr.discardableOps.get(0); - // 1) The set of operators in the works of the TS operators need to meet + // 1) The set of operators in the works that we are merging need to meet // some requirements. In particular: - // 1.1. None of the works that contain the TS operators can contain a Union + // 1.1. None of the works that we are merging can contain a Union // operator. 
This is not supported yet as we might end up with cycles in // the Tez DAG. // 1.2. There cannot be more than one DummyStore operator in the new resulting - // work when the TS operators are merged. This is due to an assumption in + // work when the operators are merged. This is due to an assumption in // MergeJoinProc that needs to be further explored. // If any of these conditions are not met, we cannot merge. // TODO: Extend rule so it can be applied for these cases. - final Set<Operator<?>> workOps1 = findWorkOperators(optimizerCache, tsOp1); - final Set<Operator<?>> workOps2 = findWorkOperators(optimizerCache, tsOp2); + final Set<Operator<?>> workOps1 = findWorkOperators(optimizerCache, op1); + final Set<Operator<?>> workOps2 = findWorkOperators(optimizerCache, op2); boolean foundDummyStoreOp = false; for (Operator<?> op : workOps1) { if (op instanceof UnionOperator) { @@ -853,8 +1140,8 @@ public class SharedWorkOptimizer extends Transform { // If we do, we cannot merge. The reason is that Tez currently does // not support parallel edges, i.e., multiple edges from same work x // into same work y. - final Set<Operator<?>> outputWorksOps1 = findChildWorkOperators(pctx, optimizerCache, tsOp1); - final Set<Operator<?>> outputWorksOps2 = findChildWorkOperators(pctx, optimizerCache, tsOp2); + final Set<Operator<?>> outputWorksOps1 = findChildWorkOperators(pctx, optimizerCache, op1); + final Set<Operator<?>> outputWorksOps2 = findChildWorkOperators(pctx, optimizerCache, op2); if (!Collections.disjoint(outputWorksOps1, outputWorksOps2)) { // We cannot merge return false; @@ -866,10 +1153,19 @@ public class SharedWorkOptimizer extends Transform { // Work2 Work3 Work2 // // If we do, we cannot merge. The reason is the same as above, currently - // Tez currently does not support parallel edges. - final Set<Operator<?>> inputWorksOps1 = findParentWorkOperators(pctx, optimizerCache, tsOp1); + // Tez does not support parallel edges. 
+ // + // In the check, we exclude the inputs to the root operator that we are trying + // to merge (only useful for extended merging as TS do not have inputs). + final Set<Operator<?>> excludeOps1 = sr.retainableOps.get(0).getNumParent() > 0 ? + ImmutableSet.copyOf(sr.retainableOps.get(0).getParentOperators()) : ImmutableSet.of(); + final Set<Operator<?>> inputWorksOps1 = + findParentWorkOperators(pctx, optimizerCache, op1, excludeOps1); + final Set<Operator<?>> excludeOps2 = sr.discardableOps.get(0).getNumParent() > 0 ? + Sets.union(ImmutableSet.copyOf(sr.discardableOps.get(0).getParentOperators()), sr.discardableInputOps) : + sr.discardableInputOps; final Set<Operator<?>> inputWorksOps2 = - findParentWorkOperators(pctx, optimizerCache, tsOp2, sr.discardableInputOps); + findParentWorkOperators(pctx, optimizerCache, op2, excludeOps2); if (!Collections.disjoint(inputWorksOps1, inputWorksOps2)) { // We cannot merge return false; @@ -885,9 +1181,9 @@ public class SharedWorkOptimizer extends Transform { // // If we do, we cannot merge, as we would end up with a cycle in the DAG. 
final Set<Operator<?>> descendantWorksOps1 = - findDescendantWorkOperators(pctx, optimizerCache, tsOp1, sr.discardableInputOps); + findDescendantWorkOperators(pctx, optimizerCache, op1, sr.discardableInputOps); final Set<Operator<?>> descendantWorksOps2 = - findDescendantWorkOperators(pctx, optimizerCache, tsOp2, sr.discardableInputOps); + findDescendantWorkOperators(pctx, optimizerCache, op2, sr.discardableInputOps); if (!Collections.disjoint(descendantWorksOps1, workOps2) || !Collections.disjoint(workOps1, descendantWorksOps2)) { return false; @@ -1120,6 +1416,12 @@ public class SharedWorkOptimizer extends Transform { this.dataSize = dataSize; this.maxDataSize = maxDataSize; } + + @Override + public String toString() { + return "SharedResult { " + this.retainableOps + "; " + this.discardableOps + "; " + + this.discardableInputOps + "};"; + } } /** Cache to accelerate optimization */ @@ -1173,6 +1475,11 @@ public class SharedWorkOptimizer extends Transform { } } } + + @Override + public String toString() { + return "SharedWorkOptimizerCache { \n" + operatorToWorkOperators.toString() + "\n };"; + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/ql/src/test/queries/clientpositive/sharedworkext.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/sharedworkext.q b/ql/src/test/queries/clientpositive/sharedworkext.q new file mode 100644 index 0000000..b1801ea --- /dev/null +++ b/ql/src/test/queries/clientpositive/sharedworkext.q @@ -0,0 +1,53 @@ +EXPLAIN +SELECT a.key FROM +( + SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value +) a +JOIN +( + SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value +) b +ON a.key = b.key; + +SELECT a.key FROM +( + SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value +) a +JOIN +( + SELECT a1.key, a2.value FROM src a1 JOIN src 
a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value +) b +ON a.key = b.key; + +EXPLAIN +SELECT a.key FROM +( + SELECT rank() OVER (ORDER BY key) AS key FROM + (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value) a +) a +JOIN +( + SELECT rank() OVER (ORDER BY key) AS key FROM + (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value) b +) b +ON a.key = b.key; + +SELECT a.key FROM +( + SELECT rank() OVER (ORDER BY key) AS key FROM + (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value) a +) a +JOIN +( + SELECT rank() OVER (ORDER BY key) AS key FROM + (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value) b +) b +ON a.key = b.key; http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/ql/src/test/results/clientpositive/llap/explainuser_1.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/explainuser_1.q.out b/ql/src/test/results/clientpositive/llap/explainuser_1.q.out index 8ab5e3a..2fb1854 100644 --- a/ql/src/test/results/clientpositive/llap/explainuser_1.q.out +++ b/ql/src/test/results/clientpositive/llap/explainuser_1.q.out @@ -2363,10 +2363,9 @@ Plan optimized by CBO. 
Vertex dependency in root stage Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 6 (CUSTOM_SIMPLE_EDGE) -Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE) +Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE) Reducer 4 <- Reducer 3 (SIMPLE_EDGE) Reducer 6 <- Map 5 (CUSTOM_SIMPLE_EDGE) -Reducer 7 <- Map 5 (CUSTOM_SIMPLE_EDGE) Stage-0 Fetch Operator @@ -2384,41 +2383,37 @@ Stage-0 predicate:((_col2 = 0) or (_col5 is null and _col1 is not null and (_col3 >= _col2))) Merge Join Operator [MERGEJOIN_37] (rows=26 width=141) Conds:RS_24.UDFToDouble(_col1)=RS_25._col0(Left Outer),Output:["_col0","_col1","_col2","_col3","_col5"] + <-Reducer 6 [SIMPLE_EDGE] llap + SHUFFLE [RS_25] + PartitionCols:_col0 + Select Operator [SEL_20] (rows=1 width=12) + Output:["_col0","_col1"] + Group By Operator [GBY_7] (rows=1 width=8) + Output:["_col0"],aggregations:["avg(VALUE._col0)"] + <-Map 5 [CUSTOM_SIMPLE_EDGE] llap + PARTITION_ONLY_SHUFFLE [RS_6] + Group By Operator [GBY_5] (rows=1 width=76) + Output:["_col0"],aggregations:["avg(p_size)"] + Filter Operator [FIL_33] (rows=8 width=4) + predicate:(p_size < 10) + TableScan [TS_2] (rows=26 width=4) + default@part,part,Tbl:COMPLETE,Col:COMPLETE,Output:["p_size"] <-Reducer 2 [SIMPLE_EDGE] llap SHUFFLE [RS_24] PartitionCols:UDFToDouble(_col1) Merge Join Operator [MERGEJOIN_36] (rows=26 width=141) Conds:(Inner),Output:["_col0","_col1","_col2","_col3"] + <-Reducer 6 [CUSTOM_SIMPLE_EDGE] llap + SHUFFLE [RS_22] + Group By Operator [GBY_12] (rows=1 width=16) + Output:["_col0","_col1"],aggregations:["count()","count(_col0)"] + Please refer to the previous Group By Operator [GBY_7] <-Map 1 [CUSTOM_SIMPLE_EDGE] llap PARTITION_ONLY_SHUFFLE [RS_21] Select Operator [SEL_1] (rows=26 width=125) Output:["_col0","_col1"] TableScan [TS_0] (rows=26 width=125) default@part,part,Tbl:COMPLETE,Col:COMPLETE,Output:["p_name","p_size"] - <-Reducer 6 [CUSTOM_SIMPLE_EDGE] llap - PARTITION_ONLY_SHUFFLE [RS_22] - Group By Operator [GBY_12] (rows=1 
width=16) - Output:["_col0","_col1"],aggregations:["count()","count(_col0)"] - Group By Operator [GBY_7] (rows=1 width=8) - Output:["_col0"],aggregations:["avg(VALUE._col0)"] - <-Map 5 [CUSTOM_SIMPLE_EDGE] llap - PARTITION_ONLY_SHUFFLE [RS_6] - Group By Operator [GBY_5] (rows=1 width=76) - Output:["_col0"],aggregations:["avg(p_size)"] - Filter Operator [FIL_33] (rows=8 width=4) - predicate:(p_size < 10) - TableScan [TS_2] (rows=26 width=4) - default@part,part,Tbl:COMPLETE,Col:COMPLETE,Output:["p_size"] - <-Reducer 7 [SIMPLE_EDGE] llap - SHUFFLE [RS_25] - PartitionCols:_col0 - Select Operator [SEL_20] (rows=1 width=12) - Output:["_col0","_col1"] - Group By Operator [GBY_19] (rows=1 width=8) - Output:["_col0"],aggregations:["avg(VALUE._col0)"] - <-Map 5 [CUSTOM_SIMPLE_EDGE] llap - PARTITION_ONLY_SHUFFLE [RS_18] - Please refer to the previous Group By Operator [GBY_5] PREHOOK: query: explain select b.p_mfgr, min(p_retailprice) from part b http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/ql/src/test/results/clientpositive/llap/sharedworkext.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/sharedworkext.q.out b/ql/src/test/results/clientpositive/llap/sharedworkext.q.out new file mode 100644 index 0000000..e56b1ce --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/sharedworkext.q.out @@ -0,0 +1,1076 @@ +PREHOOK: query: EXPLAIN +SELECT a.key FROM +( + SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value +) a +JOIN +( + SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value +) b +ON a.key = b.key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT a.key FROM +( + SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value +) a +JOIN +( + SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value +) b +ON a.key = 
b.key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (SIMPLE_EDGE) + Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE) + Reducer 5 <- Reducer 2 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a2 + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: string) + Execution mode: llap + LLAP IO: no inputs + Map 6 + Map Operator Tree: + TableScan + alias: a1 + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + 
Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col1, _col2 + Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + keys: _col2 (type: string), _col1 (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: string) + Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: string) + Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: string), KEY._col1 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE + Reducer 4 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 528 Data size: 45936 Basic stats: COMPLETE Column stats: 
COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 528 Data size: 45936 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: string), KEY._col1 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT a.key FROM +( + SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value +) a +JOIN +( + SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value +) b +ON a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: SELECT a.key FROM +( + SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value +) a +JOIN +( + SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value +) b +ON a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +0 +10 +100 +103 +104 +105 +11 +111 +113 +114 +116 +118 +119 +12 +120 +125 +126 +128 +129 +131 +133 +134 +136 +137 +138 +143 +145 +146 +149 +15 +150 +152 +153 +155 
+156 +157 +158 +160 +162 +163 +164 +165 +166 +167 +168 +169 +17 +170 +172 +174 +175 +176 +177 +178 +179 +18 +180 +181 +183 +186 +187 +189 +19 +190 +191 +192 +193 +194 +195 +196 +197 +199 +2 +20 +200 +201 +202 +203 +205 +207 +208 +209 +213 +214 +216 +217 +218 +219 +221 +222 +223 +224 +226 +228 +229 +230 +233 +235 +237 +238 +239 +24 +241 +242 +244 +247 +248 +249 +252 +255 +256 +257 +258 +26 +260 +262 +263 +265 +266 +27 +272 +273 +274 +275 +277 +278 +28 +280 +281 +282 +283 +284 +285 +286 +287 +288 +289 +291 +292 +296 +298 +30 +302 +305 +306 +307 +308 +309 +310 +311 +315 +316 +317 +318 +321 +322 +323 +325 +327 +33 +331 +332 +333 +335 +336 +338 +339 +34 +341 +342 +344 +345 +348 +35 +351 +353 +356 +360 +362 +364 +365 +366 +367 +368 +369 +37 +373 +374 +375 +377 +378 +379 +382 +384 +386 +389 +392 +393 +394 +395 +396 +397 +399 +4 +400 +401 +402 +403 +404 +406 +407 +409 +41 +411 +413 +414 +417 +418 +419 +42 +421 +424 +427 +429 +43 +430 +431 +432 +435 +436 +437 +438 +439 +44 +443 +444 +446 +448 +449 +452 +453 +454 +455 +457 +458 +459 +460 +462 +463 +466 +467 +468 +469 +47 +470 +472 +475 +477 +478 +479 +480 +481 +482 +483 +484 +485 +487 +489 +490 +491 +492 +493 +494 +495 +496 +497 +498 +5 +51 +53 +54 +57 +58 +64 +65 +66 +67 +69 +70 +72 +74 +76 +77 +78 +8 +80 +82 +83 +84 +85 +86 +87 +9 +90 +92 +95 +96 +97 +98 +PREHOOK: query: EXPLAIN +SELECT a.key FROM +( + SELECT rank() OVER (ORDER BY key) AS key FROM + (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value) a +) a +JOIN +( + SELECT rank() OVER (ORDER BY key) AS key FROM + (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value) b +) b +ON a.key = b.key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT a.key FROM +( + SELECT rank() OVER (ORDER BY key) AS key FROM + (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value) a +) a +JOIN +( + SELECT rank() OVER (ORDER BY key) AS key FROM + (SELECT 
a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value) b +) b +ON a.key = b.key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (SIMPLE_EDGE) + Reducer 4 <- Reducer 3 (SIMPLE_EDGE) + Reducer 5 <- Reducer 4 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE) + Reducer 6 <- Reducer 3 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a2 + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: string) + Execution mode: llap + LLAP IO: no inputs + Map 7 + Map Operator Tree: + TableScan + alias: a1 + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + 
Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col1, _col2 + Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + keys: _col1 (type: string), _col2 (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: string) + Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: string), KEY._col1 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col1 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: 0 (type: int), _col0 (type: string) + sort order: ++ + Map-reduce partition columns: 0 (type: int) + Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: 0 (type: int), _col0 (type: string) + sort order: ++ + Map-reduce partition columns: 0 (type: int) + Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE + Reducer 4 + Execution mode: llap + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey1 (type: string) + outputColumnNames: _col0 + Statistics: 
Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE + PTF Operator + Function definitions: + Input definition + input alias: ptf_0 + output shape: _col0: string + type: WINDOWING + Windowing table definition + input alias: ptf_1 + name: windowingtablefunction + order by: _col0 ASC NULLS FIRST + partition by: 0 + raw input shape: + window functions: + window function definition + alias: rank_window_0 + arguments: _col0 + name: rank + window function: GenericUDAFRankEvaluator + window frame: ROWS PRECEDING(MAX)~FOLLOWING(MAX) + isPivotResult: true + Statistics: Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: rank_window_0 is not null (type: boolean) + Statistics: Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: rank_window_0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE + Reducer 5 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 6 + Execution mode: llap + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey1 (type: string) + outputColumnNames: 
_col0 + Statistics: Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE + PTF Operator + Function definitions: + Input definition + input alias: ptf_0 + output shape: _col0: string + type: WINDOWING + Windowing table definition + input alias: ptf_1 + name: windowingtablefunction + order by: _col0 ASC NULLS FIRST + partition by: 0 + raw input shape: + window functions: + window function definition + alias: rank_window_0 + arguments: _col0 + name: rank + window function: GenericUDAFRankEvaluator + window frame: ROWS PRECEDING(MAX)~FOLLOWING(MAX) + isPivotResult: true + Statistics: Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: rank_window_0 is not null (type: boolean) + Statistics: Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: rank_window_0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT a.key FROM +( + SELECT rank() OVER (ORDER BY key) AS key FROM + (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value) a +) a +JOIN +( + SELECT rank() OVER (ORDER BY key) AS key FROM + (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value) b +) b +ON a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: SELECT a.key FROM +( + SELECT rank() OVER (ORDER BY key) AS key FROM + (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value) a +) a +JOIN +( + SELECT 
rank() OVER (ORDER BY key) AS key FROM + (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key) + GROUP BY a1.key, a2.value) b +) b +ON a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +20 +21 +22 +23 +24 +25 +26 +27 +28 +29 +30 +31 +32 +33 +34 +35 +36 +37 +38 +39 +40 +41 +42 +43 +44 +45 +46 +47 +48 +49 +50 +51 +52 +53 +54 +55 +56 +57 +58 +59 +60 +61 +62 +63 +64 +65 +66 +67 +68 +69 +70 +71 +72 +73 +74 +75 +76 +77 +78 +79 +80 +81 +82 +83 +84 +85 +86 +87 +88 +89 +90 +91 +92 +93 +94 +95 +96 +97 +98 +99 +100 +101 +102 +103 +104 +105 +106 +107 +108 +109 +110 +111 +112 +113 +114 +115 +116 +117 +118 +119 +120 +121 +122 +123 +124 +125 +126 +127 +128 +129 +130 +131 +132 +133 +134 +135 +136 +137 +138 +139 +140 +141 +142 +143 +144 +145 +146 +147 +148 +149 +150 +151 +152 +153 +154 +155 +156 +157 +158 +159 +160 +161 +162 +163 +164 +165 +166 +167 +168 +169 +170 +171 +172 +173 +174 +175 +176 +177 +178 +179 +180 +181 +182 +183 +184 +185 +186 +187 +188 +189 +190 +191 +192 +193 +194 +195 +196 +197 +198 +199 +200 +201 +202 +203 +204 +205 +206 +207 +208 +209 +210 +211 +212 +213 +214 +215 +216 +217 +218 +219 +220 +221 +222 +223 +224 +225 +226 +227 +228 +229 +230 +231 +232 +233 +234 +235 +236 +237 +238 +239 +240 +241 +242 +243 +244 +245 +246 +247 +248 +249 +250 +251 +252 +253 +254 +255 +256 +257 +258 +259 +260 +261 +262 +263 +264 +265 +266 +267 +268 +269 +270 +271 +272 +273 +274 +275 +276 +277 +278 +279 +280 +281 +282 +283 +284 +285 +286 +287 +288 +289 +290 +291 +292 +293 +294 +295 +296 +297 +298 +299 +300 +301 +302 +303 +304 +305 +306 +307 +308 +309 http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/ql/src/test/results/clientpositive/llap/subquery_multi.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/subquery_multi.q.out 
b/ql/src/test/results/clientpositive/llap/subquery_multi.q.out index 975fd13..45f698b 100644 --- a/ql/src/test/results/clientpositive/llap/subquery_multi.q.out +++ b/ql/src/test/results/clientpositive/llap/subquery_multi.q.out @@ -1663,13 +1663,12 @@ STAGE PLANS: Tez #### A masked pattern was here #### Edges: - Reducer 10 <- Reducer 9 (SIMPLE_EDGE) Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE) Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE) - Reducer 4 <- Reducer 10 (ONE_TO_ONE_EDGE), Reducer 3 (SIMPLE_EDGE) - Reducer 7 <- Map 11 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE) + Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE) + Reducer 7 <- Map 10 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE) Reducer 8 <- Reducer 7 (SIMPLE_EDGE) - Reducer 9 <- Map 11 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE) + Reducer 9 <- Reducer 7 (SIMPLE_EDGE) #### A masked pattern was here #### Vertices: Map 1 @@ -1692,7 +1691,7 @@ STAGE PLANS: value expressions: _col0 (type: int), _col2 (type: string), _col3 (type: string), _col5 (type: int), _col6 (type: string), _col7 (type: double), _col8 (type: string) Execution mode: llap LLAP IO: no inputs - Map 11 + Map 10 Map Operator Tree: TableScan alias: pp @@ -1714,11 +1713,6 @@ STAGE PLANS: sort order: ++ Map-reduce partition columns: _col0 (type: string), _col1 (type: string) Statistics: Num rows: 13 Data size: 2548 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - key expressions: _col0 (type: string), _col1 (type: string) - sort order: ++ - Map-reduce partition columns: _col0 (type: string), _col1 (type: string) - Statistics: Num rows: 13 Data size: 2548 Basic stats: COMPLETE Column stats: COMPLETE Execution mode: llap LLAP IO: no inputs Map 5 @@ -1763,35 +1757,8 @@ STAGE PLANS: Map-reduce partition columns: _col1 (type: string), _col0 (type: string) Statistics: Num rows: 26 Data size: 7488 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col2 (type: string) - Reduce Output Operator - key 
expressions: _col1 (type: string), _col0 (type: string) - sort order: ++ - Map-reduce partition columns: _col1 (type: string), _col0 (type: string) - Statistics: Num rows: 26 Data size: 7488 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col2 (type: string) Execution mode: llap LLAP IO: no inputs - Reducer 10 - Execution mode: llap - Reduce Operator Tree: - Group By Operator - keys: KEY._col0 (type: string), KEY._col1 (type: string) - mode: mergepartial - outputColumnNames: _col0, _col1 - Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE - Filter Operator - predicate: _col0 is not null (type: boolean) - Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE - Select Operator - expressions: _col0 (type: string), _col1 (type: string), true (type: boolean) - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - key expressions: _col0 (type: string), _col1 (type: string) - sort order: ++ - Map-reduce partition columns: _col0 (type: string), _col1 (type: string) - Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col2 (type: boolean) Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -1874,6 +1841,16 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: string) Statistics: Num rows: 7 Data size: 840 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: bigint), _col2 (type: bigint) + Group By Operator + keys: _col2 (type: string), _col1 (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: string) + Statistics: Num rows: 7 Data size: 1372 
Basic stats: COMPLETE Column stats: COMPLETE Reducer 8 Execution mode: llap Reduce Operator Tree: @@ -1892,24 +1869,24 @@ STAGE PLANS: Reducer 9 Execution mode: llap Reduce Operator Tree: - Merge Join Operator - condition map: - Left Semi Join 0 to 1 - keys: - 0 _col1 (type: string), _col0 (type: string) - 1 _col0 (type: string), _col1 (type: string) - outputColumnNames: _col1, _col2 - Statistics: Num rows: 14 Data size: 2744 Basic stats: COMPLETE Column stats: COMPLETE - Group By Operator - keys: _col2 (type: string), _col1 (type: string) - mode: hash - outputColumnNames: _col0, _col1 + Group By Operator + keys: KEY._col0 (type: string), KEY._col1 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: _col0 is not null (type: boolean) Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - key expressions: _col0 (type: string), _col1 (type: string) - sort order: ++ - Map-reduce partition columns: _col0 (type: string), _col1 (type: string) - Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: string), _col1 (type: string), true (type: boolean) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: string) + Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col2 (type: boolean) Stage: Stage-0 Fetch Operator @@ -2143,13 +2120,12 @@ STAGE PLANS: Tez #### A masked pattern was here #### Edges: - Reducer 10 <- Reducer 9 (SIMPLE_EDGE) Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE) Reducer 3 <- Reducer 2 
(SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE) - Reducer 4 <- Reducer 10 (ONE_TO_ONE_EDGE), Reducer 3 (SIMPLE_EDGE) - Reducer 7 <- Map 11 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE) + Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE) + Reducer 7 <- Map 10 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE) Reducer 8 <- Reducer 7 (SIMPLE_EDGE) - Reducer 9 <- Map 11 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE) + Reducer 9 <- Reducer 7 (SIMPLE_EDGE) #### A masked pattern was here #### Vertices: Map 1 @@ -2172,7 +2148,7 @@ STAGE PLANS: value expressions: _col0 (type: int), _col2 (type: string), _col3 (type: string), _col5 (type: int), _col7 (type: double), _col8 (type: string) Execution mode: llap LLAP IO: no inputs - Map 11 + Map 10 Map Operator Tree: TableScan alias: pp @@ -2194,11 +2170,6 @@ STAGE PLANS: sort order: ++ Map-reduce partition columns: _col0 (type: string), _col1 (type: string) Statistics: Num rows: 13 Data size: 2548 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - key expressions: _col0 (type: string), _col1 (type: string) - sort order: ++ - Map-reduce partition columns: _col0 (type: string), _col1 (type: string) - Statistics: Num rows: 13 Data size: 2548 Basic stats: COMPLETE Column stats: COMPLETE Execution mode: llap LLAP IO: no inputs Map 5 @@ -2243,35 +2214,8 @@ STAGE PLANS: Map-reduce partition columns: _col1 (type: string), _col0 (type: string) Statistics: Num rows: 26 Data size: 7488 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col2 (type: string) - Reduce Output Operator - key expressions: _col1 (type: string), _col0 (type: string) - sort order: ++ - Map-reduce partition columns: _col1 (type: string), _col0 (type: string) - Statistics: Num rows: 26 Data size: 7488 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col2 (type: string) Execution mode: llap LLAP IO: no inputs - Reducer 10 - Execution mode: llap - Reduce Operator Tree: - Group By Operator - keys: KEY._col0 (type: string), KEY._col1 (type: 
string) - mode: mergepartial - outputColumnNames: _col0, _col1 - Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE - Filter Operator - predicate: _col0 is not null (type: boolean) - Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE - Select Operator - expressions: _col0 (type: string), _col1 (type: string), true (type: boolean) - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - key expressions: _col0 (type: string), _col1 (type: string) - sort order: ++ - Map-reduce partition columns: _col0 (type: string), _col1 (type: string) - Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col2 (type: boolean) Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -2354,6 +2298,16 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: string) Statistics: Num rows: 7 Data size: 840 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: bigint), _col2 (type: bigint) + Group By Operator + keys: _col2 (type: string), _col1 (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: string) + Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE Reducer 8 Execution mode: llap Reduce Operator Tree: @@ -2372,24 +2326,24 @@ STAGE PLANS: Reducer 9 Execution mode: llap Reduce Operator Tree: - Merge Join Operator - condition map: - Left Semi Join 0 to 1 - keys: - 0 _col1 (type: string), _col0 (type: string) - 1 _col0 (type: string), _col1 (type: string) - outputColumnNames: _col1, _col2 - Statistics: Num rows: 14 Data size: 2744 Basic stats: 
COMPLETE Column stats: COMPLETE - Group By Operator - keys: _col2 (type: string), _col1 (type: string) - mode: hash - outputColumnNames: _col0, _col1 + Group By Operator + keys: KEY._col0 (type: string), KEY._col1 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: _col0 is not null (type: boolean) Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - key expressions: _col0 (type: string), _col1 (type: string) - sort order: ++ - Map-reduce partition columns: _col0 (type: string), _col1 (type: string) - Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: string), _col1 (type: string), true (type: boolean) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: string) + Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col2 (type: boolean) Stage: Stage-0 Fetch Operator @@ -2855,12 +2809,11 @@ STAGE PLANS: #### A masked pattern was here #### Edges: Reducer 10 <- Map 9 (CUSTOM_SIMPLE_EDGE) - Reducer 11 <- Map 9 (CUSTOM_SIMPLE_EDGE) Reducer 2 <- Map 1 (SIMPLE_EDGE) Reducer 3 <- Map 5 (SIMPLE_EDGE), Reducer 2 (ONE_TO_ONE_EDGE) Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE) Reducer 7 <- Map 6 (XPROD_EDGE), Reducer 10 (XPROD_EDGE) - Reducer 8 <- Reducer 11 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE) + Reducer 8 <- Reducer 10 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE) #### A masked pattern was here #### Vertices: Map 1 @@ -2939,10 +2892,6 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 80 
Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col0 (type: struct<count:bigint,sum:double,input:double>) - Reduce Output Operator - sort order: - Statistics: Num rows: 1 Data size: 80 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col0 (type: struct<count:bigint,sum:double,input:double>) Execution mode: llap LLAP IO: no inputs Reducer 10 @@ -2962,14 +2911,6 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col0 (type: bigint), _col1 (type: bigint) - Reducer 11 - Execution mode: llap - Reduce Operator Tree: - Group By Operator - aggregations: avg(VALUE._col0) - mode: mergepartial - outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE Select Operator expressions: _col0 (type: double), true (type: boolean) outputColumnNames: _col0, _col1