Author: xuefu
Date: Fri Oct 31 02:33:08 2014
New Revision: 1635656

URL: http://svn.apache.org/r1635656
Log:
HIVE-8202: Support SMB Join for Hive on Spark [Spark Branch] (Szehon via Xuefu)
Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_16.q.out
Modified:
    hive/branches/spark/itests/src/test/resources/testconfiguration.properties
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join32.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_smb_mapjoin_14.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_1.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_13.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_14.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_15.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_2.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_3.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_4.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_5.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_6.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_7.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_8.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_9.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket2.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket3.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket4.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/disable_merge_for_bucketing.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/load_dyn_part2.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/parquet_join.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/smb_mapjoin_17.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/smb_mapjoin_20.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/smb_mapjoin_21.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/smb_mapjoin_25.q.out

Modified: hive/branches/spark/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/src/test/resources/testconfiguration.properties?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/branches/spark/itests/src/test/resources/testconfiguration.properties Fri Oct 31 02:33:08 2014
@@ -463,6 +463,7 @@ spark.query.files=add_part_multiple.q, \
   auto_sortmerge_join_13.q, \
   auto_sortmerge_join_14.q, \
   auto_sortmerge_join_15.q, \
+  auto_sortmerge_join_16.q, \
   auto_sortmerge_join_2.q, \
   auto_sortmerge_join_3.q, \
   auto_sortmerge_join_4.q, \
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java?rev=1635656&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java Fri Oct 31 02:33:08 2014
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lib;
+
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.util.Stack;
+
+/**
+ * Rule that matches a particular type of node.
+ */
+public class TypeRule implements Rule {
+
+  private final Class<?> nodeClass;
+
+  public TypeRule(Class<?> nodeClass) {
+    this.nodeClass = nodeClass;
+  }
+
+  @Override
+  public int cost(Stack<Node> stack) throws SemanticException {
+    if (stack == null) {
+      return -1;
+    }
+    if (nodeClass.isInstance(stack.peek())) {
+      return 1;
+    }
+    return -1;
+  }
+
+  @Override
+  public String getName() {
+    return nodeClass.getName();
+  }
+}
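A minimal sketch of how this rule is meant to be used; it mirrors the registration that the SparkCompiler change later in this commit performs, with procCtx standing in for the GenSparkProcContext built there:

    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
    // Fires whenever the walker's current node is an SMBMapJoinOperator,
    // with no dependence on the operator-name patterns a RuleRegExp matches on.
    opRules.put(new TypeRule(SMBMapJoinOperator.class),
        SparkSortMergeJoinFactory.getTableScanMapJoin());
    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);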
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Fri Oct 31 02:33:08 2014
@@ -111,7 +111,7 @@ public class Optimizer {
     // If hive.optimize.bucketmapjoin.sortedmerge is set, add both
     // BucketMapJoinOptimizer and SortedMergeBucketMapJoinOptimizer
     if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN))
-        && !isTezExecEngine && !isSparkExecEngine) {
+        && !isTezExecEngine) {
       if (!bucketMapJoinOptimizer) {
         // No need to add BucketMapJoinOptimizer twice
         transformations.add(new BucketMapJoinOptimizer());
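With the Spark guard removed above, SMB conversion can now kick in for Spark queries once the usual knobs are on. A hedged sketch of a session setup exercising this path; HIVEOPTSORTMERGEBUCKETMAPJOIN is the variable gated in the hunk above, while the two string keys are the standard engine/auto-conversion settings and are assumptions here, not part of this commit (auto_sortmerge_join_16.q is the authoritative example):

    import org.apache.hadoop.hive.conf.HiveConf;

    HiveConf conf = new HiveConf();
    conf.set("hive.execution.engine", "spark");            // run on the Spark branch's engine
    conf.set("hive.auto.convert.sortmerge.join", "true");  // assumed companion knob for auto SMB conversion
    conf.setBoolVar(HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN, true); // the gate shown above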
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java?rev=1635656&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java Fri Oct 31 02:33:08 2014
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.spark;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.GenSparkProcContext;
+import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+/**
+ * Operator factory for Spark SMBJoin processing.
+ */
+public final class SparkSortMergeJoinFactory {
+
+  private SparkSortMergeJoinFactory() {
+    // prevent instantiation
+  }
+
+  /**
+   * Get the branch from which we were invoked (walking). See the diagram below:
+   * we are at the SMBJoinOp and could have come from the TS of any of the input tables.
+   */
+  public static int getPositionParent(SMBMapJoinOperator op,
+      Stack<Node> stack) {
+    int size = stack.size();
+    assert size >= 2 && stack.get(size - 1) == op;
+    Operator<? extends OperatorDesc> parent =
+        (Operator<? extends OperatorDesc>) stack.get(size - 2);
+    List<Operator<? extends OperatorDesc>> parOp = op.getParentOperators();
+    int pos = parOp.indexOf(parent);
+    return pos;
+  }
+
+  /**
+   * SortMergeMapJoin processor; input is an SMBJoinOp that is part of a MapWork:
+   *
+   *  MapWork:
+   *
+   *   (Big)   (Small)  (Small)
+   *    TS       TS       TS
+   *     \       |       /
+   *      \      DS     DS
+   *       \     |     /
+   *        SMBJoinOP
+   *
+   * 1. Initializes the MapWork's aliasToWork, pointing to the big table's TS.
+   * 2. Adds the bucketing information to the MapWork.
+   * 3. Adds localWork to the MapWork, with localWork's aliasToWork pointing to the small tables' TS.
+   */
+  private static class SortMergeJoinProcessor implements NodeProcessor {
+
+    public static void setupBucketMapJoinInfo(MapWork plan, SMBMapJoinOperator currMapJoinOp) {
+      if (currMapJoinOp != null) {
+        Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
+            currMapJoinOp.getConf().getAliasBucketFileNameMapping();
+        if (aliasBucketFileNameMapping != null) {
+          MapredLocalWork localPlan = plan.getMapLocalWork();
+          if (localPlan == null) {
+            localPlan = currMapJoinOp.getConf().getLocalWork();
+          } else {
+            // local plan is not null; we want to merge it into the SMBMapJoinOperator's local work
+            MapredLocalWork smbLocalWork = currMapJoinOp.getConf().getLocalWork();
+            if (smbLocalWork != null) {
+              localPlan.getAliasToFetchWork().putAll(smbLocalWork.getAliasToFetchWork());
+              localPlan.getAliasToWork().putAll(smbLocalWork.getAliasToWork());
+            }
+          }
+
+          if (localPlan == null) {
+            return;
+          }
+          plan.setMapLocalWork(null);
+          currMapJoinOp.getConf().setLocalWork(localPlan);
+
+          BucketMapJoinContext bucketMJCxt = new BucketMapJoinContext();
+          localPlan.setBucketMapjoinContext(bucketMJCxt);
+          bucketMJCxt.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
+          bucketMJCxt.setBucketFileNameMapping(
+              currMapJoinOp.getConf().getBigTableBucketNumMapping());
+          localPlan.setInputFileChangeSensitive(true);
+          bucketMJCxt.setMapJoinBigTableAlias(currMapJoinOp.getConf().getBigTableAlias());
+          bucketMJCxt
+              .setBucketMatcherClass(org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
+          bucketMJCxt.setBigTablePartSpecToFileMapping(
+              currMapJoinOp.getConf().getBigTablePartSpecToFileMapping());
+
+          plan.setUseBucketizedHiveInputFormat(true);
+        }
+      }
+    }
+
+    /**
+     * Initialize the mapWork.
+     *
+     * @param opProcCtx
+     *          processing context
+     */
+    private static void initSMBJoinPlan(MapWork mapWork,
+        GenSparkProcContext opProcCtx, boolean local)
+        throws SemanticException {
+      TableScanOperator ts = (TableScanOperator) opProcCtx.currentRootOperator;
+      String currAliasId = findAliasId(opProcCtx, ts);
+      GenMapRedUtils.setMapWork(mapWork, opProcCtx.parseContext,
+          opProcCtx.inputs, null, ts, currAliasId, opProcCtx.conf, local);
+    }
+
+    private static String findAliasId(GenSparkProcContext opProcCtx, TableScanOperator ts) {
+      for (String alias : opProcCtx.topOps.keySet()) {
+        if (opProcCtx.topOps.get(alias) == ts) {
+          return alias;
+        }
+      }
+      return null;
+    }
+
+    /**
+     * 1. Initializes the MapWork's aliasToWork, pointing to the big table's TS.
+     * 2. Adds the bucketing information to the MapWork.
+     * 3. Adds localWork to the MapWork, with localWork's aliasToWork pointing to the small tables' TS.
+     */
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      SMBMapJoinOperator mapJoin = (SMBMapJoinOperator) nd;
+      GenSparkProcContext ctx = (GenSparkProcContext) procCtx;
+
+      SparkTask currTask = ctx.currentTask;
+
+      // find the branch on which this processor was invoked
+      int pos = getPositionParent(mapJoin, stack);
+      boolean local = pos != mapJoin.getConf().getPosBigTable();
+
+      // find the associated MapWork that holds this SMBMapJoinOp
+      MapWork mapWork = ctx.smbJoinWorkMap.get(mapJoin);
+      initSMBJoinPlan(mapWork, ctx, local);
+
+      setupBucketMapJoinInfo(mapWork, mapJoin);
+
+      // local aliases need not hand the context over any further
+      return false;
+    }
+  }
+
+  public static NodeProcessor getTableScanMapJoin() {
+    return new SortMergeJoinProcessor();
+  }
+}
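getPositionParent and the local flag in process() reduce to simple index arithmetic over the join's parent list. A toy, dependency-free illustration of that arithmetic; the class and the branch names in it are hypothetical stand-ins, not Hive API:

    import java.util.Arrays;
    import java.util.List;

    public class SmbPositionDemo {
      public static void main(String[] args) {
        // Stand-ins for the three TableScan branches feeding the SMBJoinOp.
        List<String> parents = Arrays.asList("bigTS", "smallTS1", "smallTS2");
        int posBigTable = 0;              // stand-in for getConf().getPosBigTable()
        String arrivedFrom = "smallTS1";  // the branch the tree walker came down
        int pos = parents.indexOf(arrivedFrom);  // what getPositionParent computes
        boolean local = pos != posBigTable;      // small-table branches become localWork
        System.out.println("pos=" + pos + ", local=" + local); // pos=1, local=true
      }
    }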
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java Fri Oct 31 02:33:08 2014
@@ -18,20 +18,13 @@
 package org.apache.hadoop.hive.ql.parse.spark;
 
-import java.io.Serializable;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -49,6 +42,14 @@ import org.apache.hadoop.hive.ql.plan.Op
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 /**
  * GenSparkProcContext maintains information about the tasks and operators
  * as we walk the operator tree to break them into SparkTasks.
@@ -100,6 +101,9 @@ public class GenSparkProcContext impleme
   // map that says which mapjoin belongs to which work item
   public final Map<MapJoinOperator, List<BaseWork>> mapJoinWorkMap;
 
+  // a map to keep track of which MapWork item holds which SMBMapJoinOp
+  public final Map<SMBMapJoinOperator, MapWork> smbJoinWorkMap;
+
   // a map to keep track of which root generated which work
   public final Map<Operator<?>, BaseWork> rootToWorkMap;
 
@@ -134,13 +138,6 @@ public class GenSparkProcContext impleme
   // This is necessary as sometimes semantic analyzer's mapping is different than operator's own alias.
   public final Map<String, Operator<? extends OperatorDesc>> topOps;
 
-  // Keep track of the current table alias (from last TableScan)
-  public String currentAliasId;
-
-  // Keep track of the current Table-Scan.
-  public TableScanOperator currentTs;
-
-
   @SuppressWarnings("unchecked")
   public GenSparkProcContext(HiveConf conf, ParseContext parseContext,
       List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -158,6 +155,7 @@ public class GenSparkProcContext impleme
     this.leafOperatorToFollowingWork = new LinkedHashMap<Operator<?>, BaseWork>();
     this.linkOpWithWorkMap = new LinkedHashMap<Operator<?>, Map<BaseWork, SparkEdgeProperty>>();
     this.linkWorkWithReduceSinkMap = new LinkedHashMap<BaseWork, List<ReduceSinkOperator>>();
+    this.smbJoinWorkMap = new LinkedHashMap<SMBMapJoinOperator, MapWork>();
     this.mapJoinWorkMap = new LinkedHashMap<MapJoinOperator, List<BaseWork>>();
     this.rootToWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
     this.childToWorkMap = new LinkedHashMap<Operator<?>, List<BaseWork>>();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Fri Oct 31 02:33:08 2014
@@ -18,21 +18,14 @@
 package org.apache.hadoop.hive.ql.parse.spark;
 
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
@@ -49,15 +42,21 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * GenSparkUtils is a collection of shared helper methods to produce SparkWork
@@ -94,7 +93,7 @@ public class GenSparkUtils {
     return unionWork;
   }
 
-  public ReduceWork createReduceWork(GenSparkProcContext context, Operator<?> root, SparkWork sparkWork) {
+  public ReduceWork createReduceWork(GenSparkProcContext context, Operator<?> root, SparkWork sparkWork) throws SemanticException {
     Preconditions.checkArgument(!root.getParentOperators().isEmpty(),
         "AssertionError: expected root.getParentOperators() to be non-empty");
 
@@ -128,10 +127,19 @@ public class GenSparkUtils {
     }
 
     if (reduceWork.getReducer() instanceof JoinOperator) {
-      //reduce-side join
+      //reduce-side join, use MR-style shuffle
      edgeProp.setMRShuffle();
    }
 
+    //If it's a FileSink to bucketed files, also use MR-style shuffle so the task id is compatible with the bucket file name
+    FileSinkOperator fso = getChildOperator(reduceWork.getReducer(), FileSinkOperator.class);
+    if (fso != null) {
+      String bucketCount = fso.getConf().getTableInfo().getProperties().getProperty(
+          hive_metastoreConstants.BUCKET_COUNT);
+      if (bucketCount != null && Integer.valueOf(bucketCount) > 1) {
+        edgeProp.setMRShuffle();
+      }
+    }
+
     sparkWork.connect(
         context.preceedingWork, reduceWork, edgeProp);
 
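The check above reads the table's bucket count straight out of its descriptor properties. A minimal, self-contained sketch of the same decision logic; the literal key "bucket_count" is an assumption standing in for hive_metastoreConstants.BUCKET_COUNT so the snippet runs without Hive on the classpath:

    import java.util.Properties;

    public class BucketShuffleCheck {
      public static void main(String[] args) {
        // Stand-in for fso.getConf().getTableInfo().getProperties()
        Properties tableProps = new Properties();
        tableProps.setProperty("bucket_count", "4");

        String bucketCount = tableProps.getProperty("bucket_count");
        // Same predicate as the hunk above: only multi-bucket sinks force an
        // MR-style shuffle, so reducer task ids line up with bucket file names.
        boolean useMRShuffle = bucketCount != null && Integer.valueOf(bucketCount) > 1;
        System.out.println("useMRShuffle = " + useMRShuffle); // true
      }
    }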
@@ -158,7 +166,12 @@ public class GenSparkUtils {
   }
 
   public MapWork createMapWork(GenSparkProcContext context, Operator<?> root,
-                               SparkWork sparkWork, PrunedPartitionList partitions) throws SemanticException {
+      SparkWork sparkWork, PrunedPartitionList partitions) throws SemanticException {
+    return createMapWork(context, root, sparkWork, partitions, false);
+  }
+
+  public MapWork createMapWork(GenSparkProcContext context, Operator<?> root,
+      SparkWork sparkWork, PrunedPartitionList partitions, boolean deferSetup) throws SemanticException {
     Preconditions.checkArgument(root.getParentOperators().isEmpty(),
         "AssertionError: expected root.getParentOperators() to be empty");
     MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
@@ -170,7 +183,9 @@ public class GenSparkUtils {
         root.getClass().getName());
     String alias = ((TableScanOperator)root).getConf().getAlias();
 
-    setupMapWork(mapWork, context, partitions, root, alias);
+    if (!deferSetup) {
+      setupMapWork(mapWork, context, partitions, root, alias);
+    }
 
     // add new item to the Spark work
     sparkWork.add(mapWork);
@@ -322,27 +337,17 @@ public class GenSparkUtils {
     return true;
   }
 
-
-  /**
-   * Is an operator of the given class a child of the given operator. This is more flexible
-   * than GraphWalker to tell apart subclasses such as SMBMapJoinOp vs MapJoinOp that have a common name.
-   * @param op parent operator to start search
-   * @param klazz given class
-   * @return
-   * @throws SemanticException
-   */
-  public static Operator<?> getChildOperator(Operator<?> op, Class klazz) throws SemanticException {
+  public static <T> T getChildOperator(Operator<?> op, Class<T> klazz) throws SemanticException {
     if (klazz.isInstance(op)) {
-      return op;
+      return (T) op;
     }
     List<Operator<?>> childOperators = op.getChildOperators();
     for (Operator<?> childOp : childOperators) {
-      Operator result = getChildOperator(childOp, klazz);
+      T result = getChildOperator(childOp, klazz);
       if (result != null) {
         return result;
       }
     }
     return null;
   }
-
 }
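The getChildOperator rewrite above is a plain generic depth-first search; callers now get a typed result instead of casting. A toy, dependency-free version of the same pattern, using a hypothetical TreeNode in place of Hive's Operator tree:

    import java.util.ArrayList;
    import java.util.List;

    public class TypedSearchDemo {
      static class TreeNode {
        final List<TreeNode> children = new ArrayList<TreeNode>();
      }
      static class LeafNode extends TreeNode {}

      // Mirrors GenSparkUtils.getChildOperator: return the first descendant
      // (or the node itself) that is an instance of klazz, else null.
      static <T> T findDescendant(TreeNode node, Class<T> klazz) {
        if (klazz.isInstance(node)) {
          return klazz.cast(node);
        }
        for (TreeNode child : node.children) {
          T result = findDescendant(child, klazz);
          if (result != null) {
            return result;
          }
        }
        return null;
      }

      public static void main(String[] args) {
        TreeNode root = new TreeNode();
        root.children.add(new LeafNode());
        System.out.println(findDescendant(root, LeafNode.class) != null); // true
      }
    }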
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java Fri Oct 31 02:33:08 2014
@@ -18,36 +18,40 @@
 package org.apache.hadoop.hive.ql.parse.spark;
 
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Stack;
-
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Stack;
 
 /**
  * GenSparkWork separates the operator tree into spark tasks.
@@ -102,6 +106,26 @@ public class GenSparkWork implements Nod
     SparkWork sparkWork = context.currentTask.getWork();
 
+    if (GenSparkUtils.getChildOperator(root, DummyStoreOperator.class) != null) {
+      /*
+       * SMB join case:
+       *
+       *   (Big)   (Small)  (Small)
+       *    TS       TS       TS
+       *     \       |       /
+       *      \      DS     DS
+       *       \     |     /
+       *        SMBJoinOP
+       *
+       * Only create a MapWork rooted at the TS of the big table.
+       * If there are dummy-store operators anywhere in a TS's children path, the TS belongs to a small table:
+       * no separate map task needs to be created for a small-table TS, as it will be read by the MapWork of the big table.
+       */
+      return null;
+    }
+
+    SMBMapJoinOperator smbOp = (SMBMapJoinOperator) GenSparkUtils.getChildOperator(root, SMBMapJoinOperator.class);
+
     // Right now the work graph is pretty simple. If there is no
     // Preceding work we have a root and will generate a map
     // vertex. If there is a preceding work we will generate
@@ -119,7 +143,18 @@ public class GenSparkWork implements Nod
     } else {
       // create a new vertex
       if (context.preceedingWork == null) {
-        work = utils.createMapWork(context, root, sparkWork, null);
+        if (smbOp != null) {
+          // This logic is for the sort-merge-bucket (SMB) map join case.
+          // This MapWork (of the big table, see above) is initialized later by the
+          // SparkSortMergeJoinFactory processor, so don't initialize it here;
+          // just keep track of it in the context for later processing.
+          work = utils.createMapWork(context, root, sparkWork, null, true);
+          if (context.smbJoinWorkMap.get(smbOp) != null) {
+            throw new SemanticException("Each SMBMapJoin should be associated with only one MapWork");
+          }
+          context.smbJoinWorkMap.put(smbOp, (MapWork) work);
+        } else {
+          work = utils.createMapWork(context, root, sparkWork, null);
+        }
       } else {
         work = utils.createReduceWork(context, root, sparkWork);
       }
@@ -284,9 +319,17 @@ public class GenSparkWork implements Nod
           edgeProp.setShuffleSort();
         }
         if (rWork.getReducer() instanceof JoinOperator) {
-          //reduce-side join
+          //reduce-side join, use MR-style shuffle
           edgeProp.setMRShuffle();
         }
+        //If it's a FileSink to bucketed files, also use MR-style shuffle so the task id is compatible with the bucket file name
+        FileSinkOperator fso = GenSparkUtils.getChildOperator(rWork.getReducer(), FileSinkOperator.class);
+        if (fso != null) {
+          String bucketCount = fso.getConf().getTableInfo().getProperties().getProperty(
+              hive_metastoreConstants.BUCKET_COUNT);
+          if (bucketCount != null && Integer.valueOf(bucketCount) > 1) {
+            edgeProp.setMRShuffle();
+          }
+        }
         sparkWork.connect(work, rWork, edgeProp);
         context.connectedReduceSinks.add(rs);
       }
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Fri Oct 31 02:33:08 2014
@@ -17,27 +17,15 @@
  */
 package org.apache.hadoop.hive.ql.parse.spark;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
@@ -54,6 +42,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TypeRule;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
 import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
@@ -61,8 +50,7 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
 import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.optimizer.spark.SetSparkReducerParallelism;
-import org.apache.hadoop.hive.ql.optimizer.spark.SparkMapJoinOptimizer;
-import org.apache.hadoop.hive.ql.optimizer.spark.SparkReduceSinkMapJoinProc;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory;
 import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -73,6 +61,17 @@ import org.apache.hadoop.hive.ql.plan.Mo
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
 
 /**
  * SparkCompiler translates the operator plan into SparkTasks.
@@ -181,6 +180,20 @@ public class SparkCompiler extends TaskC
     GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
     ogw.startWalking(topNodes, null);
 
+    // ------------------- Second Pass -----------------------
+    // SMB join optimizations: add the "localWork" and bucketing data structures to the MapWork.
+    opRules.clear();
+    opRules.put(new TypeRule(SMBMapJoinOperator.class),
+        SparkSortMergeJoinFactory.getTableScanMapJoin());
+
+    disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    topNodes = new ArrayList<Node>();
+    topNodes.addAll(pCtx.getTopOps().values());
+    ogw = new GenSparkWorkWalker(disp, procCtx);
+    ogw.startWalking(topNodes, null);
+
     // we need to clone some operator plans and remove union operators still
     for (BaseWork w: procCtx.workWithUnionOperators) {
       GenSparkUtils.getUtils().removeUnionOperators(conf, procCtx, w);

Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join32.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join32.q.out?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join32.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join32.q.out Fri Oct 31 02:33:08 2014
@@ -168,77 +168,51 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP PARTITION-LEVEL SORT, 1), Map 4 (GROUP PARTITION-LEVEL SORT, 1)
-        Reducer 3 <- Reducer 2 (GROUP SORT, 1)
+        Reducer 2 <- Map 1 (GROUP SORT, 1)
 #### A masked pattern was here ####
       Vertices:
         Map 1
             Map Operator Tree:
                 TableScan
-                  alias: v
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                  Filter Operator
-                    predicate: name is not null (type: boolean)
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: name (type: string)
-                      sort order: +
-                      Map-reduce partition columns: name (type: string)
-                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                      value expressions: registration (type: string)
-        Map 4
-            Map Operator Tree:
-                TableScan
                   alias: s
                   Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                   Filter Operator
                     predicate: name is not null (type: boolean)
                     Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: name (type: string)
-                      sort order: +
-                      Map-reduce partition columns: name (type: string)
-                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                    Sorted Merge Bucket Map Join Operator
+                      condition map:
+                           Inner Join 0 to 1
+                      condition expressions:
+                        0 {name}
+                        1 {registration}
+                      keys:
+                        0 name (type: string)
+                        1 name (type: string)
+                      outputColumnNames: _col0, _col8
+                      Select Operator
+                        expressions: _col0 (type: string), _col8 (type: string)
+                        outputColumnNames: _col0, _col8
+                        Group By Operator
+                          aggregations: count(DISTINCT _col8)
+                          keys: _col0 (type: string), _col8 (type: string)
+                          mode: hash
+                          outputColumnNames: _col0, _col1, _col2
+                          Reduce Output Operator
+                            key expressions: _col0 (type: string), _col1 (type: string)
+                            sort order: ++
+                            Map-reduce partition columns: _col0 (type: string)
        Reducer 2
            Reduce Operator Tree:
-              Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                condition expressions:
-                  0 {KEY.reducesinkkey0}
-                  1 {VALUE._col1}
-                outputColumnNames: _col0, _col8
-                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                Select Operator
-                  expressions: _col0 (type: string), _col8 (type: string)
-                  outputColumnNames: _col0, _col8
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                  Group By Operator
-                    aggregations: count(DISTINCT _col8)
-                    keys: _col0 (type: string), _col8 (type: string)
-                    mode: hash
-                    outputColumnNames: _col0, _col1, _col2
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: string), _col1 (type: string)
-                      sort order: ++
-                      Map-reduce partition columns: _col0 (type: string)
-                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-        Reducer 3
-            Reduce Operator Tree:
               Group By Operator
                 aggregations: count(DISTINCT KEY._col1:0._col0)
                 keys: KEY._col0 (type: string)
                 mode: mergepartial
                 outputColumnNames: _col0, _col1
-                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                 Select Operator
                   expressions: _col0 (type: string), _col1 (type: bigint)
                   outputColumnNames: _col0, _col1
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                   File Output Operator
                     compressed: false
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                     table:
                         input format: org.apache.hadoop.mapred.TextInputFormat
                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
@@ -316,77 +290,51 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP PARTITION-LEVEL SORT, 1), Map 4 (GROUP PARTITION-LEVEL SORT, 1)
-        Reducer 3 <- Reducer 2 (GROUP SORT, 1)
+        Reducer 2 <- Map 1 (GROUP SORT, 1)
 #### A masked pattern was here ####
       Vertices:
         Map 1
            Map Operator Tree:
                TableScan
-                  alias: v
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                  Filter Operator
-                    predicate: name is not null (type: boolean)
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: name (type: string)
-                      sort order: +
-                      Map-reduce partition columns: name (type: string)
-                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                      value expressions: registration (type: string)
-        Map 4
-            Map Operator Tree:
-                TableScan
                   alias: s
                   Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                   Filter Operator
                     predicate: name is not null (type: boolean)
                     Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: name (type: string)
-                      sort order: +
-                      Map-reduce partition columns: name (type: string)
-                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                    Sorted Merge Bucket Map Join Operator
+                      condition map:
+                           Inner Join 0 to 1
+                      condition expressions:
+                        0 {name}
+                        1 {registration}
+                      keys:
+                        0 name (type: string)
+                        1 name (type: string)
+                      outputColumnNames: _col0, _col8
+                      Select Operator
+                        expressions: _col0 (type: string), _col8 (type: string)
+                        outputColumnNames: _col0, _col8
+                        Group By Operator
+                          aggregations: count(DISTINCT _col8)
+                          keys: _col0 (type: string), _col8 (type: string)
+                          mode: hash
+                          outputColumnNames: _col0, _col1, _col2
+                          Reduce Output Operator
+                            key expressions: _col0 (type: string), _col1 (type: string)
+                            sort order: ++
+                            Map-reduce partition columns: _col0 (type: string)
         Reducer 2
            Reduce Operator Tree:
-              Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                condition expressions:
-                  0 {KEY.reducesinkkey0}
-                  1 {VALUE._col1}
-                outputColumnNames: _col0, _col8
-                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                Select Operator
-                  expressions: _col0 (type: string), _col8 (type: string)
-                  outputColumnNames: _col0, _col8
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                  Group By Operator
-                    aggregations: count(DISTINCT _col8)
-                    keys: _col0 (type: string), _col8 (type: string)
-                    mode: hash
-                    outputColumnNames:
 _col0, _col1, _col2
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: string), _col1 (type: string)
-                      sort order: ++
-                      Map-reduce partition columns: _col0 (type: string)
-                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-        Reducer 3
-            Reduce Operator Tree:
               Group By Operator
                 aggregations: count(DISTINCT KEY._col1:0._col0)
                 keys: KEY._col0 (type: string)
                 mode: mergepartial
                 outputColumnNames: _col0, _col1
-                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                 Select Operator
                   expressions: _col0 (type: string), _col1 (type: bigint)
                   outputColumnNames: _col0, _col1
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                   File Output Operator
                     compressed: false
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                     table:
                         input format: org.apache.hadoop.mapred.TextInputFormat
                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
@@ -488,52 +436,22 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP PARTITION-LEVEL SORT, 1), Map 4 (GROUP PARTITION-LEVEL SORT, 1)
-        Reducer 3 <- Reducer 2 (GROUP SORT, 1)
+        Reducer 2 <- Map 1 (GROUP SORT, 1)
 #### A masked pattern was here ####
       Vertices:
         Map 1
-        Map 4
         Reducer 2
            Reduce Operator Tree:
-              Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                condition expressions:
-                  0 {KEY.reducesinkkey0}
-                  1 {VALUE._col1}
-                outputColumnNames: _col0, _col9
-                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                Select Operator
-                  expressions: _col0 (type: string), _col9 (type: string)
-                  outputColumnNames: _col0, _col9
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                  Group By Operator
-                    aggregations: count(DISTINCT _col9)
-                    keys: _col0 (type: string), _col9 (type: string)
-                    mode: hash
-                    outputColumnNames: _col0, _col1, _col2
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: string), _col1 (type: string)
-                      sort order: ++
-                      Map-reduce partition columns: _col0 (type: string)
-                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-        Reducer 3
-            Reduce Operator Tree:
               Group By Operator
                 aggregations: count(DISTINCT KEY._col1:0._col0)
                 keys: KEY._col0 (type: string)
                 mode: mergepartial
                 outputColumnNames: _col0, _col1
-                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                 Select Operator
                   expressions: _col0 (type: string), _col1 (type: bigint)
                   outputColumnNames: _col0, _col1
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                   File Output Operator
                     compressed: false
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                     table:
                         input format: org.apache.hadoop.mapred.TextInputFormat
                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat