HiveContext as default sqlContext, SQLContext as a fallback
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/c7f7f8f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/c7f7f8f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/c7f7f8f0 Branch: refs/heads/master Commit: c7f7f8f088b27163ce2962fc15bcb3440dc5dd45 Parents: f11afb6 Author: Lee moon soo <[email protected]> Authored: Sat Mar 14 23:36:48 2015 +0900 Committer: Lee moon soo <[email protected]> Committed: Sat Mar 14 23:36:48 2015 +0900 ---------------------------------------------------------------------- .../nflabs/zeppelin/spark/SparkInterpreter.java | 33 ++++++++++---------- .../zeppelin/spark/SparkSqlInterpreter.java | 13 +------- .../nflabs/zeppelin/spark/ZeppelinContext.java | 2 -- 3 files changed, 17 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c7f7f8f0/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java index d770c89..8473a24 100644 --- a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java @@ -4,6 +4,7 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.PrintStream; import java.io.PrintWriter; +import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URL; @@ -27,7 +28,6 @@ import org.apache.spark.scheduler.DAGScheduler; import org.apache.spark.scheduler.Pool; import org.apache.spark.scheduler.Stage; import org.apache.spark.sql.SQLContext; -import org.apache.spark.sql.hive.HiveContext; 
import org.apache.spark.ui.jobs.JobProgressListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,7 +98,6 @@ public class SparkInterpreter extends Interpreter { private SparkContext sc; private ByteArrayOutputStream out; private SQLContext sqlc; - private HiveContext hiveContext; private DependencyResolver dep; private SparkJLineCompletion completor; @@ -142,21 +141,24 @@ public class SparkInterpreter extends Interpreter { public SQLContext getSQLContext() { if (sqlc == null) { - // for spark 1.3x default HiveContext - if (getSparkContext().version().startsWith("1.3")) { - sqlc = getHiveContext(); - } else { + String name = "org.apache.spark.sql.hive.HiveContext"; + Constructor<?> hc; + try { + hc = getClass().getClassLoader().loadClass(name) + .getConstructor(SparkContext.class); + sqlc = (SQLContext) hc.newInstance(getSparkContext()); + } catch (NoSuchMethodException | SecurityException + | ClassNotFoundException | InstantiationException + | IllegalAccessException | IllegalArgumentException + | InvocationTargetException e) { + + // when hive dependency is not loaded, it'll fail. + // in this case SQLContext can be used. 
sqlc = new SQLContext(getSparkContext()); } } - return sqlc; - } - public HiveContext getHiveContext() { - if (hiveContext == null) { - hiveContext = new HiveContext(getSparkContext()); - } - return hiveContext; + return sqlc; } public DependencyResolver getDependencyResolver() { @@ -371,7 +373,7 @@ public class SparkInterpreter extends Interpreter { dep = getDependencyResolver(); - z = new ZeppelinContext(sc, sqlc, getHiveContext(), null, dep, printStream); + z = new ZeppelinContext(sc, sqlc, null, dep, printStream); try { if (sc.version().startsWith("1.1") || sc.version().startsWith("1.2")) { @@ -392,7 +394,6 @@ public class SparkInterpreter extends Interpreter { binder = (Map<String, Object>) getValue("_binder"); binder.put("sc", sc); binder.put("sqlc", sqlc); - binder.put("hiveContext", getHiveContext()); binder.put("z", z); binder.put("out", printStream); @@ -404,8 +405,6 @@ public class SparkInterpreter extends Interpreter { + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]"); intp.interpret("@transient val sqlContext = " + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]"); - intp.interpret("@transient val hiveContext = " - + "_binder.get(\"hiveContext\").asInstanceOf[org.apache.spark.sql.hive.HiveContext]"); intp.interpret("import org.apache.spark.SparkContext._"); if (sc.version().startsWith("1.1")) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c7f7f8f0/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java index 53f4037..f7ed73e 100644 --- a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java @@ -14,7 +14,6 @@ import org.apache.spark.scheduler.Stage; import 
org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SQLContext.QueryExecution; import org.apache.spark.sql.catalyst.expressions.Attribute; -//import org.apache.spark.sql.catalyst.expressions.Row; import org.apache.spark.ui.jobs.JobProgressListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,8 +53,6 @@ public class SparkSqlInterpreter extends Interpreter { SparkSqlInterpreter.class.getName(), new InterpreterPropertyBuilder() .add("zeppelin.spark.maxResult", "10000", "Max number of SparkSQL result to display.") - .add("zeppelin.spark.useHiveContext", "false", - "Use HiveContext instead of SQLContext if it is true.") .add("zeppelin.spark.concurrentSQL", "false", "Execute multiple SQL concurrently if set true.") .build()); @@ -92,10 +89,6 @@ public class SparkSqlInterpreter extends Interpreter { return null; } - private boolean useHiveContext() { - return Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext")); - } - public boolean concurrentSQL() { return Boolean.parseBoolean(getProperty("zeppelin.spark.concurrentSQL")); } @@ -108,11 +101,7 @@ public class SparkSqlInterpreter extends Interpreter { public InterpreterResult interpret(String st, InterpreterContext context) { SQLContext sqlc = null; - if (useHiveContext()) { - sqlc = getSparkInterpreter().getHiveContext(); - } else { - sqlc = getSparkInterpreter().getSQLContext(); - } + sqlc = getSparkInterpreter().getSQLContext(); SparkContext sc = sqlc.sparkContext(); if (concurrentSQL()) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c7f7f8f0/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java index f85734c..aa6b048 100644 --- a/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java +++ 
b/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java @@ -32,12 +32,10 @@ public class ZeppelinContext { private InterpreterContext interpreterContext; public ZeppelinContext(SparkContext sc, SQLContext sql, - HiveContext hiveContext, InterpreterContext interpreterContext, DependencyResolver dep, PrintStream printStream) { this.sc = sc; this.sqlContext = sql; - this.hiveContext = hiveContext; this.interpreterContext = interpreterContext; this.dep = dep; this.out = printStream;
