HIVE-10626 Spark paln need to be updated [Spark Branch] (Chinna via Jimmy)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2df14f9b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2df14f9b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2df14f9b

Branch: refs/heads/spark
Commit: 2df14f9b04dec968751f7b336bde7a5451af9d93
Parents: 81e8e66
Author: Jimmy Xiang <jxi...@cloudera.com>
Authored: Fri May 8 13:14:24 2015 -0700
Committer: Xuefu Zhang <xzh...@cloudera.com>
Committed: Mon Jun 1 14:03:42 2015 -0700

----------------------------------------------------------------------
 .../hive/ql/exec/spark/GroupByShuffler.java     |   4 +
 .../hadoop/hive/ql/exec/spark/MapInput.java     |   8 +-
 .../hadoop/hive/ql/exec/spark/MapTran.java      |   9 +-
 .../hadoop/hive/ql/exec/spark/ReduceTran.java   |   9 +-
 .../hadoop/hive/ql/exec/spark/ShuffleTran.java  |  16 +-
 .../hive/ql/exec/spark/SortByShuffler.java      |   5 +
 .../hadoop/hive/ql/exec/spark/SparkPlan.java    | 165 ++++++++++++++-----
 .../hive/ql/exec/spark/SparkShuffler.java       |   2 +
 .../hadoop/hive/ql/exec/spark/SparkTran.java    |   2 +
 9 files changed, 176 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2df14f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
index b8e36cb..e128dd2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
@@ -33,4 +33,8 @@ public class GroupByShuffler implements SparkShuffler {
     return input.groupByKey();
   }
 
