Author: gunther Date: Fri Oct 4 03:39:34 2013 New Revision: 1529074 URL: http://svn.apache.org/r1529074 Log: HIVE-5439: Set input edge map for map join operator in Tez (Gunther Hagleitner)
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1529074&r1=1529073&r2=1529074&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Oct 4 03:39:34 2013 @@ -69,6 +69,7 @@ import org.apache.tez.dag.api.OutputDesc import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.Vertex; import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy; +import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput; import org.apache.tez.runtime.library.output.OnFileSortedOutput; import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput; import org.apache.tez.mapreduce.hadoop.InputSplitInfo; @@ -168,7 +169,7 @@ public class DagUtils { case BROADCAST_EDGE: dataMovementType = DataMovementType.BROADCAST; logicalOutputClass = OnFileUnorderedKVOutput.class; - logicalInputClass = MRInput.class; + logicalInputClass = ShuffledUnorderedKVInput.class; break; case SIMPLE_EDGE: Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1529074&r1=1529073&r2=1529074&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Fri Oct 4 03:39:34 2013 @@ -4,6 +4,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Stack; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -20,6 +22,8 @@ import org.apache.hadoop.hive.ql.plan.Te public class ReduceSinkMapJoinProc implements NodeProcessor { + protected transient Log LOG = LogFactory.getLog(this.getClass().getName()); + /* (non-Javadoc) * This processor addresses the RS-MJ case that occurs in tez on the small/hash * table side of things. The connection between the work that RS will be a part of @@ -37,9 +41,16 @@ public class ReduceSinkMapJoinProc imple context.currentRootOperator = null; MapJoinOperator mapJoinOp = (MapJoinOperator)nd; + Operator<? extends OperatorDesc> childOp = mapJoinOp.getChildOperators().get(0); - Operator<? extends OperatorDesc>childOp = mapJoinOp.getChildOperators().get(0); ReduceSinkOperator parentRS = (ReduceSinkOperator)stack.get(stack.size() - 2); + + // remember the original parent list before we start modifying it. + if (!context.mapJoinParentMap.containsKey(mapJoinOp)) { + List<Operator<?>> parents = new ArrayList(mapJoinOp.getParentOperators()); + context.mapJoinParentMap.put(mapJoinOp, parents); + } + while (childOp != null) { if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof FileSinkOperator)) { /* @@ -54,6 +65,15 @@ public class ReduceSinkMapJoinProc imple BaseWork myWork = context.operatorWorkMap.get(childOp); BaseWork parentWork = context.operatorWorkMap.get(parentRS); + + // set the link between mapjoin and parent vertex + int pos = context.mapJoinParentMap.get(mapJoinOp).indexOf(parentRS); + if (pos == -1) { + throw new SemanticException("Cannot find position of parent in mapjoin"); + } + LOG.debug("Mapjoin "+mapJoinOp+", pos: "+pos+" --> "+parentWork.getName()); + mapJoinOp.getConf().getParentToInput().put(pos, parentWork.getName()); + if (myWork != null) { // link the work with the work associated with the reduce sink that triggered this rule TezWork tezWork = context.currentTask.getWork(); Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1529074&r1=1529073&r2=1529074&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Fri Oct 4 03:39:34 2013 @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -89,6 +90,9 @@ public class GenTezProcContext implement // a map that maintains operator (file-sink or reduce-sink) to work mapping public final Map<Operator<?>, BaseWork> operatorWorkMap; + // we need to keep the original list of operators in the map join to know + // what position in the mapjoin the different parent work items will have. + public final Map<MapJoinOperator, List<Operator<?>>> mapJoinParentMap; @SuppressWarnings("unchecked") public GenTezProcContext(HiveConf conf, ParseContext parseContext, @@ -106,5 +110,6 @@ public class GenTezProcContext implement this.rootOperators = rootOperators; this.linkOpWithWorkMap = new HashMap<Operator<?>, List<BaseWork>>(); this.operatorWorkMap = new HashMap<Operator<?>, BaseWork>(); + this.mapJoinParentMap = new HashMap<MapJoinOperator, List<Operator<?>>>(); } } Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java?rev=1529074&r1=1529073&r2=1529074&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java Fri Oct 4 03:39:34 2013 @@ -47,6 +47,9 @@ public class MapJoinDesc extends JoinDes private transient String bigTableAlias; + // for tez. used to remember which position maps to which logical input + private Map<Integer, String> parentToInput = new HashMap<Integer, String>(); + // table alias (small) --> input file name (big) --> target file names (small) private Map<String, Map<String, List<String>>> aliasBucketFileNameMapping; private Map<String, Integer> bigTableBucketNumMapping; @@ -74,6 +77,7 @@ public class MapJoinDesc extends JoinDes this.bigTableBucketNumMapping = clone.bigTableBucketNumMapping; this.bigTablePartSpecToFileMapping = clone.bigTablePartSpecToFileMapping; this.dumpFilePrefix = clone.dumpFilePrefix; + this.parentToInput = clone.parentToInput; } public MapJoinDesc(final Map<Byte, List<ExprNodeDesc>> keys, @@ -106,6 +110,14 @@ public class MapJoinDesc extends JoinDes } } + public Map<Integer, String> getParentToInput() { + return parentToInput; + } + + public void setParentToInput(Map<Integer, String> parentToInput) { + this.parentToInput = parentToInput; + } + public Map<Byte, List<Integer>> getRetainList() { return retainList; }