Author: gunther
Date: Fri Oct  4 03:39:34 2013
New Revision: 1529074

URL: http://svn.apache.org/r1529074
Log:
HIVE-5439: Set input edge map for map join operator in Tez (Gunther Hagleitner)

Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1529074&r1=1529073&r2=1529074&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Oct  4 03:39:34 2013
@@ -69,6 +69,7 @@ import org.apache.tez.dag.api.OutputDesc
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
@@ -168,7 +169,7 @@ public class DagUtils {
     case BROADCAST_EDGE:
       dataMovementType = DataMovementType.BROADCAST;
       logicalOutputClass = OnFileUnorderedKVOutput.class;
-      logicalInputClass = MRInput.class;
+      logicalInputClass = ShuffledUnorderedKVInput.class;
       break;
 
     case SIMPLE_EDGE:
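
The hunk above swaps the logical input class for BROADCAST_EDGE from MRInput to ShuffledUnorderedKVInput, so the downstream vertex consumes the data produced over the broadcast edge rather than reading it as a map-reduce input split. Below is a minimal, self-contained sketch of the edge-type-to-I/O-class selection pattern used in DagUtils; the enum and the nested marker classes are illustrative stand-ins, not the real Tez or Hive types.

public class EdgeClassSketch {
  enum EdgeType { BROADCAST_EDGE, SIMPLE_EDGE }

  // Stand-ins for the Tez logical output/input classes named in the diff.
  static class OnFileUnorderedKVOutput {}
  static class ShuffledUnorderedKVInput {}
  static class OnFileSortedOutput {}
  static class ShuffledMergedInputLegacy {}

  static Class<?>[] ioClassesFor(EdgeType type) {
    switch (type) {
      case BROADCAST_EDGE:
        // Broadcast: unordered KV output paired with an unordered KV input
        // (before this change the input side was MRInput).
        return new Class<?>[] { OnFileUnorderedKVOutput.class, ShuffledUnorderedKVInput.class };
      case SIMPLE_EDGE:
      default:
        // Shuffle: sorted output paired with a merged, sorted input.
        return new Class<?>[] { OnFileSortedOutput.class, ShuffledMergedInputLegacy.class };
    }
  }

  public static void main(String[] args) {
    for (EdgeType type : EdgeType.values()) {
      Class<?>[] io = ioClassesFor(type);
      System.out.println(type + ": output=" + io[0].getSimpleName()
          + ", input=" + io[1].getSimpleName());
    }
  }
}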

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1529074&r1=1529073&r2=1529074&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Fri Oct  4 03:39:34 2013
@@ -4,6 +4,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Stack;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -20,6 +22,8 @@ import org.apache.hadoop.hive.ql.plan.Te
 
 public class ReduceSinkMapJoinProc implements NodeProcessor {
 
+  protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
+
   /* (non-Javadoc)
   * This processor addresses the RS-MJ case that occurs in tez on the small/hash
   * table side of things. The connection between the work that RS will be a part of
@@ -37,9 +41,16 @@ public class ReduceSinkMapJoinProc imple
     context.currentRootOperator = null;
 
     MapJoinOperator mapJoinOp = (MapJoinOperator)nd;
+    Operator<? extends OperatorDesc> childOp = mapJoinOp.getChildOperators().get(0);
 
-    Operator<? extends OperatorDesc>childOp = mapJoinOp.getChildOperators().get(0);
     ReduceSinkOperator parentRS = (ReduceSinkOperator)stack.get(stack.size() - 2);
+
+    // remember the original parent list before we start modifying it.
+    if (!context.mapJoinParentMap.containsKey(mapJoinOp)) {
+      List<Operator<?>> parents = new ArrayList(mapJoinOp.getParentOperators());
+      context.mapJoinParentMap.put(mapJoinOp, parents);
+    }
+
     while (childOp != null) {
       if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof FileSinkOperator)) {
         /*
@@ -54,6 +65,15 @@ public class ReduceSinkMapJoinProc imple
 
         BaseWork myWork = context.operatorWorkMap.get(childOp);
         BaseWork parentWork = context.operatorWorkMap.get(parentRS);
+          
+        // set the link between mapjoin and parent vertex
+        int pos = context.mapJoinParentMap.get(mapJoinOp).indexOf(parentRS);
+        if (pos == -1) {
+          throw new SemanticException("Cannot find position of parent in mapjoin");
+        }
+        LOG.debug("Mapjoin "+mapJoinOp+", pos: "+pos+" --> "+parentWork.getName());
+        mapJoinOp.getConf().getParentToInput().put(pos, parentWork.getName());
+
         if (myWork != null) {
           // link the work with the work associated with the reduce sink that triggered this rule
           TezWork tezWork = context.currentTask.getWork();
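
The bookkeeping added in this file does two things: it snapshots the map join's original parent list the first time the operator is visited, and it records, for the parent reduce sink currently on the stack, which map-join position is fed by which parent work (vertex) via MapJoinDesc.parentToInput. A small stand-alone sketch of that position-to-vertex-name recording follows; plain strings stand in for the Hive operator and work objects, and the vertex name is hypothetical.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ParentToInputSketch {
  public static void main(String[] args) {
    // Snapshot of the map join's parents, taken before the plan is rewired.
    List<String> originalParents = new ArrayList<String>();
    originalParents.add("RS[small table]"); // hash side, position 0
    originalParents.add("TS[big table]");   // streamed side, position 1

    // Position in the map join -> name of the vertex (work) producing that input.
    Map<Integer, String> parentToInput = new HashMap<Integer, String>();

    String parentRS = "RS[small table]"; // the reduce sink that triggered the rule
    String parentWorkName = "Map 2";     // hypothetical name of its vertex

    int pos = originalParents.indexOf(parentRS);
    if (pos == -1) {
      throw new IllegalStateException("Cannot find position of parent in mapjoin");
    }
    parentToInput.put(pos, parentWorkName);

    System.out.println("parentToInput = " + parentToInput); // prints {0=Map 2}
  }
}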

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1529074&r1=1529073&r2=1529074&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Fri Oct  4 03:39:34 2013
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -89,6 +90,9 @@ public class GenTezProcContext implement
   // a map that maintains operator (file-sink or reduce-sink) to work mapping
   public final Map<Operator<?>, BaseWork> operatorWorkMap;
 
+  // we need to keep the original list of operators in the map join to know
+  // what position in the mapjoin the different parent work items will have.
+  public final Map<MapJoinOperator, List<Operator<?>>> mapJoinParentMap;
 
   @SuppressWarnings("unchecked")
   public GenTezProcContext(HiveConf conf, ParseContext parseContext,
@@ -106,5 +110,6 @@ public class GenTezProcContext implement
     this.rootOperators = rootOperators;
     this.linkOpWithWorkMap = new HashMap<Operator<?>, List<BaseWork>>();
     this.operatorWorkMap = new HashMap<Operator<?>, BaseWork>();
+    this.mapJoinParentMap = new HashMap<MapJoinOperator, List<Operator<?>>>();
   }
 }
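
The comment on the new field gives the rationale: the processor rewires the operator tree as it goes, so the map join's live parent list changes, and positions can only be resolved against a copy taken up front. A tiny sketch of that snapshot-versus-live-list distinction, with strings standing in for the operator objects:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapJoinParentMapSketch {
  public static void main(String[] args) {
    Map<String, List<String>> mapJoinParentMap = new HashMap<String, List<String>>();

    List<String> liveParents = new ArrayList<String>();
    liveParents.add("RS[1]");
    liveParents.add("TS[0]");

    // Snapshot stored in the context the first time the map join is visited.
    mapJoinParentMap.put("MAPJOIN[5]", new ArrayList<String>(liveParents));

    // Later rewiring mutates the live list ...
    liveParents.remove("RS[1]");

    // ... but the original position is still recoverable from the snapshot.
    System.out.println(mapJoinParentMap.get("MAPJOIN[5]").indexOf("RS[1]")); // prints 0
    System.out.println(liveParents.indexOf("RS[1]"));                        // prints -1
  }
}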

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java?rev=1529074&r1=1529073&r2=1529074&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java Fri Oct  4 03:39:34 2013
@@ -47,6 +47,9 @@ public class MapJoinDesc extends JoinDes
 
   private transient String bigTableAlias;
 
+  // for tez. used to remember which position maps to which logical input
+  private Map<Integer, String> parentToInput = new HashMap<Integer, String>();
+
   // table alias (small) --> input file name (big) --> target file names (small)
   private Map<String, Map<String, List<String>>> aliasBucketFileNameMapping;
   private Map<String, Integer> bigTableBucketNumMapping;
@@ -74,6 +77,7 @@ public class MapJoinDesc extends JoinDes
     this.bigTableBucketNumMapping = clone.bigTableBucketNumMapping;
     this.bigTablePartSpecToFileMapping = clone.bigTablePartSpecToFileMapping;
     this.dumpFilePrefix = clone.dumpFilePrefix;
+    this.parentToInput = clone.parentToInput;
   }
 
   public MapJoinDesc(final Map<Byte, List<ExprNodeDesc>> keys,
@@ -106,6 +110,14 @@ public class MapJoinDesc extends JoinDes
     }
   }
 
+  public Map<Integer, String> getParentToInput() {
+    return parentToInput;
+  }
+
+  public void setParentToInput(Map<Integer, String> parentToInput) {
+    this.parentToInput = parentToInput;
+  }
+
   public Map<Byte, List<Integer>> getRetainList() {
     return retainList;
   }
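
Because parentToInput travels with the plan, the map-join operator can later resolve which Tez logical input feeds each join position by vertex name. That runtime lookup is not part of this commit; the sketch below only illustrates the intended use of the map, with strings standing in for the actual logical input objects and with hypothetical vertex names.

import java.util.HashMap;
import java.util.Map;

public class InputLookupSketch {
  public static void main(String[] args) {
    // From the plan: join position -> name of the vertex producing that input.
    Map<Integer, String> parentToInput = new HashMap<Integer, String>();
    parentToInput.put(0, "Map 2"); // hypothetical hash-side vertex name

    // From the runtime: inputs keyed by the name of the vertex they come from.
    Map<String, String> inputsByVertexName = new HashMap<String, String>();
    inputsByVertexName.put("Map 1", "<input from Map 1>");
    inputsByVertexName.put("Map 2", "<input from Map 2>");

    int pos = 0;
    String input = inputsByVertexName.get(parentToInput.get(pos));
    System.out.println("position " + pos + " reads from " + input);
  }
}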

