Author: xuefu
Date: Tue Jul 15 04:44:25 2014
New Revision: 1610577

URL: http://svn.apache.org/r1610577
Log:
HIVE-7329: Create SparkWork

Added:
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SparkCompiler.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java?rev=1610577&r1=1610576&r2=1610577&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java 
(original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java 
Tue Jul 15 04:44:25 2014
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import java.io.Serializable;
-
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.WritableComparator;
 
@@ -27,11 +25,8 @@ import org.apache.hadoop.io.WritableComp
  * HiveKey is a simple wrapper on Text which allows us to set the hashCode
  * easily. hashCode is used for hadoop partitioner.
  * 
- * TODO: spark require key to be serializable, even if it's considered 
serializable by
- * Hadoop. For now, we let it implement Serializable. However, we expect that 
this is
- * not needed soon.
  */
-public class HiveKey extends BytesWritable implements Serializable {
+public class HiveKey extends BytesWritable {
 
   private static final int LENGTH_BYTES = 4;
 

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SparkCompiler.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SparkCompiler.java?rev=1610577&r1=1610576&r2=1610577&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SparkCompiler.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SparkCompiler.java
 Tue Jul 15 04:44:25 2014
@@ -76,7 +76,9 @@ public class SparkCompiler extends MapRe
     ReduceWork redWork = mrTask.getWork().getReduceWork();
     SparkWork sparkWork = new SparkWork("first spark #" + counter++);
     sparkWork.setMapWork(mapWork);
-    sparkWork.setReduceWork(redWork);
+    if (redWork != null) {
+      sparkWork.setReduceWork(redWork);
+    }
     SparkTask task = new SparkTask();
     task.setWork(sparkWork);
     task.setId(sparkWork.getName());

Added: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java?rev=1610577&view=auto
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
 (added)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
 Tue Jul 15 04:44:25 2014
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+
+@Explain(displayName = "Edge Property")
+public class SparkEdgeProperty {
+  public static long SHUFFLE_NONE = 0; // No shuffle is needed. For union only.
+  public static long SHUFFLE_GROUP = 1; // Shuffle, keys are coming together
+  public static long SHUFFLE_SORT = 2;  // Shuffle, keys are sorted
+
+  private long value;
+  
+  public SparkEdgeProperty(long value) {
+    this.value = value;
+  }
+  
+  public boolean isShuffleNone() {
+    return value == SHUFFLE_NONE;
+  }
+  
+  public void setShuffleNone() {
+    value = SHUFFLE_NONE;
+  }
+
+  public boolean isShuffleGroup() {
+    return (value & SHUFFLE_GROUP) != 0;
+  }
+  
+  public void setShuffleGroup() {
+    value |= SHUFFLE_GROUP;
+  }
+  
+  public boolean isShuffleSort() {
+    return (value & SHUFFLE_SORT) != 0;
+  }
+
+  public void setShuffleSort() {
+    value |= SHUFFLE_SORT;
+  }
+  
+  public long getValue() {
+    return value;
+  }
+
+  @Explain(displayName = "Shuffle Type")
+  public String getShuffleType() {
+    if (isShuffleNone()) {
+      return "NONE";
+    }
+    
+    StringBuilder sb = new StringBuilder();
+    if (isShuffleGroup()) {
+      sb.append("GROUP");
+    }
+
+    if (sb.length() != 0) {
+      sb.append(" ");
+    }
+
+    if (isShuffleSort()) {
+      sb.append("SORT");
+    }
+
+    return sb.toString();
+  }
+}
+

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java?rev=1610577&r1=1610576&r2=1610577&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java 
(original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java 
Tue Jul 15 04:44:25 2014
@@ -18,24 +18,39 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 
 /**
  * This class encapsulates all the work objects that can be executed
  * in a single Spark job. Currently it's basically a tree with MapWork at the
- * leaves and and ReduceWork in all other nodes.
+ * roots and and ReduceWork (or UnionWork) at all other nodes.
  */
 @SuppressWarnings("serial")
 @Explain(displayName = "Spark")
 public class SparkWork extends AbstractOperatorDesc {
-  private static transient final Log logger = 
LogFactory.getLog(SparkWork.class);
-
   private static int counter;
   private final String name;
-  
-  private MapWork mapWork;
-  private ReduceWork redWork;
+
+  private final Set<BaseWork> roots = new HashSet<BaseWork>();
+  private final Set<BaseWork> leaves = new HashSet<BaseWork>();
+
+  protected final Map<BaseWork, List<BaseWork>> workGraph = new 
HashMap<BaseWork, List<BaseWork>>();
+  protected final Map<BaseWork, List<BaseWork>> invertedWorkGraph = new 
HashMap<BaseWork, List<BaseWork>>();
+  protected final Map<Pair<BaseWork, BaseWork>, SparkEdgeProperty> 
edgeProperties =
+      new HashMap<Pair<BaseWork, BaseWork>, SparkEdgeProperty>();
 
   public SparkWork(String name) {
     this.name = name + ":" + (++counter);
@@ -46,20 +61,255 @@ public class SparkWork extends AbstractO
     return name;
   }
 
+  /**
+   * getWorkMap returns a map of "vertex name" to BaseWork
+   */
+  @Explain(displayName = "Vertices")
+  public Map<String, BaseWork> getWorkMap() {
+    Map<String, BaseWork> result = new LinkedHashMap<String, BaseWork>();
+    for (BaseWork w: getAllWork()) {
+      result.put(w.getName(), w);
+    }
+    return result;
+  }
+
+  /**
+   * getAllWork returns a topologically sorted list of BaseWork
+   */
+  public List<BaseWork> getAllWork() {
+
+    List<BaseWork> result = new LinkedList<BaseWork>();
+    Set<BaseWork> seen = new HashSet<BaseWork>();
+
+    for (BaseWork leaf: leaves) {
+      // make sure all leaves are visited at least once
+      visit(leaf, seen, result);
+    }
+
+    return result;
+  }
+
+  public Collection<BaseWork> getAllWorkUnsorted() {
+    return workGraph.keySet();
+  }
+
+  private void visit(BaseWork child, Set<BaseWork> seen, List<BaseWork> 
result) {
+    if (seen.contains(child)) {
+      // don't visit multiple times
+      return;
+    }
+
+    seen.add(child);
+
+    for (BaseWork parent: getParents(child)) {
+      if (!seen.contains(parent)) {
+        visit(parent, seen, result);
+      }
+    }
+
+    result.add(child);
+  }
+
+  /**
+   * add all nodes in the collection without any connections
+   */
+  public void addAll(Collection<BaseWork> c) {
+    for (BaseWork w: c) {
+      this.add(w);
+    }
+  }
+
+  /**
+   * add all nodes in the collection without any connections
+   */
+  public void addAll(BaseWork[] bws) {
+    for (BaseWork w: bws) {
+      this.add(w);
+    }
+  }
+
+  /**
+   * add creates a new node in the graph without any connections
+   */
+  public void add(BaseWork w) {
+    if (workGraph.containsKey(w)) {
+      return;
+    }
+    workGraph.put(w, new LinkedList<BaseWork>());
+    invertedWorkGraph.put(w, new LinkedList<BaseWork>());
+    roots.add(w);
+    leaves.add(w);
+  }
+
+  /**
+   * disconnect removes an edge between a and b. Both a and
+   * b have to be in the graph. If there is no matching edge
+   * no change happens.
+   */
+  public void disconnect(BaseWork a, BaseWork b) {
+    workGraph.get(a).remove(b);
+    invertedWorkGraph.get(b).remove(a);
+    if (getParents(b).isEmpty()) {
+      roots.add(b);
+    }
+    if (getChildren(a).isEmpty()) {
+      leaves.add(a);
+    }
+  }
+
+  /**
+   * getRoots returns all nodes that do not have a parent.
+   */
+  public Set<BaseWork> getRoots() {
+    return new HashSet<BaseWork>(roots);
+  }
+
+  /**
+   * getLeaves returns all nodes that do not have a child
+   */
+  public Set<BaseWork> getLeaves() {
+    return new HashSet<BaseWork>(leaves);
+  }
+
+  /**
+   * getParents returns all the nodes with edges leading into work
+   */
+  public List<BaseWork> getParents(BaseWork work) {
+    assert invertedWorkGraph.containsKey(work)
+      && invertedWorkGraph.get(work) != null;
+    return new LinkedList<BaseWork>(invertedWorkGraph.get(work));
+  }
+
+  /**
+   * getChildren returns all the nodes with edges leading out of work
+   */
+  public List<BaseWork> getChildren(BaseWork work) {
+    assert workGraph.containsKey(work)
+      && workGraph.get(work) != null;
+    return new LinkedList<BaseWork>(workGraph.get(work));
+  }
+
+  /**
+   * remove removes a node from the graph and removes all edges with
+   * work as start or end point. No change to the graph if the node
+   * doesn't exist.
+   */
+  public void remove(BaseWork work) {
+    if (!workGraph.containsKey(work)) {
+      return;
+    }
+
+    List<BaseWork> children = getChildren(work);
+    List<BaseWork> parents = getParents(work);
+
+    for (BaseWork w: children) {
+      invertedWorkGraph.get(w).remove(work);
+      if (invertedWorkGraph.get(w).size() == 0) {
+        roots.add(w);
+      }
+    }
+
+    for (BaseWork w: parents) {
+      workGraph.get(w).remove(work);
+      if (workGraph.get(w).size() == 0) {
+        leaves.add(w);
+      }
+    }
+
+    roots.remove(work);
+    leaves.remove(work);
+
+    workGraph.remove(work);
+    invertedWorkGraph.remove(work);
+  }
+
+  /**
+   * returns the edge type connecting work a and b
+   */
+  public SparkEdgeProperty getEdgeProperty(BaseWork a, BaseWork b) {
+    return edgeProperties.get(new ImmutablePair<BaseWork, BaseWork>(a,b));
+  }
+
+  /**
+   * connect adds an edge between a and b. Both nodes have
+   * to be added prior to calling connect.
+   * @param  
+   */
+  public void connect(BaseWork a, BaseWork b, SparkEdgeProperty edgeProp) {
+    workGraph.get(a).add(b);
+    invertedWorkGraph.get(b).add(a);
+    roots.remove(b);
+    leaves.remove(a);
+    ImmutablePair<BaseWork, BaseWork> workPair = new ImmutablePair<BaseWork, 
BaseWork>(a, b);
+    edgeProperties.put(workPair, edgeProp);
+  }
+
+  /*
+   * Dependency is a class used for explain
+   */ 
+  public class Dependency implements Serializable, Comparable<Dependency> {
+    public BaseWork w;
+    public SparkEdgeProperty prop;
+
+    @Explain(displayName = "Name")
+    public String getName() {
+      return w.getName();
+    }
+    
+    @Explain(displayName = "Shuffle Type")
+    public String getShuffleType() {
+      return prop.getShuffleType();
+    }
+    
+    @Override
+    public int compareTo(Dependency o) {
+      int compare = getName().compareTo(o.getName());
+      if (compare == 0) {
+        compare = getShuffleType().compareTo(o.getShuffleType());
+      }
+      return compare;
+    }
+  }
+
+  @Explain(displayName = "Edges")
+  public Map<String, List<Dependency>> getDependencyMap() {
+    Map<String, List<Dependency>> result = new LinkedHashMap<String, 
List<Dependency>>();
+    for (Map.Entry<BaseWork, List<BaseWork>> entry: 
invertedWorkGraph.entrySet()) {
+      List<Dependency> dependencies = new LinkedList<Dependency>();
+      for (BaseWork d: entry.getValue()) {
+        Dependency dependency = new Dependency();
+        dependency.w = d;
+        dependency.prop = getEdgeProperty(d, entry.getKey());
+        dependencies.add(dependency);
+      }
+      if (!dependencies.isEmpty()) {
+        Collections.sort(dependencies);
+        result.put(entry.getKey().getName(), dependencies);
+      }
+    }
+    return result;
+  }
+
   public MapWork getMapWork() {
-    return mapWork;
+    Iterator<BaseWork> it = roots.iterator();
+    if (it.hasNext())
+      return (MapWork)roots.iterator().next();
+    return null;
   }
 
   public void setMapWork(MapWork mapWork) {
-    this.mapWork = mapWork;
+    roots.add(mapWork);
   }
 
   public void setReduceWork(ReduceWork redWork) {
-    this.redWork = redWork;
+    leaves.add(redWork);
   }
   
   public ReduceWork getReduceWork() {
-    return redWork;
+    Iterator<BaseWork> it = leaves.iterator();
+    if (it.hasNext())
+      return (ReduceWork)leaves.iterator().next();
+    return null;
   }
 
 }


Reply via email to