Repository: hive
Updated Branches:
  refs/heads/spark 65c00b4fc -> 77ecb5706


HIVE-8858: Visualize generated Spark plan [Spark Branch] (Chinna via Jimmy)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/77ecb570
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/77ecb570
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/77ecb570

Branch: refs/heads/spark
Commit: 77ecb5706df2d4d830596792f7cceea836a64efe
Parents: 65c00b4
Author: Jimmy Xiang <jxi...@cloudera.com>
Authored: Wed Apr 22 14:51:35 2015 -0700
Committer: Jimmy Xiang <jxi...@cloudera.com>
Committed: Wed Apr 22 14:51:35 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/spark/MapInput.java     | 10 ++++
 .../hadoop/hive/ql/exec/spark/MapTran.java      | 10 ++++
 .../hadoop/hive/ql/exec/spark/ReduceTran.java   | 11 ++++
 .../hadoop/hive/ql/exec/spark/ShuffleTran.java  | 10 ++++
 .../hadoop/hive/ql/exec/spark/SparkPlan.java    | 58 ++++++++++++++++++++
 .../hadoop/hive/ql/exec/spark/SparkTran.java    |  4 ++
 6 files changed, 103 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/77ecb570/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
index 8d18885..157e4d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
@@ -85,4 +85,21 @@ public class MapInput implements SparkTran<WritableComparable, Writable,
     }
 
   }
+
+  /** Returns the fixed label that identifies this transformation in the logged Spark plan. */
+  @Override
+  public String getName() {
+    return "MapInput";
+  }
+
+  /**
+   * Exposes the {@code toCache} setting so the logged plan can show "cache on" / "cache off".
+   *
+   * @return the {@code toCache} value as a {@link Boolean}; never {@code null}
+   */
+  @Override
+  public Boolean isCacheEnable() {
+    // valueOf reuses Boolean.TRUE/FALSE rather than allocating a new wrapper object.
+    return Boolean.valueOf(toCache);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/77ecb570/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
index 638c387..f6a4d77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
@@ -35,4 +35,20 @@ public class MapTran implements SparkTran<BytesWritable, BytesWritable, HiveKey,
     this.mapFunc = mapFunc;
   }
 
+  /** Returns the fixed label that identifies this transformation in the logged Spark plan. */
+  @Override
+  public String getName() {
+    return "MapTran";
+  }
+
+  /**
+   * Reports no cache status for this transformation.
+   *
+   * @return always {@code null}; SparkPlan logs no cache annotation for a {@code null} status
+   */
+  @Override
+  public Boolean isCacheEnable() {
+    // null means "no cache status"; SparkPlan.logCacheStatus checks for null before unboxing.
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/77ecb570/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
index dbc614b..fd6b31c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
@@ -34,4 +34,21 @@ public class ReduceTran implements SparkTran<HiveKey, Iterable<BytesWritable>, H
   public void setReduceFunction(HiveReduceFunction redFunc) {
     this.reduceFunc = redFunc;
   }
+
+  /** Returns the fixed label that identifies this transformation in the logged Spark plan. */
+  @Override
+  public String getName() {
+    return "Reduce";
+  }
+
+  /**
+   * Reports no cache status for this transformation.
+   *
+   * @return always {@code null}; SparkPlan logs no cache annotation for a {@code null} status
+   */
+  @Override
+  public Boolean isCacheEnable() {
+    // null means "no cache status"; SparkPlan.logCacheStatus checks for null before unboxing.
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/77ecb570/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
index 4a597ee..6cdab20 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
@@ -49,4 +49,21 @@ public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, I
     }
     return result;
   }
