HIVE-8858: Visualize generated Spark plan [Spark Branch] (Chinna via Jimmy)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eb48ffdb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eb48ffdb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eb48ffdb

Branch: refs/heads/hbase-metastore
Commit: eb48ffdbfc0690480e2aeb53f03e249ad6091378
Parents: 6f00507
Author: Jimmy Xiang <jxi...@cloudera.com>
Authored: Wed Apr 22 14:51:35 2015 -0700
Committer: Xuefu Zhang <xzh...@cloudera.com>
Committed: Mon Jun 1 14:02:18 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/spark/MapInput.java     | 10 ++++
 .../hadoop/hive/ql/exec/spark/MapTran.java      | 10 ++++
 .../hadoop/hive/ql/exec/spark/ReduceTran.java   | 11 ++++
 .../hadoop/hive/ql/exec/spark/ShuffleTran.java  | 10 ++++
 .../hadoop/hive/ql/exec/spark/SparkPlan.java    | 58 ++++++++++++++++++++
 .../hadoop/hive/ql/exec/spark/SparkTran.java    |  4 ++
 6 files changed, 103 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/eb48ffdb/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
index 8d18885..157e4d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
@@ -85,4 +85,14 @@ public class MapInput implements SparkTran<WritableComparable, Writable,
     }
   }
 
+
+  @Override
+  public String getName() {
+    return "MapInput";
+  }
+
+  @Override
+  public Boolean isCacheEnable() {
+    return new Boolean(toCache);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/eb48ffdb/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
----------------------------------------------------------------------
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
index 638c387..f6a4d77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
@@ -35,4 +35,14 @@ public class MapTran implements SparkTran<BytesWritable, BytesWritable, HiveKey,
     this.mapFunc = mapFunc;
   }
 
+  @Override
+  public String getName() {
+    return "MapTran";
+  }
+
+  @Override
+  public Boolean isCacheEnable() {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/eb48ffdb/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
index dbc614b..fd6b31c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
@@ -34,4 +34,15 @@ public class ReduceTran implements SparkTran<HiveKey, Iterable<BytesWritable>, H
   public void setReduceFunction(HiveReduceFunction redFunc) {
     this.reduceFunc = redFunc;
   }
+
+  @Override
+  public String getName() {
+    return "Reduce";
+  }
+
+  @Override
+  public Boolean isCacheEnable() {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/eb48ffdb/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
index 4a597ee..6cdab20 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
@@ -49,4 +49,14 @@
public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, I
     }
     return result;
   }
+
+  @Override
+  public String getName() {
+    return "Shuffle";
+  }
+
+  @Override
+  public Boolean isCacheEnable() {
+    return new Boolean(toCache);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/eb48ffdb/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
index b45494d..81b7e85 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.io.BytesWritable;
@@ -36,6 +38,7 @@ import com.google.common.base.Preconditions;
 @SuppressWarnings("rawtypes")
 public class SparkPlan {
   private static final String CLASS_NAME = SparkPlan.class.getName();
+  private static final Log LOG = LogFactory.getLog(SparkPlan.class);
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
 
   private final Set<SparkTran> rootTrans = new HashSet<SparkTran>();
@@ -72,6 +75,8 @@ public class SparkPlan {
       tranToOutputRDDMap.put(tran, rdd);
     }
 
+    logSparkPlan();
+
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = null;
     for (SparkTran leafTran : leafTrans) {
       JavaPairRDD<HiveKey, BytesWritable> rdd = tranToOutputRDDMap.get(leafTran);
@@ -86,6 +91,59 @@ public class SparkPlan {
     return finalRDD;
   }
 
+  private void logSparkPlan() {
+    LOG.info("------------------------------ Spark Plan -----------------------------");
+    Set<SparkTran> keySet = invertedTransGraph.keySet();
+    for (SparkTran sparkTran : keySet) {
+      if (sparkTran instanceof ReduceTran) {
+        String sparkPlan = " " + sparkTran.getName();
+        sparkPlan = getSparkPlan(sparkTran, sparkPlan);
+        LOG.info(sparkPlan);
+      }
+    }
+    LOG.info("------------------------------ Spark Plan -----------------------------");
+  }
+
+  private String getSparkPlan(SparkTran leaf, String sparkPlanMsg) {
+    if (leaf != null) {
+      List<SparkTran> parents = getParents(leaf);
+      if (parents.size() > 0) {
+        sparkPlanMsg = sparkPlanMsg + " <-- ";
+        boolean isFirst = true;
+        SparkTran parent = null;
+        for (SparkTran sparkTran : parents) {
+          if (isFirst) {
+            sparkPlanMsg = sparkPlanMsg + "( " + sparkTran.getName();
+            sparkPlanMsg = logCacheStatus(sparkPlanMsg, sparkTran);
+            isFirst = false;
+          } else {
+            sparkPlanMsg = sparkPlanMsg + "," + sparkTran.getName();
+            sparkPlanMsg = logCacheStatus(sparkPlanMsg, sparkTran);
+          }
+          if (getParents(sparkTran).size() > 0 && !(sparkTran instanceof ReduceTran)) {
+            parent = sparkTran;
+          }
+        }
+        sparkPlanMsg = sparkPlanMsg + " ) ";
+        return getSparkPlan(parent, sparkPlanMsg);
+      } else {
+        return sparkPlanMsg;
+      }
+    }
+    return sparkPlanMsg;
+  }
+
+  private String logCacheStatus(String sparkPlanMsg, SparkTran sparkTran) {
+    if (sparkTran.isCacheEnable() != null) {
+      if (sparkTran.isCacheEnable().booleanValue()) {
+        sparkPlanMsg = sparkPlanMsg + " (cache on) ";
+      } else {
+        sparkPlanMsg = sparkPlanMsg + " (cache off) ";
+      }
+    }
+    return sparkPlanMsg;
+  }
+
   public void addTran(SparkTran tran) {
     rootTrans.add(tran);
     leafTrans.add(tran);

http://git-wip-us.apache.org/repos/asf/hive/blob/eb48ffdb/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
index 4daa61e..c3c48a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
@@ -25,4 +25,8 @@ import org.apache.spark.api.java.JavaPairRDD;
 public interface SparkTran<KI extends WritableComparable, VI, KO extends WritableComparable, VO> {
   JavaPairRDD<KO, VO> transform(
       JavaPairRDD<KI, VI> input);
+
+  public String getName();
+
+  public Boolean isCacheEnable();
 }