+  @Override
+  public String getName() {
+    return "GroupBy";
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2df14f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
index 157e4d8..26cfebd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
@@ -36,6 +36,7 @@ public class MapInput implements 
SparkTran<WritableComparable, Writable,
   private JavaPairRDD<WritableComparable, Writable> hadoopRDD;
   private boolean toCache;
   private final SparkPlan sparkPlan;
+  private String name = "MapInput";
 
   public MapInput(SparkPlan sparkPlan, JavaPairRDD<WritableComparable, 
Writable> hadoopRDD) {
     this(sparkPlan, hadoopRDD, false);
@@ -88,11 +89,16 @@ public class MapInput implements 
SparkTran<WritableComparable, Writable,
 
   @Override
   public String getName() {
-    return "MapInput";
+    return name;
   }
 
   @Override
   public Boolean isCacheEnable() {
     return new Boolean(toCache);
   }
+
+  @Override
+  public void setName(String name) {
+    this.name = name;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2df14f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
index f6a4d77..2170243 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
@@ -24,6 +24,7 @@ import org.apache.spark.api.java.JavaPairRDD;
 
 public class MapTran implements SparkTran<BytesWritable, BytesWritable, 
HiveKey, BytesWritable> {
   private HiveMapFunction mapFunc;
+  private String name = "MapTran";
 
   @Override
   public JavaPairRDD<HiveKey, BytesWritable> transform(
@@ -37,12 +38,16 @@ public class MapTran implements SparkTran<BytesWritable, 
BytesWritable, HiveKey,
 
   @Override
   public String getName() {
-    return "MapTran";
+    return name;
   }
 
   @Override
   public Boolean isCacheEnable() {
-    // TODO Auto-generated method stub
     return null;
   }
+
+  @Override
+  public void setName(String name) {
+    this.name = name;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2df14f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
index fd6b31c..e60dfac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
@@ -24,6 +24,7 @@ import org.apache.spark.api.java.JavaPairRDD;
 
 public class ReduceTran implements SparkTran<HiveKey, Iterable<BytesWritable>, 
HiveKey, BytesWritable> {
   private HiveReduceFunction reduceFunc;
+  private String name = "Reduce";
 
   @Override
   public JavaPairRDD<HiveKey, BytesWritable> transform(
@@ -37,12 +38,16 @@ public class ReduceTran implements SparkTran<HiveKey, 
Iterable<BytesWritable>, H
 
   @Override
   public String getName() {
-    return "Reduce";
+    return name;
   }
 
   @Override
   public Boolean isCacheEnable() {
-    // TODO Auto-generated method stub
     return null;
   }
+
+  @Override
+  public void setName(String name) {
+    this.name = name;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2df14f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
index 6cdab20..a774395 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
@@ -28,6 +28,7 @@ public class ShuffleTran implements SparkTran<HiveKey, 
BytesWritable, HiveKey, I
   private final int numOfPartitions;
   private final boolean toCache;
   private final SparkPlan sparkPlan;
+  private String name = "Shuffle";
 
   public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n) {
     this(sparkPlan, sf, n, false);
@@ -50,13 +51,26 @@ public class ShuffleTran implements SparkTran<HiveKey, 
BytesWritable, HiveKey, I
     return result;
   }
 
+  public int getNoOfPartitions() {
+    return numOfPartitions;
+  }
+
   @Override
   public String getName() {
-    return "Shuffle";
+    return name;
   }
 
   @Override
   public Boolean isCacheEnable() {
     return new Boolean(toCache);
   }
+
+  @Override
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public SparkShuffler getShuffler() {
+    return shuffler;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2df14f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index 2545a9d..766813c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -56,6 +56,11 @@ public class SortByShuffler implements SparkShuffler {
     return rdd.mapPartitionsToPair(new ShuffleFunction());
   }
 
+  @Override
+  public String getName() {
+    return "SortBy";
+  }
+
   private static class ShuffleFunction implements
       PairFlatMapFunction<Iterator<Tuple2<HiveKey, BytesWritable>>,
           HiveKey, Iterable<BytesWritable>> {

http://git-wip-us.apache.org/repos/asf/hive/blob/2df14f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
index 81b7e85..ee5c78a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
@@ -91,57 +91,146 @@ public class SparkPlan {
     return finalRDD;
   }
 
+  private void addNumberToTrans() {
+    int i = 1;
+    String name = null;
+
+    // Traverse leafTran & transGraph add numbers to trans
+    for (SparkTran leaf : leafTrans) {
+      name = leaf.getName() + " " + i++;
+      leaf.setName(name);
+    }
+    Set<SparkTran> sparkTrans = transGraph.keySet();
+    for (SparkTran tran : sparkTrans) {
+      name = tran.getName() + " " + i++;
+      tran.setName(name);
+    }
+  }
+
   private void logSparkPlan() {
-    LOG.info("------------------------------ Spark Plan 
-----------------------------");
-    Set<SparkTran> keySet = invertedTransGraph.keySet();
-    for (SparkTran sparkTran : keySet) {
-      if (sparkTran instanceof ReduceTran) {
-       String sparkPlan = "    " + sparkTran.getName();
-       sparkPlan = getSparkPlan(sparkTran, sparkPlan);
-       LOG.info(sparkPlan);
+    addNumberToTrans();
+    ArrayList<SparkTran> leafTran = new ArrayList<SparkTran>();
+    leafTran.addAll(leafTrans);
+
+    for (SparkTran leaf : leafTrans) {
+      collectLeafTrans(leaf, leafTran);
+    }
+
+    // Start Traverse from the leafTrans and get parents of each leafTrans till
+    // the end
+    StringBuilder sparkPlan = new StringBuilder(
+      "\n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan 
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! \n\n");
+    for (SparkTran leaf : leafTran) {
+      sparkPlan.append(leaf.getName());
+      getSparkPlan(leaf, sparkPlan);
+      sparkPlan.append("\n");
+    }
+    sparkPlan
+      .append(" \n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan 
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ");
+    LOG.info(sparkPlan);
+  }
+
+  private void collectLeafTrans(SparkTran leaf, List<SparkTran> reduceTrans) {
+    List<SparkTran> parents = getParents(leaf);
+    if (parents.size() > 0) {
+      SparkTran nextLeaf = null;
+      for (SparkTran leafTran : parents) {
+        if (leafTran instanceof ReduceTran) {
+          reduceTrans.add(leafTran);
+        } else {
+          if (getParents(leafTran).size() > 0)
+            nextLeaf = leafTran;
+        }
       }
+      if (nextLeaf != null)
+        collectLeafTrans(nextLeaf, reduceTrans);
     }
-    LOG.info("------------------------------ Spark Plan 
-----------------------------");
-  }
-
-  private String getSparkPlan(SparkTran leaf, String sparkPlanMsg) {
-    if (leaf != null) {
-      List<SparkTran> parents = getParents(leaf);
-      if (parents.size() > 0) {
-       sparkPlanMsg = sparkPlanMsg + " <-- ";
-       boolean isFirst = true;
-       SparkTran parent = null;
-       for (SparkTran sparkTran : parents) {
-         if (isFirst) {
-           sparkPlanMsg = sparkPlanMsg + "( " + sparkTran.getName();
-           sparkPlanMsg = logCacheStatus(sparkPlanMsg, sparkTran);
-           isFirst = false;
-         } else {
-           sparkPlanMsg = sparkPlanMsg + "," + sparkTran.getName();
-           sparkPlanMsg = logCacheStatus(sparkPlanMsg, sparkTran);
-         }
-         if (getParents(sparkTran).size() > 0 && !(sparkTran instanceof 
ReduceTran)) {
-           parent = sparkTran;
-         }
-       }
-       sparkPlanMsg = sparkPlanMsg + " ) ";
-       return getSparkPlan(parent, sparkPlanMsg);
+  }
+
+  private void getSparkPlan(SparkTran tran, StringBuilder sparkPlan) {
+    List<SparkTran> parents = getParents(tran);
+    List<SparkTran> nextLeaf = new ArrayList<SparkTran>();
+    if (parents.size() > 0) {
+      sparkPlan.append(" <-- ");
+      boolean isFirst = true;
+      for (SparkTran leaf : parents) {
+        if (isFirst) {
+          sparkPlan.append("( " + leaf.getName());
+          if (leaf instanceof ShuffleTran) {
+            logShuffleTranStatus((ShuffleTran) leaf, sparkPlan);
+          } else {
+            logCacheStatus(leaf, sparkPlan);
+          }
+          isFirst = false;
+        } else {
+          sparkPlan.append("," + leaf.getName());
+          if (leaf instanceof ShuffleTran) {
+            logShuffleTranStatus((ShuffleTran) leaf, sparkPlan);
+          } else {
+            logCacheStatus(leaf, sparkPlan);
+          }
+        }
+        // Leave reduceTran it will be expanded in the next line
+        if (getParents(leaf).size() > 0 && !(leaf instanceof ReduceTran)) {
+          nextLeaf.add(leaf);
+        }
+      }
+      sparkPlan.append(" ) ");
+      if (nextLeaf.size() > 1) {
+        logLeafTran(nextLeaf, sparkPlan);
       } else {
-       return sparkPlanMsg;
+        if (nextLeaf.size() != 0)
+          getSparkPlan(nextLeaf.get(0), sparkPlan);
       }
     }
-    return sparkPlanMsg;
   }
 
-  private String logCacheStatus(String sparkPlanMsg, SparkTran sparkTran) {
+  private void logLeafTran(List<SparkTran> parent, StringBuilder sparkPlan) {
+    sparkPlan.append(" <-- ");
+    boolean isFirst = true;
+    for (SparkTran sparkTran : parent) {
+      List<SparkTran> parents = getParents(sparkTran);
+      SparkTran leaf = parents.get(0);
+      if (isFirst) {
+        sparkPlan.append("( " + leaf.getName());
+        if (leaf instanceof ShuffleTran) {
+          logShuffleTranStatus((ShuffleTran) leaf, sparkPlan);
+        } else {
+          logCacheStatus(leaf, sparkPlan);
+        }
+        isFirst = false;
+      } else {
+        sparkPlan.append("," + leaf.getName());
+        if (leaf instanceof ShuffleTran) {
+          logShuffleTranStatus((ShuffleTran) leaf, sparkPlan);
+        } else {
+          logCacheStatus(leaf, sparkPlan);
+        }
+      }
+    }
+    sparkPlan.append(" ) ");
+  }
+
+  private void logShuffleTranStatus(ShuffleTran leaf, StringBuilder sparkPlan) 
{
+    int noOfPartitions = leaf.getNoOfPartitions();
+    sparkPlan.append(" ( Partitions " + noOfPartitions);
+    SparkShuffler shuffler = leaf.getShuffler();
+    sparkPlan.append(", " + shuffler.getName());
+    if (leaf.isCacheEnable()) {
+      sparkPlan.append(", Cache on");
+    } else {
+      sparkPlan.append(", Cache off");
+    }
+  }
+
+  private void logCacheStatus(SparkTran sparkTran, StringBuilder sparkPlan) {
     if (sparkTran.isCacheEnable() != null) {
       if (sparkTran.isCacheEnable().booleanValue()) {
-       sparkPlanMsg = sparkPlanMsg + " (cache on) ";
+        sparkPlan.append(" (cache on) ");
       } else {
-       sparkPlanMsg = sparkPlanMsg + " (cache off) ";
+        sparkPlan.append(" (cache off) ");
       }
     }
-    return sparkPlanMsg;
   }
 
   public void addTran(SparkTran tran) {

http://git-wip-us.apache.org/repos/asf/hive/blob/2df14f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
index 53845a0..40e251f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
@@ -27,4 +27,6 @@ public interface SparkShuffler {
   JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffle(
       JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions);
 
+  public String getName();
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2df14f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
index c3c48a0..671c983 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
@@ -28,5 +28,7 @@ public interface SparkTran<KI extends WritableComparable, VI, 
KO extends Writabl
 
   public String getName();
 
+  public void setName(String name);
+
   public Boolean isCacheEnable();
 }

Reply via email to