HIVE-14731: Use Tez cartesian product edge in Hive (unpartitioned case only) (Zhiyuan Yang via Gunther Hagleitner)
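For orientation before the diff: with the new hive.tez.cartesian-product.enabled property (added to HiveConf below and switched on in the llap/tez test hive-site.xml files), an unpartitioned cross product is no longer forced through a map join or a single shuffle reducer. ConvertJoinMapJoin falls back to a merge join for key-less inner joins, CrossProductCheck becomes CrossProductHandler and rewrites the affected edges to the new XPROD_EDGE type, and DagUtils maps that edge type onto Tez's cartesian product edge and vertex managers.

A condensed, illustrative sketch of the DagUtils wiring (names and API calls follow the patch below; this is a reading aid, not the committed code verbatim):

  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
  import org.apache.hadoop.hive.ql.plan.BaseWork;
  import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
  import org.apache.hadoop.hive.ql.plan.TezWork;
  import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
  import org.apache.tez.dag.api.EdgeProperty;
  import org.apache.tez.dag.api.TezConfiguration;
  import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig;
  import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager;
  import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;

  /**
   * Roughly what the new XPROD_EDGE branch of DagUtils.createEdgeProperty() does:
   * build a Tez custom edge driven by CartesianProductEdgeManager, registering every
   * parent connected through an XPROD_EDGE as a cross-product source.
   */
  final class XProdEdgeSketch {
    static EdgeProperty buildXProdEdge(TezWork tezWork, BaseWork work, Configuration conf,
        String keyClass, String valClass) throws IOException {
      // every XPROD_EDGE parent of the destination vertex is a cross-product source
      List<String> crossProductSources = new ArrayList<>();
      for (BaseWork parentWork : tezWork.getParents(work)) {
        if (EdgeType.XPROD_EDGE == tezWork.getEdgeType(parentWork, work)) {
          crossProductSources.add(parentWork.getName());
        }
      }

      // the edge manager payload tells Tez which inputs participate in the cross product
      CartesianProductConfig cpConfig = new CartesianProductConfig(crossProductSources);
      EdgeManagerPluginDescriptor edgeManagerDescriptor =
          EdgeManagerPluginDescriptor.create(CartesianProductEdgeManager.class.getName());
      edgeManagerDescriptor.setUserPayload(cpConfig.toUserPayload(new TezConfiguration(conf)));

      // unordered KV traffic; rows are spread across tasks by hashing the value, not the
      // (empty) key, using the ValueHashPartitioner added to DagUtils in this patch
      UnorderedPartitionedKVEdgeConfig cpEdgeConf =
          UnorderedPartitionedKVEdgeConfig.newBuilder(keyClass, valClass,
              DagUtils.ValueHashPartitioner.class.getName()).build();
      return cpEdgeConf.createDefaultCustomEdgeProperty(edgeManagerDescriptor);
    }
  }

On the destination side, createVertex() additionally installs CartesianProductVertexManager with the same CartesianProductConfig payload and deliberately leaves the vertex parallelism unset, so the vertex manager can choose the task count at runtime.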
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cfbe6125 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cfbe6125 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cfbe6125 Branch: refs/heads/master Commit: cfbe6125725223657dff1e2c9bc3131a5193ae51 Parents: a284df1 Author: Gunther Hagleitner <gunt...@apache.org> Authored: Tue Oct 24 13:06:09 2017 -0700 Committer: Gunther Hagleitner <gunt...@apache.org> Committed: Tue Oct 24 13:06:09 2017 -0700 ---------------------------------------------------------------------- .../hadoop/hive/common/jsonexplain/Vertex.java | 2 +- .../common/jsonexplain/tez/TezJsonParser.java | 2 + .../org/apache/hadoop/hive/conf/HiveConf.java | 2 + data/conf/llap/hive-site.xml | 5 + data/conf/tez/hive-site.xml | 5 + .../test/resources/testconfiguration.properties | 6 + .../hadoop/hive/ql/exec/tez/DagUtils.java | 69 +- .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 5 +- .../hive/ql/optimizer/ConvertJoinMapJoin.java | 74 +- .../optimizer/physical/CrossProductCheck.java | 368 --- .../optimizer/physical/CrossProductHandler.java | 382 +++ .../optimizer/physical/PhysicalOptimizer.java | 2 +- .../physical/SparkCrossProductCheck.java | 12 +- .../hadoop/hive/ql/parse/TezCompiler.java | 4 +- .../hadoop/hive/ql/plan/TezEdgeProperty.java | 4 +- .../hadoop/hive/ql/exec/tez/TestTezTask.java | 4 +- .../test/queries/clientpositive/cross_prod_1.q | 34 + .../test/queries/clientpositive/cross_prod_3.q | 13 + .../test/queries/clientpositive/cross_prod_4.q | 10 + .../dynamic_partition_pruning_2.q | 2 +- .../clientpositive/hybridgrace_hashjoin_1.q | 1 + .../queries/clientpositive/subquery_multi.q | 4 +- .../queries/clientpositive/subquery_notin.q | 4 +- .../queries/clientpositive/subquery_select.q | 4 +- .../clientpositive/llap/auto_join0.q.out | 56 +- .../clientpositive/llap/auto_join_filters.q.out | 4 +- .../clientpositive/llap/auto_join_nulls.q.out | 2 +- .../llap/auto_sortmerge_join_12.q.out | 64 +- .../clientpositive/llap/cross_join.q.out | 94 +- .../clientpositive/llap/cross_prod_1.q.out | 2502 ++++++++++++++++++ .../clientpositive/llap/cross_prod_3.q.out | 133 + .../clientpositive/llap/cross_prod_4.q.out | 195 ++ .../llap/cross_product_check_1.q.out | 12 +- .../llap/cross_product_check_2.q.out | 305 ++- .../results/clientpositive/llap/cte_5.q.out | 10 +- .../results/clientpositive/llap/cte_mat_1.q.out | 10 +- .../results/clientpositive/llap/cte_mat_2.q.out | 10 +- .../llap/dynamic_partition_pruning.q.out | 81 +- .../llap/dynamic_partition_pruning_2.q.out | 52 +- .../llap/dynamic_semijoin_reduction_sw.q.out | 2 +- .../clientpositive/llap/explainuser_1.q.out | 30 +- .../llap/hybridgrace_hashjoin_1.q.out | 166 +- .../clientpositive/llap/jdbc_handler.q.out | 2 +- .../results/clientpositive/llap/join0.q.out | 2 +- .../clientpositive/llap/leftsemijoin.q.out | 2 +- .../results/clientpositive/llap/mapjoin2.q.out | 2 +- .../clientpositive/llap/mapjoin_hint.q.out | 64 +- .../clientpositive/llap/subquery_exists.q.out | 6 +- .../clientpositive/llap/subquery_in.q.out | 2 +- .../clientpositive/llap/subquery_multi.q.out | 106 +- .../clientpositive/llap/subquery_notin.q.out | 107 +- .../clientpositive/llap/subquery_null_agg.q.out | 2 +- .../clientpositive/llap/subquery_scalar.q.out | 48 +- .../clientpositive/llap/subquery_select.q.out | 103 +- .../clientpositive/llap/tez_self_join.q.out | 2 +- .../llap/vector_between_columns.q.out | 155 +- .../llap/vector_complex_all.q.out | 92 +- .../llap/vector_groupby_mapjoin.q.out 
| 113 +- .../llap/vector_include_no_sel.q.out | 99 +- .../llap/vector_join_filters.q.out | 2 +- .../clientpositive/llap/vector_join_nulls.q.out | 2 +- .../vectorized_dynamic_partition_pruning.q.out | 97 +- .../llap/vectorized_multi_output_select.q.out | 58 +- .../clientpositive/spark/subquery_multi.q.out | 80 +- .../clientpositive/spark/subquery_notin.q.out | 106 +- .../clientpositive/spark/subquery_select.q.out | 84 +- .../tez/hybridgrace_hashjoin_1.q.out | 164 +- 67 files changed, 4670 insertions(+), 1576 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java index b7dc88c..a73893f 100644 --- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java +++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java @@ -74,7 +74,7 @@ public final class Vertex implements Comparable<Vertex>{ public VertexType vertexType; public static enum EdgeType { - BROADCAST, SHUFFLE, MULTICAST, PARTITION_ONLY_SHUFFLE, FORWARD, UNKNOWN + BROADCAST, SHUFFLE, MULTICAST, PARTITION_ONLY_SHUFFLE, FORWARD, XPROD_EDGE, UNKNOWN }; public String edgeType; http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java index 69e5358..b6cca10 100644 --- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java +++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java @@ -36,6 +36,8 @@ public class TezJsonParser extends DagJsonParser { return "MULTICAST"; case "ONE_TO_ONE_EDGE": return "FORWARD"; + case "XPROD_EDGE": + return "XPROD_EDGE"; default: return "UNKNOWN"; } http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 62dcbd5..875e781 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3028,6 +3028,8 @@ public class HiveConf extends Configuration { 0.5f, "The maximum fraction of JVM memory which Tez will reserve for the processor"), TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION("hive.tez.task.scale.memory.reserve.fraction", -1f, "The customized fraction of JVM memory which Tez will reserve for the processor"), + TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED("hive.tez.cartesian-product.enabled", + false, "Use Tez cartesian product edge to speed up cross product"), // The default is different on the client and server, so it's null here. 
LLAP_IO_ENABLED("hive.llap.io.enabled", null, "Whether the LLAP IO layer is enabled."), LLAP_IO_TRACE_SIZE("hive.llap.io.trace.size", "2Mb", http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/data/conf/llap/hive-site.xml ---------------------------------------------------------------------- diff --git a/data/conf/llap/hive-site.xml b/data/conf/llap/hive-site.xml index 870b584..8cd5144 100644 --- a/data/conf/llap/hive-site.xml +++ b/data/conf/llap/hive-site.xml @@ -338,4 +338,9 @@ <value>true</value> </property> +<property> + <name>hive.tez.cartesian-product.enabled</name> + <value>true</value> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/data/conf/tez/hive-site.xml ---------------------------------------------------------------------- diff --git a/data/conf/tez/hive-site.xml b/data/conf/tez/hive-site.xml index 35e8c99..f1dabf5 100644 --- a/data/conf/tez/hive-site.xml +++ b/data/conf/tez/hive-site.xml @@ -283,4 +283,9 @@ <value>true</value> </property> +<property> + <name>hive.tez.cartesian-product.enabled</name> + <value>true</value> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index a081638..c338826 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -139,6 +139,9 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\ count_dist_rewrite.q,\ create_merge_compressed.q,\ cross_join.q,\ + cross_prod_1.q,\ + cross_prod_3.q,\ + cross_prod_4.q,\ cross_product_check_1.q,\ cross_product_check_2.q,\ ctas.q,\ @@ -508,6 +511,9 @@ minillaplocal.query.files=\ correlationoptimizer4.q,\ correlationoptimizer6.q,\ disable_merge_for_bucketing.q,\ + cross_prod_1.q,\ + cross_prod_3.q,\ + cross_prod_4.q,\ dynamic_partition_pruning.q,\ dynamic_semijoin_reduction.q,\ dynamic_semijoin_reduction_2.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index aae3480..5c338b8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -46,6 +46,9 @@ import org.apache.commons.lang.StringUtils; import org.apache.tez.mapreduce.common.MRInputSplitDistributor; import org.apache.tez.mapreduce.hadoop.InputSplitInfo; import org.apache.tez.mapreduce.protos.MRRuntimeProtos; +import org.apache.tez.runtime.library.api.Partitioner; +import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig; +import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -135,6 +138,7 @@ import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig; import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig; import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig; import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput; +import 
org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager; /** * DagUtils. DagUtils is a collection of helper methods to convert @@ -264,7 +268,7 @@ public class DagUtils { */ @SuppressWarnings("rawtypes") public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w, - TezEdgeProperty edgeProp, VertexType vertexType) + TezEdgeProperty edgeProp, BaseWork work, TezWork tezWork) throws IOException { Class mergeInputClass; @@ -279,7 +283,8 @@ public class DagUtils { case CUSTOM_EDGE: { mergeInputClass = ConcatenatedMergedKeyValueInput.class; int numBuckets = edgeProp.getNumBuckets(); - CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(numBuckets, vertexType); + CustomVertexConfiguration vertexConf + = new CustomVertexConfiguration(numBuckets, tezWork.getVertexType(work)); DataOutputBuffer dob = new DataOutputBuffer(); vertexConf.write(dob); VertexManagerPluginDescriptor desc = @@ -299,6 +304,10 @@ public class DagUtils { mergeInputClass = ConcatenatedMergedKeyValueInput.class; break; + case XPROD_EDGE: + mergeInputClass = ConcatenatedMergedKeyValueInput.class; + break; + case SIMPLE_EDGE: setupAutoReducerParallelism(edgeProp, w); // fall through @@ -308,7 +317,7 @@ public class DagUtils { break; } - return GroupInputEdge.create(group, w, createEdgeProperty(edgeProp, vConf), + return GroupInputEdge.create(group, w, createEdgeProperty(w, edgeProp, vConf, work, tezWork), InputDescriptor.create(mergeInputClass.getName())); } @@ -322,13 +331,14 @@ public class DagUtils { * @return */ public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgeProp, - VertexType vertexType) + BaseWork work, TezWork tezWork) throws IOException { switch(edgeProp.getEdgeType()) { case CUSTOM_EDGE: { int numBuckets = edgeProp.getNumBuckets(); - CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(numBuckets, vertexType); + CustomVertexConfiguration vertexConf = + new CustomVertexConfiguration(numBuckets, tezWork.getVertexType(work)); DataOutputBuffer dob = new DataOutputBuffer(); vertexConf.write(dob); VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create( @@ -339,6 +349,9 @@ public class DagUtils { w.setVertexManagerPlugin(desc); break; } + case XPROD_EDGE: + break; + case SIMPLE_EDGE: { setupAutoReducerParallelism(edgeProp, w); break; @@ -352,14 +365,15 @@ public class DagUtils { // nothing } - return Edge.create(v, w, createEdgeProperty(edgeProp, vConf)); + return Edge.create(v, w, createEdgeProperty(w, edgeProp, vConf, work, tezWork)); } /* * Helper function to create an edge property from an edge type. 
*/ - private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf) - throws IOException { + private EdgeProperty createEdgeProperty(Vertex w, TezEdgeProperty edgeProp, + Configuration conf, BaseWork work, TezWork tezWork) + throws IOException { MRHelpers.translateMRConfToTez(conf); String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS); String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS); @@ -412,7 +426,23 @@ public class DagUtils { .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) .build(); return et4Conf.createDefaultOneToOneEdgeProperty(); + case XPROD_EDGE: + EdgeManagerPluginDescriptor edgeManagerDescriptor = + EdgeManagerPluginDescriptor.create(CartesianProductEdgeManager.class.getName()); + List<String> crossProductSources = new ArrayList<>(); + for (BaseWork parentWork : tezWork.getParents(work)) { + if (EdgeType.XPROD_EDGE == tezWork.getEdgeType(parentWork, work)) { + crossProductSources.add(parentWork.getName()); + } + } + CartesianProductConfig cpConfig = new CartesianProductConfig(crossProductSources); + edgeManagerDescriptor.setUserPayload(cpConfig.toUserPayload(new TezConfiguration(conf))); + UnorderedPartitionedKVEdgeConfig cpEdgeConf = + UnorderedPartitionedKVEdgeConfig.newBuilder(keyClass, valClass, + ValueHashPartitioner.class.getName()).build(); + return cpEdgeConf.createDefaultCustomEdgeProperty(edgeManagerDescriptor); case SIMPLE_EDGE: + // fallthrough default: assert partitionerClassName != null; partitionerConf = createPartitionerConf(partitionerClassName, conf); @@ -427,6 +457,14 @@ public class DagUtils { } } + public static class ValueHashPartitioner implements Partitioner { + + @Override + public int getPartition(Object key, Object value, int numPartitions) { + return (value.hashCode() & 2147483647) % numPartitions; + } + } + /** * Utility method to create a stripped down configuration for the MR partitioner. 
* @@ -1240,6 +1278,21 @@ public class DagUtils { } else if (work instanceof MergeJoinWork) { v = createVertex(conf, (MergeJoinWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx, vertexType); + // set VertexManagerPlugin if whether it's a cross product destination vertex + List<String> crossProductSources = new ArrayList<>(); + for (BaseWork parentWork : tezWork.getParents(work)) { + if (tezWork.getEdgeType(parentWork, work) == EdgeType.XPROD_EDGE) { + crossProductSources.add(parentWork.getName()); + } + } + + if (!crossProductSources.isEmpty()) { + CartesianProductConfig cpConfig = new CartesianProductConfig(crossProductSources); + v.setVertexManagerPlugin( + VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName()) + .setUserPayload(cpConfig.toUserPayload(new TezConfiguration(conf)))); + // parallelism shouldn't be set for cartesian product vertex + } } else { // something is seriously wrong if this is happening throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg()); http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index c3a2a2b..a1b7cfb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -477,7 +477,7 @@ public class TezTask extends Task<TezWork> { for (BaseWork v: children) { // finally we can create the grouped edge GroupInputEdge e = utils.createEdge(group, parentConf, - workToVertex.get(v), work.getEdgeProperty(w, v), work.getVertexType(v)); + workToVertex.get(v), work.getEdgeProperty(w, v), v, work); dag.addEdge(e); } @@ -506,8 +506,7 @@ public class TezTask extends Task<TezWork> { Edge e = null; TezEdgeProperty edgeProp = work.getEdgeProperty(w, v); - - e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, work.getVertexType(v)); + e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, v, work); dag.addEdge(e); } } http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index 53d34bb..9175597 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -102,6 +102,14 @@ public class ConvertJoinMapJoin implements NodeProcessor { MemoryMonitorInfo memoryMonitorInfo = getMemoryMonitorInfo(maxSize, context.conf); joinOp.getConf().setMemoryMonitorInfo(memoryMonitorInfo); + // not use map join in case of cross product + boolean cartesianProductEdgeEnabled = + HiveConf.getBoolVar(context.conf, HiveConf.ConfVars.TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED); + if (cartesianProductEdgeEnabled && !hasOuterJoin(joinOp) && isCrossProduct(joinOp)) { + fallbackToMergeJoin(joinOp, context); + return null; + } + TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf); boolean hiveConvertJoin = context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) & !context.parseContext.getDisableMapJoin(); @@ -614,6 +622,42 @@ public class ConvertJoinMapJoin 
implements NodeProcessor { return false; } + private boolean hasOuterJoin(JoinOperator joinOp) throws SemanticException { + boolean hasOuter = false; + for (JoinCondDesc joinCondDesc : joinOp.getConf().getConds()) { + switch (joinCondDesc.getType()) { + case JoinDesc.INNER_JOIN: + case JoinDesc.LEFT_SEMI_JOIN: + case JoinDesc.UNIQUE_JOIN: + hasOuter = false; + break; + + case JoinDesc.FULL_OUTER_JOIN: + case JoinDesc.LEFT_OUTER_JOIN: + case JoinDesc.RIGHT_OUTER_JOIN: + hasOuter = true; + break; + + default: + throw new SemanticException("Unknown join type " + joinCondDesc.getType()); + } + } + return hasOuter; + } + + private boolean isCrossProduct(JoinOperator joinOp) { + ExprNodeDesc[][] joinExprs = joinOp.getConf().getJoinKeys(); + if (joinExprs != null) { + for (ExprNodeDesc[] expr : joinExprs) { + if (expr != null && expr.length != 0) { + return false; + } + } + } + + return true; + } + /** * Obtain big table position for join. * @@ -639,26 +683,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { * case this for now. */ if (joinOp.getConf().getConds().length > 1) { - boolean hasOuter = false; - for (JoinCondDesc joinCondDesc : joinOp.getConf().getConds()) { - switch (joinCondDesc.getType()) { - case JoinDesc.INNER_JOIN: - case JoinDesc.LEFT_SEMI_JOIN: - case JoinDesc.UNIQUE_JOIN: - hasOuter = false; - break; - - case JoinDesc.FULL_OUTER_JOIN: - case JoinDesc.LEFT_OUTER_JOIN: - case JoinDesc.RIGHT_OUTER_JOIN: - hasOuter = true; - break; - - default: - throw new SemanticException("Unknown join type " + joinCondDesc.getType()); - } - } - if (hasOuter) { + if (hasOuterJoin(joinOp)) { return -1; } } @@ -1100,14 +1125,19 @@ public class ConvertJoinMapJoin implements NodeProcessor { } } + // we are just converting to a common merge join operator. The shuffle + // join in map-reduce case. + fallbackToMergeJoin(joinOp, context); + } + + private void fallbackToMergeJoin(JoinOperator joinOp, OptimizeTezProcContext context) + throws SemanticException { int pos = getMapJoinConversionPos(joinOp, context, estimateNumBuckets(joinOp, false), true, Long.MAX_VALUE, false); if (pos < 0) { LOG.info("Could not get a valid join position. Defaulting to position 0"); pos = 0; } - // we are just converting to a common merge join operator. The shuffle - // join in map-reduce case. LOG.info("Fallback to common merge join operator"); convertJoinSMBJoin(joinOp, context, pos, 0, false); } http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java deleted file mode 100644 index 4b35bb6..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java +++ /dev/null @@ -1,368 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.optimizer.physical; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Stack; -import java.util.TreeMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; -import org.apache.hadoop.hive.ql.exec.ConditionalTask; -import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; -import org.apache.hadoop.hive.ql.exec.JoinOperator; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; -import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; -import org.apache.hadoop.hive.ql.exec.TableScanOperator; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; -import org.apache.hadoop.hive.ql.exec.tez.TezTask; -import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; -import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; -import org.apache.hadoop.hive.ql.lib.Dispatcher; -import org.apache.hadoop.hive.ql.lib.GraphWalker; -import org.apache.hadoop.hive.ql.lib.Node; -import org.apache.hadoop.hive.ql.lib.NodeProcessor; -import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; -import org.apache.hadoop.hive.ql.lib.Rule; -import org.apache.hadoop.hive.ql.lib.RuleRegExp; -import org.apache.hadoop.hive.ql.lib.TaskGraphWalker; -import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.BaseWork; -import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; -import org.apache.hadoop.hive.ql.plan.MapJoinDesc; -import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.ql.plan.MapredWork; -import org.apache.hadoop.hive.ql.plan.MergeJoinWork; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; -import org.apache.hadoop.hive.ql.plan.ReduceWork; -import org.apache.hadoop.hive.ql.plan.TableScanDesc; -import org.apache.hadoop.hive.ql.plan.TezWork; -import org.apache.hadoop.hive.ql.session.SessionState; - -/* - * Check each MapJoin and ShuffleJoin Operator to see they are performing a cross product. - * If yes, output a warning to the Session's console. - * The Checks made are the following: - * 1. MR, Shuffle Join: - * Check the parent ReduceSinkOp of the JoinOp. If its keys list is size = 0, then - * this is a cross product. - * The parent ReduceSinkOp is in the MapWork for the same Stage. - * 2. MR, MapJoin: - * If the keys expr list on the mapJoin Desc is an empty list for any input, - * this implies a cross product. - * 3. Tez, Shuffle Join: - * Check the parent ReduceSinkOp of the JoinOp. If its keys list is size = 0, then - * this is a cross product. - * The parent ReduceSinkOp checked is based on the ReduceWork.tagToInput map on the - * reduceWork that contains the JoinOp. - * 4. 
Tez, Map Join: - * If the keys expr list on the mapJoin Desc is an empty list for any input, - * this implies a cross product. - */ -public class CrossProductCheck implements PhysicalPlanResolver, Dispatcher { - - protected static transient final Logger LOG = LoggerFactory - .getLogger(CrossProductCheck.class); - - @Override - public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { - TaskGraphWalker ogw = new TaskGraphWalker(this); - - ArrayList<Node> topNodes = new ArrayList<Node>(); - topNodes.addAll(pctx.getRootTasks()); - - ogw.startWalking(topNodes, null); - return pctx; - } - - @Override - public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs) - throws SemanticException { - @SuppressWarnings("unchecked") - Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd; - if (currTask instanceof MapRedTask) { - MapRedTask mrTsk = (MapRedTask)currTask; - MapredWork mrWrk = mrTsk.getWork(); - checkMapJoins(mrTsk); - checkMRReducer(currTask.toString(), mrWrk); - } else if (currTask instanceof ConditionalTask ) { - List<Task<? extends Serializable>> taskListInConditionalTask = - ((ConditionalTask) currTask).getListTasks(); - for(Task<? extends Serializable> tsk: taskListInConditionalTask){ - dispatch(tsk, stack, nodeOutputs); - } - - } else if (currTask instanceof TezTask) { - TezTask tzTask = (TezTask) currTask; - TezWork tzWrk = tzTask.getWork(); - checkMapJoins(tzWrk); - checkTezReducer(tzWrk); - } - return null; - } - - private void warn(String msg) { - SessionState.getConsole().printInfo("Warning: " + msg, false); - } - - private void checkMapJoins(MapRedTask mrTsk) throws SemanticException { - MapredWork mrWrk = mrTsk.getWork(); - MapWork mapWork = mrWrk.getMapWork(); - List<String> warnings = new MapJoinCheck(mrTsk.toString()).analyze(mapWork); - if (!warnings.isEmpty()) { - for (String w : warnings) { - warn(w); - } - } - ReduceWork redWork = mrWrk.getReduceWork(); - if (redWork != null) { - warnings = new MapJoinCheck(mrTsk.toString()).analyze(redWork); - if (!warnings.isEmpty()) { - for (String w : warnings) { - warn(w); - } - } - } - } - - private void checkMapJoins(TezWork tzWrk) throws SemanticException { - for(BaseWork wrk : tzWrk.getAllWork() ) { - - if ( wrk instanceof MergeJoinWork ) { - wrk = ((MergeJoinWork)wrk).getMainWork(); - } - - List<String> warnings = new MapJoinCheck(wrk.getName()).analyze(wrk); - if ( !warnings.isEmpty() ) { - for(String w : warnings) { - warn(w); - } - } - } - } - - private void checkTezReducer(TezWork tzWrk) throws SemanticException { - for(BaseWork wrk : tzWrk.getAllWork() ) { - - if ( wrk instanceof MergeJoinWork ) { - wrk = ((MergeJoinWork)wrk).getMainWork(); - } - - if ( !(wrk instanceof ReduceWork ) ) { - continue; - } - ReduceWork rWork = (ReduceWork) wrk; - Operator<? extends OperatorDesc> reducer = ((ReduceWork)wrk).getReducer(); - if ( reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator ) { - Map<Integer, ExtractReduceSinkInfo.Info> rsInfo = new TreeMap<Integer, ExtractReduceSinkInfo.Info>(); - for(Map.Entry<Integer, String> e : rWork.getTagToInput().entrySet()) { - rsInfo.putAll(getReducerInfo(tzWrk, rWork.getName(), e.getValue())); - } - checkForCrossProduct(rWork.getName(), reducer, rsInfo); - } - } - } - - private void checkMRReducer(String taskName, MapredWork mrWrk) throws SemanticException { - ReduceWork rWrk = mrWrk.getReduceWork(); - if ( rWrk == null) { - return; - } - Operator<? 
extends OperatorDesc> reducer = rWrk.getReducer(); - if ( reducer instanceof JoinOperator|| reducer instanceof CommonMergeJoinOperator ) { - BaseWork prntWork = mrWrk.getMapWork(); - checkForCrossProduct(taskName, reducer, - new ExtractReduceSinkInfo(null).analyze(prntWork)); - } - } - - private void checkForCrossProduct(String taskName, - Operator<? extends OperatorDesc> reducer, - Map<Integer, ExtractReduceSinkInfo.Info> rsInfo) { - if ( rsInfo.isEmpty() ) { - return; - } - Iterator<ExtractReduceSinkInfo.Info> it = rsInfo.values().iterator(); - ExtractReduceSinkInfo.Info info = it.next(); - if (info.keyCols.size() == 0) { - List<String> iAliases = new ArrayList<String>(); - iAliases.addAll(info.inputAliases); - while (it.hasNext()) { - info = it.next(); - iAliases.addAll(info.inputAliases); - } - String warning = String.format( - "Shuffle Join %s[tables = %s] in Stage '%s' is a cross product", - reducer.toString(), - iAliases, - taskName); - warn(warning); - } - } - - private Map<Integer, ExtractReduceSinkInfo.Info> getReducerInfo(TezWork tzWrk, String vertex, String prntVertex) - throws SemanticException { - BaseWork prntWork = tzWrk.getWorkMap().get(prntVertex); - return new ExtractReduceSinkInfo(vertex).analyze(prntWork); - } - - /* - * Given a Work descriptor and the TaskName for the work - * this is responsible to check each MapJoinOp for cross products. - * The analyze call returns the warnings list. - * <p> - * For MR the taskname is the StageName, for Tez it is the vertex name. - */ - public static class MapJoinCheck implements NodeProcessor, NodeProcessorCtx { - - final List<String> warnings; - final String taskName; - - MapJoinCheck(String taskName) { - this.taskName = taskName; - warnings = new ArrayList<String>(); - } - - List<String> analyze(BaseWork work) throws SemanticException { - Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); - opRules.put(new RuleRegExp("R1", MapJoinOperator.getOperatorName() - + "%"), this); - Dispatcher disp = new DefaultRuleDispatcher(new NoopProcessor(), opRules, this); - GraphWalker ogw = new DefaultGraphWalker(disp); - ArrayList<Node> topNodes = new ArrayList<Node>(); - topNodes.addAll(work.getAllRootOperators()); - ogw.startWalking(topNodes, null); - return warnings; - } - - @Override - public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, - Object... nodeOutputs) throws SemanticException { - @SuppressWarnings("unchecked") - AbstractMapJoinOperator<? extends MapJoinDesc> mjOp = (AbstractMapJoinOperator<? extends MapJoinDesc>) nd; - MapJoinDesc mjDesc = mjOp.getConf(); - - String bigTablAlias = mjDesc.getBigTableAlias(); - if ( bigTablAlias == null ) { - Operator<? extends OperatorDesc> parent = null; - for(Operator<? extends OperatorDesc> op : mjOp.getParentOperators() ) { - if ( op instanceof TableScanOperator ) { - parent = op; - } - } - if ( parent != null) { - TableScanDesc tDesc = ((TableScanOperator)parent).getConf(); - bigTablAlias = tDesc.getAlias(); - } - } - bigTablAlias = bigTablAlias == null ? "?" : bigTablAlias; - - List<ExprNodeDesc> joinExprs = mjDesc.getKeys().values().iterator().next(); - - if ( joinExprs.size() == 0 ) { - warnings.add( - String.format("Map Join %s[bigTable=%s] in task '%s' is a cross product", - mjOp.toString(), bigTablAlias, taskName)); - } - - return null; - } - } - - /* - * for a given Work Descriptor, it extracts information about the ReduceSinkOps - * in the Work. For Tez, you can restrict it to ReduceSinks for a particular output - * vertex. 
- */ - public static class ExtractReduceSinkInfo implements NodeProcessor, NodeProcessorCtx { - - static class Info { - List<ExprNodeDesc> keyCols; - List<String> inputAliases; - - Info(List<ExprNodeDesc> keyCols, List<String> inputAliases) { - this.keyCols = keyCols; - this.inputAliases = inputAliases == null ? new ArrayList<String>() : inputAliases; - } - - Info(List<ExprNodeDesc> keyCols, String[] inputAliases) { - this.keyCols = keyCols; - this.inputAliases = inputAliases == null ? new ArrayList<String>() : Arrays.asList(inputAliases); - } - } - - final String outputTaskName; - final Map<Integer, Info> reduceSinkInfo; - - ExtractReduceSinkInfo(String parentTaskName) { - this.outputTaskName = parentTaskName; - reduceSinkInfo = new HashMap<Integer, Info>(); - } - - Map<Integer, Info> analyze(BaseWork work) throws SemanticException { - Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); - opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName() - + "%"), this); - Dispatcher disp = new DefaultRuleDispatcher(new NoopProcessor(), opRules, this); - GraphWalker ogw = new DefaultGraphWalker(disp); - ArrayList<Node> topNodes = new ArrayList<Node>(); - topNodes.addAll(work.getAllRootOperators()); - ogw.startWalking(topNodes, null); - return reduceSinkInfo; - } - - @Override - public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, - Object... nodeOutputs) throws SemanticException { - ReduceSinkOperator rsOp = (ReduceSinkOperator) nd; - ReduceSinkDesc rsDesc = rsOp.getConf(); - - if ( outputTaskName != null ) { - String rOutputName = rsDesc.getOutputName(); - if ( rOutputName == null || !outputTaskName.equals(rOutputName)) { - return null; - } - } - - reduceSinkInfo.put(rsDesc.getTag(), - new Info(rsDesc.getKeyCols(), rsOp.getInputAliases())); - - return null; - } - } - - static class NoopProcessor implements NodeProcessor { - @Override - public final Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, - Object... nodeOutputs) throws SemanticException { - return nd; - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java new file mode 100644 index 0000000..1442378 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java @@ -0,0 +1,382 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer.physical; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; +import java.util.TreeMap; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.plan.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; +import org.apache.hadoop.hive.ql.exec.ConditionalTask; +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; +import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.lib.TaskGraphWalker; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.session.SessionState; + +/* + * Check each MapJoin and ShuffleJoin Operator to see they are performing a cross product. + * If yes, output a warning to the Session's console. + * The Checks made are the following: + * 1. MR, Shuffle Join: + * Check the parent ReduceSinkOp of the JoinOp. If its keys list is size = 0, then + * this is a cross product. + * The parent ReduceSinkOp is in the MapWork for the same Stage. + * 2. MR, MapJoin: + * If the keys expr list on the mapJoin Desc is an empty list for any input, + * this implies a cross product. + * 3. Tez, Shuffle Join: + * Check the parent ReduceSinkOp of the JoinOp. If its keys list is size = 0, then + * this is a cross product. + * The parent ReduceSinkOp checked is based on the ReduceWork.tagToInput map on the + * reduceWork that contains the JoinOp. + * 4. Tez, Map Join: + * If the keys expr list on the mapJoin Desc is an empty list for any input, + * this implies a cross product. + */ +public class CrossProductHandler implements PhysicalPlanResolver, Dispatcher { + + protected static transient final Logger LOG = LoggerFactory + .getLogger(CrossProductHandler.class); + private Boolean cartesianProductEdgeEnabled = null; + + @Override + public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { + cartesianProductEdgeEnabled = + HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED); + TaskGraphWalker ogw = new TaskGraphWalker(this); + + ArrayList<Node> topNodes = new ArrayList<Node>(); + topNodes.addAll(pctx.getRootTasks()); + + ogw.startWalking(topNodes, null); + return pctx; + } + + @Override + public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs) + throws SemanticException { + @SuppressWarnings("unchecked") + Task<? 
extends Serializable> currTask = (Task<? extends Serializable>) nd; + if (currTask instanceof MapRedTask) { + MapRedTask mrTsk = (MapRedTask)currTask; + MapredWork mrWrk = mrTsk.getWork(); + checkMapJoins(mrTsk); + checkMRReducer(currTask.toString(), mrWrk); + } else if (currTask instanceof ConditionalTask ) { + List<Task<? extends Serializable>> taskListInConditionalTask = + ((ConditionalTask) currTask).getListTasks(); + for(Task<? extends Serializable> tsk: taskListInConditionalTask){ + dispatch(tsk, stack, nodeOutputs); + } + + } else if (currTask instanceof TezTask) { + TezTask tezTask = (TezTask) currTask; + TezWork tezWork = tezTask.getWork(); + checkMapJoins(tezWork); + checkTezReducer(tezWork); + } + return null; + } + + private void warn(String msg) { + SessionState.getConsole().printInfo("Warning: " + msg, false); + } + + private void checkMapJoins(MapRedTask mrTsk) throws SemanticException { + MapredWork mrWrk = mrTsk.getWork(); + MapWork mapWork = mrWrk.getMapWork(); + List<String> warnings = new MapJoinCheck(mrTsk.toString()).analyze(mapWork); + if (!warnings.isEmpty()) { + for (String w : warnings) { + warn(w); + } + } + ReduceWork redWork = mrWrk.getReduceWork(); + if (redWork != null) { + warnings = new MapJoinCheck(mrTsk.toString()).analyze(redWork); + if (!warnings.isEmpty()) { + for (String w : warnings) { + warn(w); + } + } + } + } + + private void checkMapJoins(TezWork tezWork) throws SemanticException { + for(BaseWork wrk : tezWork.getAllWork() ) { + + if ( wrk instanceof MergeJoinWork ) { + wrk = ((MergeJoinWork)wrk).getMainWork(); + } + + List<String> warnings = new MapJoinCheck(wrk.getName()).analyze(wrk); + if ( !warnings.isEmpty() ) { + for(String w : warnings) { + warn(w); + } + } + } + } + + private void checkTezReducer(TezWork tezWork) throws SemanticException { + for(BaseWork wrk : tezWork.getAllWork() ) { + BaseWork origWrk = null; + + if ( wrk instanceof MergeJoinWork ) { + origWrk = wrk; + wrk = ((MergeJoinWork)wrk).getMainWork(); + } + + if ( !(wrk instanceof ReduceWork ) ) { + continue; + } + ReduceWork rWork = (ReduceWork) wrk; + Operator<? extends OperatorDesc> reducer = ((ReduceWork)wrk).getReducer(); + if ( reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator ) { + boolean noOuterJoin = ((JoinDesc)reducer.getConf()).isNoOuterJoin(); + Map<Integer, ExtractReduceSinkInfo.Info> rsInfo = new TreeMap<Integer, ExtractReduceSinkInfo.Info>(); + for(Map.Entry<Integer, String> e : rWork.getTagToInput().entrySet()) { + rsInfo.putAll(getReducerInfo(tezWork, rWork.getName(), e.getValue())); + } + if (checkForCrossProduct(rWork.getName(), reducer, rsInfo) + && cartesianProductEdgeEnabled && noOuterJoin) { + List<BaseWork> parents = tezWork.getParents(null == origWrk ? wrk : origWrk); + for (BaseWork p: parents) { + TezEdgeProperty prop = tezWork.getEdgeProperty(p, null == origWrk ? wrk : origWrk); + LOG.info("Edge Type: "+prop.getEdgeType()); + if (prop.getEdgeType().equals(EdgeType.CUSTOM_SIMPLE_EDGE) + || prop.getEdgeType().equals(EdgeType.CUSTOM_EDGE)) { + prop.setEdgeType(EdgeType.XPROD_EDGE); + rWork.setNumReduceTasks(-1); + rWork.setMaxReduceTasks(-1); + rWork.setMinReduceTasks(-1); + } + } + } + } + } + } + + private void checkMRReducer(String taskName, MapredWork mrWrk) throws SemanticException { + ReduceWork rWrk = mrWrk.getReduceWork(); + if ( rWrk == null) { + return; + } + Operator<? 
extends OperatorDesc> reducer = rWrk.getReducer(); + if ( reducer instanceof JoinOperator|| reducer instanceof CommonMergeJoinOperator ) { + BaseWork parentWork = mrWrk.getMapWork(); + checkForCrossProduct(taskName, reducer, + new ExtractReduceSinkInfo(null).analyze(parentWork)); + } + } + + private boolean checkForCrossProduct(String taskName, + Operator<? extends OperatorDesc> reducer, + Map<Integer, ExtractReduceSinkInfo.Info> rsInfo) { + if ( rsInfo.isEmpty() ) { + return false; + } + Iterator<ExtractReduceSinkInfo.Info> it = rsInfo.values().iterator(); + ExtractReduceSinkInfo.Info info = it.next(); + if (info.keyCols.size() == 0) { + List<String> iAliases = new ArrayList<String>(); + iAliases.addAll(info.inputAliases); + while (it.hasNext()) { + info = it.next(); + iAliases.addAll(info.inputAliases); + } + String warning = String.format( + "Shuffle Join %s[tables = %s] in Stage '%s' is a cross product", + reducer.toString(), + iAliases, + taskName); + warn(warning); + return true; + } + return false; + } + + private Map<Integer, ExtractReduceSinkInfo.Info> getReducerInfo(TezWork tezWork, String vertex, String prntVertex) + throws SemanticException { + BaseWork parentWork = tezWork.getWorkMap().get(prntVertex); + return new ExtractReduceSinkInfo(vertex).analyze(parentWork); + } + + /* + * Given a Work descriptor and the TaskName for the work + * this is responsible to check each MapJoinOp for cross products. + * The analyze call returns the warnings list. + * <p> + * For MR the taskname is the StageName, for Tez it is the vertex name. + */ + public static class MapJoinCheck implements NodeProcessor, NodeProcessorCtx { + + final List<String> warnings; + final String taskName; + + MapJoinCheck(String taskName) { + this.taskName = taskName; + warnings = new ArrayList<String>(); + } + + List<String> analyze(BaseWork work) throws SemanticException { + Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); + opRules.put(new RuleRegExp("R1", MapJoinOperator.getOperatorName() + + "%"), this); + Dispatcher disp = new DefaultRuleDispatcher(new NoopProcessor(), opRules, this); + GraphWalker ogw = new DefaultGraphWalker(disp); + ArrayList<Node> topNodes = new ArrayList<Node>(); + topNodes.addAll(work.getAllRootOperators()); + ogw.startWalking(topNodes, null); + return warnings; + } + + @Override + public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + @SuppressWarnings("unchecked") + AbstractMapJoinOperator<? extends MapJoinDesc> mjOp = (AbstractMapJoinOperator<? extends MapJoinDesc>) nd; + MapJoinDesc mjDesc = mjOp.getConf(); + + String bigTablAlias = mjDesc.getBigTableAlias(); + if ( bigTablAlias == null ) { + Operator<? extends OperatorDesc> parent = null; + for(Operator<? extends OperatorDesc> op : mjOp.getParentOperators() ) { + if ( op instanceof TableScanOperator ) { + parent = op; + } + } + if ( parent != null) { + TableScanDesc tDesc = ((TableScanOperator)parent).getConf(); + bigTablAlias = tDesc.getAlias(); + } + } + bigTablAlias = bigTablAlias == null ? "?" : bigTablAlias; + + List<ExprNodeDesc> joinExprs = mjDesc.getKeys().values().iterator().next(); + + if ( joinExprs.size() == 0 ) { + warnings.add( + String.format("Map Join %s[bigTable=%s] in task '%s' is a cross product", + mjOp.toString(), bigTablAlias, taskName)); + } + + return null; + } + } + + /* + * for a given Work Descriptor, it extracts information about the ReduceSinkOps + * in the Work. 
For Tez, you can restrict it to ReduceSinks for a particular output + * vertex. + */ + public static class ExtractReduceSinkInfo implements NodeProcessor, NodeProcessorCtx { + + static class Info { + List<ExprNodeDesc> keyCols; + List<String> inputAliases; + + Info(List<ExprNodeDesc> keyCols, List<String> inputAliases) { + this.keyCols = keyCols; + this.inputAliases = inputAliases == null ? new ArrayList<String>() : inputAliases; + } + + Info(List<ExprNodeDesc> keyCols, String[] inputAliases) { + this.keyCols = keyCols; + this.inputAliases = inputAliases == null ? new ArrayList<String>() : Arrays.asList(inputAliases); + } + } + + final String outputTaskName; + final Map<Integer, Info> reduceSinkInfo; + + ExtractReduceSinkInfo(String parentTaskName) { + this.outputTaskName = parentTaskName; + reduceSinkInfo = new HashMap<Integer, Info>(); + } + + Map<Integer, Info> analyze(BaseWork work) throws SemanticException { + Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); + opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName() + + "%"), this); + Dispatcher disp = new DefaultRuleDispatcher(new NoopProcessor(), opRules, this); + GraphWalker ogw = new DefaultGraphWalker(disp); + ArrayList<Node> topNodes = new ArrayList<Node>(); + topNodes.addAll(work.getAllRootOperators()); + ogw.startWalking(topNodes, null); + return reduceSinkInfo; + } + + @Override + public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + ReduceSinkOperator rsOp = (ReduceSinkOperator) nd; + ReduceSinkDesc rsDesc = rsOp.getConf(); + + if ( outputTaskName != null ) { + String rOutputName = rsDesc.getOutputName(); + if ( rOutputName == null || !outputTaskName.equals(rOutputName)) { + return null; + } + } + + reduceSinkInfo.put(rsDesc.getTag(), + new Info(rsDesc.getKeyCols(), rsOp.getInputAliases())); + + return null; + } + } + + static class NoopProcessor implements NodeProcessor { + @Override + public final Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, + Object... 
nodeOutputs) throws SemanticException { + return nd; + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java index 9377563..c040406 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java @@ -82,7 +82,7 @@ public class PhysicalOptimizer { } if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_CHECK_CROSS_PRODUCT)) { - resolvers.add(new CrossProductCheck()); + resolvers.add(new CrossProductHandler()); } // Vectorization should be the last optimization, because it doesn't modify the plan http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java index 7f3b1b3..9f14c66 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java @@ -92,9 +92,9 @@ public class SparkCrossProductCheck implements PhysicalPlanResolver, Dispatcher for (ReduceWork reduceWork : sparkWork.getAllReduceWork()) { Operator<? extends OperatorDesc> reducer = reduceWork.getReducer(); if (reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator) { - Map<Integer, CrossProductCheck.ExtractReduceSinkInfo.Info> rsInfo = new TreeMap<Integer, CrossProductCheck.ExtractReduceSinkInfo.Info>(); + Map<Integer, CrossProductHandler.ExtractReduceSinkInfo.Info> rsInfo = new TreeMap<Integer, CrossProductHandler.ExtractReduceSinkInfo.Info>(); for (BaseWork parent : sparkWork.getParents(reduceWork)) { - rsInfo.putAll(new CrossProductCheck.ExtractReduceSinkInfo(null).analyze(parent)); + rsInfo.putAll(new CrossProductHandler.ExtractReduceSinkInfo(null).analyze(parent)); } checkForCrossProduct(reduceWork.getName(), reducer, rsInfo); } @@ -105,7 +105,7 @@ public class SparkCrossProductCheck implements PhysicalPlanResolver, Dispatcher SparkWork sparkWork = sparkTask.getWork(); for (BaseWork baseWork : sparkWork.getAllWork()) { List<String> warnings = - new CrossProductCheck.MapJoinCheck(sparkTask.toString()).analyze(baseWork); + new CrossProductHandler.MapJoinCheck(sparkTask.toString()).analyze(baseWork); for (String w : warnings) { warn(w); } @@ -114,12 +114,12 @@ public class SparkCrossProductCheck implements PhysicalPlanResolver, Dispatcher private void checkForCrossProduct(String workName, Operator<? 
extends OperatorDesc> reducer, - Map<Integer, CrossProductCheck.ExtractReduceSinkInfo.Info> rsInfo) { + Map<Integer, CrossProductHandler.ExtractReduceSinkInfo.Info> rsInfo) { if (rsInfo.isEmpty()) { return; } - Iterator<CrossProductCheck.ExtractReduceSinkInfo.Info> it = rsInfo.values().iterator(); - CrossProductCheck.ExtractReduceSinkInfo.Info info = it.next(); + Iterator<CrossProductHandler.ExtractReduceSinkInfo.Info> it = rsInfo.values().iterator(); + CrossProductHandler.ExtractReduceSinkInfo.Info info = it.next(); if (info.keyCols.size() == 0) { List<String> iAliases = new ArrayList<String>(); iAliases.addAll(info.inputAliases); http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index 15836ec..da30c3b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -80,7 +80,7 @@ import org.apache.hadoop.hive.ql.optimizer.SharedWorkOptimizer; import org.apache.hadoop.hive.ql.optimizer.correlation.ReduceSinkJoinDeDuplication; import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits; import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer; -import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck; +import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductHandler; import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile; import org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider; import org.apache.hadoop.hive.ql.optimizer.physical.LlapPreVectorizationPass; @@ -658,7 +658,7 @@ public class TezCompiler extends TaskCompiler { } if (conf.getBoolVar(HiveConf.ConfVars.HIVE_CHECK_CROSS_PRODUCT)) { - physicalCtx = new CrossProductCheck().resolve(physicalCtx); + physicalCtx = new CrossProductHandler().resolve(physicalCtx); } else { LOG.debug("Skipping cross product analysis"); } http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java index bbed9be..d43b81a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java @@ -28,7 +28,8 @@ public class TezEdgeProperty { CONTAINS,//used for union (all?) 
CUSTOM_EDGE,//CO_PARTITION_EDGE CUSTOM_SIMPLE_EDGE,//PARTITION_EDGE - ONE_TO_ONE_EDGE + ONE_TO_ONE_EDGE, + XPROD_EDGE } private HiveConf hiveConf; @@ -107,4 +108,5 @@ public class TezEdgeProperty { public void setEdgeType(EdgeType type) { this.edgeType = type; } + } http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 2dc334d..47aa936 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -109,8 +109,8 @@ public class TestTezTask { }); when(utils.createEdge(any(JobConf.class), any(Vertex.class), any(Vertex.class), - any(TezEdgeProperty.class), any(VertexType.class))).thenAnswer(new Answer<Edge>() { - + any(TezEdgeProperty.class), any(BaseWork.class), any(TezWork.class))) + .thenAnswer(new Answer<Edge>() { @Override public Edge answer(InvocationOnMock invocation) throws Throwable { Object[] args = invocation.getArguments(); http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/cross_prod_1.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/cross_prod_1.q b/ql/src/test/queries/clientpositive/cross_prod_1.q new file mode 100644 index 0000000..b5a84ea --- /dev/null +++ b/ql/src/test/queries/clientpositive/cross_prod_1.q @@ -0,0 +1,34 @@ +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; +set hive.tez.cartesian-product.enabled=true; + +create table X as +select distinct * from src order by key limit 10; + +explain select * from X as A, X as B order by A.key, B.key; +select * from X as A, X as B order by A.key, B.key; + +explain select * from X as A join X as B on A.key<B.key; +select * from X as A join X as B on A.key<B.key order by A.key, B.key; + +explain select * from X as A join X as B on A.key between "103" and "105"; +select * from X as A join X as B on A.key between "103" and "105" order by A.key, B.key; + +explain select * from X as A, X as B, X as C; +select * from X as A, X as B, X as C order by A.key, B.key, C.key; + +explain select * from X as A join X as B on A.key in ("103", "104", "105"); +select * from X as A join X as B on A.key in ("103", "104", "105") order by A.key, B.key; + +explain select A.key, count(*) from X as A, X as B group by A.key; +select A.key, count(*) from X as A, X as B group by A.key order by A.key; + +explain select * from X as A left outer join X as B on (A.key = B.key or A.value between "val_103" and "val_105"); +explain select * from X as A right outer join X as B on (A.key = B.key or A.value between "val_103" and "val_105"); +explain select * from X as A full outer join X as B on (A.key = B.key or A.value between "val_103" and "val_105"); + +explain select * from (select X.key, count(*) from X group by X.key) as A, (select X.key, count(*) from X group by X.key) as B; +select * from (select X.key, count(*) from X group by X.key) as A, (select X.key, count(*) from X group by X.key) as B order by A.key, B.key; + +explain select * from (select * from X union all select * from X as y) a join X; +select * from (select * from X union all select * from X as y) a join X order by a.key, X.key; \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/cross_prod_3.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/cross_prod_3.q b/ql/src/test/queries/clientpositive/cross_prod_3.q new file mode 100644 index 0000000..a233f17 --- /dev/null +++ b/ql/src/test/queries/clientpositive/cross_prod_3.q @@ -0,0 +1,13 @@ +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; +set hive.tez.cartesian-product.enabled=true; +set hive.auto.convert.join=true; +set hive.convert.join.bucket.mapjoin.tez=true; + +create table X (key string, value string) clustered by (key) into 2 buckets; +insert overwrite table X select distinct * from src order by key limit 10; + +create table Y as +select * from src order by key limit 1; + +explain select * from Y, (select * from X as A join X as B on A.key=B.key) as C where Y.key=C.key; http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/cross_prod_4.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/cross_prod_4.q b/ql/src/test/queries/clientpositive/cross_prod_4.q new file mode 100644 index 0000000..ea58e98 --- /dev/null +++ b/ql/src/test/queries/clientpositive/cross_prod_4.q @@ -0,0 +1,10 @@ +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; +set hive.auto.convert.join=true; +set hive.tez.cartesian-product.enabled=true; + +create table X as +select distinct * from src order by key limit 10; + +explain select * from X as A, X as B; +select * from X as A, X as B order by A.key, B.key; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q b/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q index b1e289f..b8b04ee 100644 --- a/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q +++ b/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q @@ -85,7 +85,7 @@ SELECT agg.amount FROM agg_01 agg, dim_shops d1 WHERE agg.dim_shops_id = d1.id -and agg.dim_shops_id = 1; +and agg.dim_shops_id = 1 order by agg.amount; set hive.tez.dynamic.partition.pruning.max.event.size=1; set hive.tez.dynamic.partition.pruning.max.data.size=1000000; http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_1.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_1.q b/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_1.q index 9c19a86..e404dd0 100644 --- a/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_1.q +++ b/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_1.q @@ -1,5 +1,6 @@ set hive.mapred.mode=nonstrict; set hive.explain.user=false; +set tez.cartesian-product.max-parallelism=1; -- Hybrid Grace Hash Join -- Test basic functionalities: -- 1. 
Various cases when hash partitions spill http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/subquery_multi.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/subquery_multi.q b/ql/src/test/queries/clientpositive/subquery_multi.q index c546d24..6ef198d 100644 --- a/ql/src/test/queries/clientpositive/subquery_multi.q +++ b/ql/src/test/queries/clientpositive/subquery_multi.q @@ -36,7 +36,7 @@ select * from part_null where p_name IN (select p_name from part_null) AND p_bra -- NOT IN is always true and IN is false for where p_name is NULL, hence should return all but one row explain select * from part_null where p_name IN (select p_name from part_null) AND p_brand NOT IN (select p_type from part_null); -select * from part_null where p_name IN (select p_name from part_null) AND p_brand NOT IN (select p_type from part_null); +select * from part_null where p_name IN (select p_name from part_null) AND p_brand NOT IN (select p_type from part_null) order by part_null.p_partkey; -- NOT IN has one NULL value so this whole query should not return any row explain select * from part_null where p_brand IN (select p_brand from part_null) AND p_brand NOT IN (select p_name from part_null); @@ -49,7 +49,7 @@ select * from part_null where p_name NOT IN (select c from tempty) AND p_brand I -- IN, EXISTS explain select * from part_null where p_name IN (select p_name from part_null) AND EXISTS (select c from tnull); -select * from part_null where p_name IN (select p_name from part_null) AND EXISTS (select c from tnull); +select * from part_null where p_name IN (select p_name from part_null) AND EXISTS (select c from tnull) order by part_null.p_partkey; explain select * from part_null where p_size IN (select p_size from part_null) AND EXISTS (select c from tempty); select * from part_null where p_size IN (select p_size from part_null) AND EXISTS (select c from tempty); http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/subquery_notin.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/subquery_notin.q b/ql/src/test/queries/clientpositive/subquery_notin.q index e23eb2b..c509654 100644 --- a/ql/src/test/queries/clientpositive/subquery_notin.q +++ b/ql/src/test/queries/clientpositive/subquery_notin.q @@ -109,8 +109,8 @@ explain select * from part where p_brand <> 'Brand#14' AND p_size NOT IN (select select * from part where p_brand <> 'Brand#14' AND p_size NOT IN (select (p_size*p_size) from part p where p.p_type = part.p_type ) AND p_size <> 340; --lhs contains non-simple expression -explain select * from part where (p_size-1) NOT IN (select min(p_size) from part group by p_type); -select * from part where (p_size-1) NOT IN (select min(p_size) from part group by p_type); +explain select * from part where (p_size-1) NOT IN (select min(p_size) from part group by p_type) order by p_partkey; +select * from part where (p_size-1) NOT IN (select min(p_size) from part group by p_type) order by p_partkey; explain select * from part where (p_partkey*p_size) NOT IN (select min(p_partkey) from part group by p_type); select * from part where (p_partkey*p_size) NOT IN (select min(p_partkey) from part group by p_type); http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/queries/clientpositive/subquery_select.q ---------------------------------------------------------------------- diff --git 
a/ql/src/test/queries/clientpositive/subquery_select.q b/ql/src/test/queries/clientpositive/subquery_select.q index 15377a4..c1766ff 100644 --- a/ql/src/test/queries/clientpositive/subquery_select.q +++ b/ql/src/test/queries/clientpositive/subquery_select.q @@ -155,8 +155,8 @@ SELECT p_size, (SELECT count(p_size) FROM part p WHERE p.p_type = part.p_type) IS NULL from part; -- scalar, non-corr, non agg -explain select p_type, (select p_size from part order by p_size limit 1) = 1 from part; -select p_type, (select p_size from part order by p_size limit 1) = 1 from part; +explain select p_type, (select p_size from part order by p_size limit 1) = 1 from part order by p_type; +select p_type, (select p_size from part order by p_size limit 1) = 1 from part order by p_type; -- in corr, multiple EXPLAIN SELECT p_size, p_size IN ( http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/results/clientpositive/llap/auto_join0.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/auto_join0.q.out b/ql/src/test/results/clientpositive/llap/auto_join0.q.out index 7f0a878..29945ad 100644 --- a/ql/src/test/results/clientpositive/llap/auto_join0.q.out +++ b/ql/src/test/results/clientpositive/llap/auto_join0.q.out @@ -1,4 +1,4 @@ -Warning: Map Join MAPJOIN[22][bigTable=?] in task 'Reducer 2' is a cross product +Warning: Shuffle Join MERGEJOIN[22][tables = [src1, src2]] in Stage 'Reducer 3' is a cross product PREHOOK: query: explain select sum(hash(a.k1,a.v1,a.k2, a.v2)) from ( @@ -30,9 +30,10 @@ STAGE PLANS: Tez #### A masked pattern was here #### Edges: - Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 4 (BROADCAST_EDGE) - Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) - Reducer 4 <- Map 1 (SIMPLE_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (XPROD_EDGE), Reducer 5 (XPROD_EDGE) + Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE) + Reducer 5 <- Map 1 (SIMPLE_EDGE) #### A masked pattern was here #### Vertices: Map 1 @@ -64,28 +65,33 @@ STAGE PLANS: expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string) outputColumnNames: _col0, _col1 Statistics: Num rows: 166 Data size: 29548 Basic stats: COMPLETE Column stats: COMPLETE - Map Join Operator - condition map: - Inner Join 0 to 1 - keys: - 0 - 1 - outputColumnNames: _col0, _col1, _col2, _col3 - input vertices: - 1 Reducer 4 - Statistics: Num rows: 27556 Data size: 9809936 Basic stats: COMPLETE Column stats: COMPLETE - Group By Operator - aggregations: sum(hash(_col0,_col1,_col2,_col3)) - mode: hash - outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - sort order: - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col0 (type: bigint) + Reduce Output Operator + sort order: + Statistics: Num rows: 166 Data size: 29548 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: string), _col1 (type: string) Reducer 3 Execution mode: llap Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 + 1 + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 27556 Data size: 9809936 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: sum(hash(_col0,_col1,_col2,_col3)) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + 
sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Reducer 4 + Execution mode: llap + Reduce Operator Tree: Group By Operator aggregations: sum(VALUE._col0) mode: mergepartial @@ -98,7 +104,7 @@ STAGE PLANS: input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - Reducer 4 + Reducer 5 Execution mode: llap Reduce Operator Tree: Select Operator @@ -116,7 +122,7 @@ STAGE PLANS: Processor Tree: ListSink -Warning: Map Join MAPJOIN[22][bigTable=?] in task 'Reducer 2' is a cross product +Warning: Shuffle Join MERGEJOIN[22][tables = [src1, src2]] in Stage 'Reducer 3' is a cross product PREHOOK: query: select sum(hash(a.k1,a.v1,a.k2, a.v2)) from ( SELECT src1.key as k1, src1.value as v1, http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/results/clientpositive/llap/auto_join_filters.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/auto_join_filters.q.out b/ql/src/test/results/clientpositive/llap/auto_join_filters.q.out index d1d9408..079f047 100644 --- a/ql/src/test/results/clientpositive/llap/auto_join_filters.q.out +++ b/ql/src/test/results/clientpositive/llap/auto_join_filters.q.out @@ -14,7 +14,7 @@ POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/in3.txt' INTO TABLE my POSTHOOK: type: LOAD #### A masked pattern was here #### POSTHOOK: Output: default@myinput1 -Warning: Map Join MAPJOIN[18][bigTable=?] in task 'Map 1' is a cross product +Warning: Shuffle Join MERGEJOIN[18][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product PREHOOK: query: SELECT sum(hash(a.key,a.value,b.key,b.value)) FROM myinput1 a JOIN myinput1 b on a.key > 40 AND a.value > 50 AND a.key = a.value AND b.key > 40 AND b.value > 50 AND b.key = b.value PREHOOK: type: QUERY PREHOOK: Input: default@myinput1 @@ -300,7 +300,7 @@ POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/in2.txt' into table sm POSTHOOK: type: LOAD #### A masked pattern was here #### POSTHOOK: Output: default@smb_input2 -Warning: Map Join MAPJOIN[18][bigTable=?] in task 'Map 1' is a cross product +Warning: Shuffle Join MERGEJOIN[18][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product PREHOOK: query: SELECT sum(hash(a.key,a.value,b.key,b.value)) FROM myinput1 a JOIN myinput1 b on a.key > 40 AND a.value > 50 AND a.key = a.value AND b.key > 40 AND b.value > 50 AND b.key = b.value PREHOOK: type: QUERY PREHOOK: Input: default@myinput1 http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/results/clientpositive/llap/auto_join_nulls.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/auto_join_nulls.q.out b/ql/src/test/results/clientpositive/llap/auto_join_nulls.q.out index 5984e8f..04da1f2 100644 --- a/ql/src/test/results/clientpositive/llap/auto_join_nulls.q.out +++ b/ql/src/test/results/clientpositive/llap/auto_join_nulls.q.out @@ -14,7 +14,7 @@ POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/in1.txt' INTO TABLE my POSTHOOK: type: LOAD #### A masked pattern was here #### POSTHOOK: Output: default@myinput1 -Warning: Map Join MAPJOIN[14][bigTable=?] 
in task 'Map 1' is a cross product +Warning: Shuffle Join MERGEJOIN[14][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product PREHOOK: query: SELECT sum(hash(a.key,a.value,b.key,b.value)) FROM myinput1 a JOIN myinput1 b PREHOOK: type: QUERY PREHOOK: Input: default@myinput1 http://git-wip-us.apache.org/repos/asf/hive/blob/cfbe6125/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_12.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_12.q.out b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_12.q.out index 6ef1f34..3acbb20 100644 --- a/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_12.q.out +++ b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_12.q.out @@ -134,7 +134,7 @@ POSTHOOK: query: load data local inpath '../../data/files/smallsrcsortbucket3out POSTHOOK: type: LOAD #### A masked pattern was here #### POSTHOOK: Output: default@bucket_medium@ds=2008-04-08 -Warning: Map Join MAPJOIN[34][bigTable=?] in task 'Map 3' is a cross product +Warning: Shuffle Join MERGEJOIN[34][tables = [$hdt$_1, $hdt$_2, $hdt$_0, $hdt$_3]] in Stage 'Reducer 4' is a cross product PREHOOK: query: explain extended select count(*) FROM bucket_small a JOIN bucket_medium b ON a.key = b.key JOIN bucket_big c ON c.key = b.key JOIN bucket_medium d ON c.key = b.key PREHOOK: type: QUERY POSTHOOK: query: explain extended select count(*) FROM bucket_small a JOIN bucket_medium b ON a.key = b.key JOIN bucket_big c ON c.key = b.key JOIN bucket_medium d ON c.key = b.key @@ -148,8 +148,9 @@ STAGE PLANS: Tez #### A masked pattern was here #### Edges: - Map 3 <- Map 1 (BROADCAST_EDGE), Map 2 (BROADCAST_EDGE), Map 5 (BROADCAST_EDGE) - Reducer 4 <- Map 3 (CUSTOM_SIMPLE_EDGE) + Map 3 <- Map 1 (BROADCAST_EDGE), Map 2 (BROADCAST_EDGE) + Reducer 4 <- Map 3 (XPROD_EDGE), Map 6 (XPROD_EDGE) + Reducer 5 <- Reducer 4 (CUSTOM_SIMPLE_EDGE) #### A masked pattern was here #### Vertices: Map 1 @@ -336,29 +337,12 @@ STAGE PLANS: 1 Map 2 Position of Big Table: 2 Statistics: Num rows: 244 Data size: 43381 Basic stats: COMPLETE Column stats: NONE - Map Join Operator - condition map: - Inner Join 0 to 1 - Estimated key counts: Map 5 => 1 - keys: - 0 - 1 - input vertices: - 1 Map 5 - Position of Big Table: 0 - Statistics: Num rows: 244 Data size: 45577 Basic stats: COMPLETE Column stats: NONE - Group By Operator - aggregations: count() - mode: hash - outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - null sort order: - sort order: - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE - tag: -1 - value expressions: _col0 (type: bigint) - auto parallelism: false + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 244 Data size: 43381 Basic stats: COMPLETE Column stats: NONE + tag: 0 + auto parallelism: false Execution mode: llap LLAP IO: no inputs Path -> Alias: @@ -465,7 +449,7 @@ STAGE PLANS: Truncated Path -> Alias: /bucket_big/ds=2008-04-08 [c] /bucket_big/ds=2008-04-09 [c] - Map 5 + Map 6 Map Operator Tree: TableScan alias: d @@ -539,6 +523,30 @@ STAGE PLANS: Execution mode: llap Needs Tagging: false Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 + 1 + Position of Big Table: 0 + Statistics: Num rows: 244 Data size: 45577 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: 
hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + tag: -1 + value expressions: _col0 (type: bigint) + auto parallelism: false + Reducer 5 + Execution mode: llap + Needs Tagging: false + Reduce Operator Tree: Group By Operator aggregations: count(VALUE._col0) mode: mergepartial @@ -573,7 +581,7 @@ STAGE PLANS: Processor Tree: ListSink -Warning: Map Join MAPJOIN[34][bigTable=?] in task 'Map 3' is a cross product +Warning: Shuffle Join MERGEJOIN[34][tables = [$hdt$_1, $hdt$_2, $hdt$_0, $hdt$_3]] in Stage 'Reducer 4' is a cross product PREHOOK: query: select count(*) FROM bucket_small a JOIN bucket_medium b ON a.key = b.key JOIN bucket_big c ON c.key = b.key JOIN bucket_medium d ON c.key = b.key PREHOOK: type: QUERY PREHOOK: Input: default@bucket_big