+
+  /** Returns the fixed label that identifies this transformation in the logged Spark plan. */
+  @Override
+  public String getName() {
+    return "Shuffle";
+  }
+
+  /**
+   * Exposes the {@code toCache} setting so the logged plan can show "cache on" / "cache off".
+   *
+   * @return the {@code toCache} value as a {@link Boolean}; never {@code null}
+   */
+  @Override
+  public Boolean isCacheEnable() {
+    // valueOf reuses Boolean.TRUE/FALSE rather than allocating a new wrapper object.
+    return Boolean.valueOf(toCache);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/77ecb570/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
index b45494d..81b7e85 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.io.BytesWritable;
@@ -36,6 +38,7 @@ import com.google.common.base.Preconditions;
 @SuppressWarnings("rawtypes")
 public class SparkPlan {
   private static final String CLASS_NAME = SparkPlan.class.getName();
+  private static final Log LOG = LogFactory.getLog(SparkPlan.class);
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
 
   private final Set<SparkTran> rootTrans = new HashSet<SparkTran>();
@@ -72,6 +75,8 @@ public class SparkPlan {
       tranToOutputRDDMap.put(tran, rdd);
     }
 
+    logSparkPlan();
+
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = null;
     for (SparkTran leafTran : leafTrans) {
       JavaPairRDD<HiveKey, BytesWritable> rdd = tranToOutputRDDMap.get(leafTran);
@@ -86,6 +91,73 @@ public class SparkPlan {
     return finalRDD;
   }
 
+  /**
+   * Logs, at INFO level, one line for every ReduceTran key of {@code invertedTransGraph},
+   * framed by two banner lines. Does nothing when INFO logging is disabled.
+   */
+  private void logSparkPlan() {
+    if (!LOG.isInfoEnabled()) {
+      return; // skip building the plan strings when nobody will see them
+    }
+    final String banner =
+        "------------------------------ Spark Plan -----------------------------";
+    LOG.info(banner);
+    for (SparkTran sparkTran : invertedTransGraph.keySet()) {
+      if (sparkTran instanceof ReduceTran) {
+        LOG.info(getSparkPlan(sparkTran, "    " + sparkTran.getName()));
+      }
+    }
+    LOG.info(banner);
+  }
+
+  /**
+   * Appends the ancestors of {@code leaf} to {@code sparkPlanMsg}, one " <-- ( a,b ) " group per
+   * level. At each level only a single parent is followed further up (see the loop comment).
+   *
+   * @param leaf transformation to start from; {@code null} leaves the message unchanged
+   * @param sparkPlanMsg text built so far; must not be {@code null}
+   * @return the message with the parent groups appended
+   */
+  private String getSparkPlan(SparkTran leaf, String sparkPlanMsg) {
+    StringBuilder plan = new StringBuilder(sparkPlanMsg);
+    SparkTran current = leaf;
+    while (current != null) {
+      List<SparkTran> parents = getParents(current);
+      if (parents.isEmpty()) {
+        break;
+      }
+      plan.append(" <-- ");
+      SparkTran next = null;
+      String separator = "( ";
+      for (SparkTran parent : parents) {
+        plan.append(separator).append(parent.getName());
+        plan.append(logCacheStatus("", parent));
+        separator = ",";
+        // Only one branch is followed: the last parent that has parents of its own and is not
+        // a ReduceTran (presumably because logSparkPlan gives each ReduceTran its own line).
+        if (getParents(parent).size() > 0 && !(parent instanceof ReduceTran)) {
+          next = parent;
+        }
+      }
+      plan.append(" ) ");
+      current = next;
+    }
+    return plan.toString();
+  }
+
+  /**
+   * Appends " (cache on) " or " (cache off) " to {@code sparkPlanMsg} according to
+   * {@code sparkTran.isCacheEnable()}; a {@code null} status leaves the message unchanged.
+   */
+  private String logCacheStatus(String sparkPlanMsg, SparkTran sparkTran) {
+    // Query the status once so the null check and the value read see the same result.
+    Boolean cacheEnabled = sparkTran.isCacheEnable();
+    if (cacheEnabled == null) {
+      return sparkPlanMsg;
+    }
+    return sparkPlanMsg + (cacheEnabled.booleanValue() ? " (cache on) " : " (cache off) ");
+  }
+
   public void addTran(SparkTran tran) {
     rootTrans.add(tran);
     leafTrans.add(tran);

http://git-wip-us.apache.org/repos/asf/hive/blob/77ecb570/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
index 4daa61e..c3c48a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
@@ -25,4 +25,17 @@ import org.apache.spark.api.java.JavaPairRDD;
 public interface SparkTran<KI extends WritableComparable, VI, KO extends WritableComparable, VO> {
   JavaPairRDD<KO, VO> transform(
       JavaPairRDD<KI, VI> input);
+
+  /**
+   * Returns the label used for this transformation when the Spark plan is logged.
+   */
+  String getName();
+
+  /**
+   * Returns the cache status shown next to this transformation in the logged plan.
+   *
+   * @return {@code TRUE} or {@code FALSE} when a cache setting applies, or {@code null} when
+   *     none does, in which case no cache status is logged
+   */
+  Boolean isCacheEnable();
 }

Reply via email to