Make code compatible with spark-1.3
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/f30e08f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/f30e08f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/f30e08f8

Branch: refs/heads/master
Commit: f30e08f8432dc6dbd772e6ea70edf0b099874503
Parents: 3058d6d
Author: Lee moon soo <[email protected]>
Authored: Sat Mar 14 03:41:06 2015 +0900
Committer: Lee moon soo <[email protected]>
Committed: Sat Mar 14 03:41:06 2015 +0900

----------------------------------------------------------------------
 .../nflabs/zeppelin/spark/SparkInterpreter.java | 58 ++++++++++++++++---
 .../zeppelin/spark/SparkSqlInterpreter.java     | 60 ++++++++++++++------
 .../nflabs/zeppelin/spark/ZeppelinContext.java  |  3 +-
 3 files changed, 95 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f30e08f8/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java
index 4ba5cd2..47d5be0 100644
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java
@@ -14,6 +14,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.spark.HttpServer;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.SparkEnv;
@@ -49,6 +50,7 @@ import scala.tools.nsc.settings.MutableSettings.PathSetting;
 
 import com.nflabs.zeppelin.interpreter.Interpreter;
 import com.nflabs.zeppelin.interpreter.InterpreterContext;
+import com.nflabs.zeppelin.interpreter.InterpreterException;
 import com.nflabs.zeppelin.interpreter.InterpreterGroup;
 import com.nflabs.zeppelin.interpreter.InterpreterPropertyBuilder;
 import com.nflabs.zeppelin.interpreter.InterpreterResult;
@@ -181,12 +183,34 @@ public class SparkInterpreter extends Interpreter {
     String execUri = System.getenv("SPARK_EXECUTOR_URI");
     String[] jars = SparkILoop.getAddedJars();
+
+    String classServerUri = null;
+
+    try { // in case of spark 1.1x, spark 1.2x
+      Method classServer = interpreter.intp().getClass().getMethod("classServer");
+      HttpServer httpServer = (HttpServer) classServer.invoke(interpreter.intp());
+      classServerUri = httpServer.uri();
+    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+        | IllegalArgumentException | InvocationTargetException e) {
+      // continue
+    }
+
+    if (classServerUri == null) {
+      try { // for spark 1.3x
+        Method classServer = interpreter.intp().getClass().getMethod("classServerUri");
+        classServerUri = (String) classServer.invoke(interpreter.intp());
+      } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+          | IllegalArgumentException | InvocationTargetException e) {
+        throw new InterpreterException(e);
+      }
+    }
+
     SparkConf conf = new SparkConf()
         .setMaster(getProperty("master"))
         .setAppName(getProperty("spark.app.name"))
         .setJars(jars)
-        .set("spark.repl.class.uri", interpreter.intp().classServer().uri());
+        .set("spark.repl.class.uri", classServerUri);
 
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri);
@@ -344,7 +368,20 @@ public class SparkInterpreter extends Interpreter {
     z = new ZeppelinContext(sc, sqlc, getHiveContext(), null, dep, printStream);
 
-    this.interpreter.loadFiles(settings);
+    try {
+      if (sc.version().startsWith("1.1") || sc.version().startsWith("1.2")) {
+        Method loadFiles = this.interpreter.getClass().getMethod("loadFiles", Settings.class);
+        loadFiles.invoke(this.interpreter, settings);
+      } else if (sc.version().startsWith("1.3")) {
+        Method loadFiles = this.interpreter.getClass().getMethod(
+            "org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class);
+        loadFiles.invoke(this.interpreter, settings);
+      }
+    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+        | IllegalArgumentException | InvocationTargetException e) {
+      throw new InterpreterException(e);
+    }
+
     intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
     binder = (Map<String, Object>) getValue("_binder");
@@ -363,7 +400,16 @@ public class SparkInterpreter extends Interpreter {
     intp.interpret("@transient val hiveContext = "
         + "_binder.get(\"hiveContext\").asInstanceOf[org.apache.spark.sql.hive.HiveContext]");
     intp.interpret("import org.apache.spark.SparkContext._");
-    intp.interpret("import sqlc._");
+
+    if (sc.version().startsWith("1.1")) {
+      intp.interpret("import sqlc._");
+    } else if (sc.version().startsWith("1.2")) {
+      intp.interpret("import sqlc._");
+    } else if (sc.version().startsWith("1.3")) {
+      intp.interpret("import sqlc.implicits._");
+      intp.interpret("import sqlc.sql");
+      intp.interpret("import org.apache.spark.sql.functions._");
+    }
 
     // add jar
     if (depInterpreter != null) {
@@ -421,10 +467,6 @@ public class SparkInterpreter extends Interpreter {
     return scala.collection.JavaConversions.asJavaList(ret.candidates());
   }
 
-  public void bindValue(String name, Object o) {
-    getResultCode(intp.bindValue(name, o));
-  }
-
   public Object getValue(String name) {
     Object ret = intp.valueOfTerm(name);
     if (ret instanceof None) {
@@ -539,6 +581,8 @@ public class SparkInterpreter extends Interpreter {
         progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage());
       } else if (sc.version().startsWith("1.2")) {
         progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage());
+      } else if (sc.version().startsWith("1.3")) {
+        progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage());
       } else {
         continue;
       }
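Note on the reflection above: in Spark 1.1/1.2 the REPL's SparkIMain exposes the class
server through classServer(), returning an org.apache.spark.HttpServer whose uri() is then
read, while Spark 1.3 replaced that with a classServerUri() accessor returning the String
directly. Likewise, org$apache$spark$repl$SparkILoop$$loadFiles is the Scala compiler's
mangled encoding of a member that is no longer publicly reachable as loadFiles on
SparkILoop in 1.3. A minimal sketch of the try-one-name-then-fall-back pattern used here;
the helper (ReflectionUtil.invokeNoArg) is hypothetical, not part of Zeppelin or Spark:

    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;

    // Hypothetical helper, not part of Zeppelin or Spark.
    public class ReflectionUtil {
      /** Invokes the first zero-arg method found among candidateNames; null if none exists. */
      public static Object invokeNoArg(Object target, String... candidateNames)
          throws IllegalAccessException, InvocationTargetException {
        for (String name : candidateNames) {
          try {
            Method m = target.getClass().getMethod(name);
            return m.invoke(target);
          } catch (NoSuchMethodException e) {
            // method absent in this Spark version; try the next candidate
          }
        }
        return null;
      }
    }

With such a helper, the 1.3 lookup reduces to invokeNoArg(interpreter.intp(),
"classServerUri"); the 1.1/1.2 path still needs the extra uri() call on the returned
HttpServer.
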
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f30e08f8/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java
index 3cdce03..6df4eec 100644
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java
@@ -12,9 +12,9 @@ import org.apache.spark.scheduler.ActiveJob;
 import org.apache.spark.scheduler.DAGScheduler;
 import org.apache.spark.scheduler.Stage;
 import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.SQLContext.QueryExecution;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
-import org.apache.spark.sql.catalyst.expressions.Row;
+//import org.apache.spark.sql.catalyst.expressions.Row;
 import org.apache.spark.ui.jobs.JobProgressListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -122,11 +122,15 @@ public class SparkSqlInterpreter extends Interpreter {
     }
 
     sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
-    SchemaRDD rdd;
-    Row[] rows = null;
+
+    // SchemaRDD - spark 1.1, 1.2, DataFrame - spark 1.3
+    Object rdd;
+    Object[] rows = null;
     try {
       rdd = sqlc.sql(st);
-      rows = rdd.take(maxResult + 1);
+
+      Method take = rdd.getClass().getMethod("take", Integer.class);
+      rows = (Object[]) take.invoke(rdd, maxResult + 1);
     } catch (Exception e) {
       logger.error("Error", e);
       sc.clearJobGroup();
@@ -134,10 +138,22 @@ public class SparkSqlInterpreter extends Interpreter {
     }
 
     String msg = null;
+
+    // get field names
+    Method queryExecution;
+    QueryExecution qe;
+    try {
+      queryExecution = rdd.getClass().getMethod("queryExecution");
+      qe = (QueryExecution) queryExecution.invoke(rdd);
+    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+        | IllegalArgumentException | InvocationTargetException e) {
+      throw new InterpreterException(e);
+    }
+
     List<Attribute> columns = scala.collection.JavaConverters.asJavaListConverter(
-        rdd.queryExecution().analyzed().output()).asJava();
+        qe.analyzed().output()).asJava();
+
     for (Attribute col : columns) {
       if (msg == null) {
         msg = col.name();
@@ -145,26 +161,34 @@ public class SparkSqlInterpreter extends Interpreter {
         msg += "\t" + col.name();
       }
     }
+
     msg += "\n";
 
     // ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
     // FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
     // NullType, NumericType, ShortType, StringType, StructType
-    for (int r = 0; r < maxResult && r < rows.length; r++) {
-      Row row = rows[r];
-
-      for (int i = 0; i < columns.size(); i++) {
-        if (!row.isNullAt(i)) {
-          msg += row.apply(i).toString();
-        } else {
-          msg += "null";
-        }
-        if (i != columns.size() - 1) {
-          msg += "\t";
+    try {
+      for (int r = 0; r < maxResult && r < rows.length; r++) {
+        Object row = rows[r];
+        Method isNullAt = row.getClass().getMethod("isNullAt", Integer.class);
+        Method apply = row.getClass().getMethod("apply", Integer.class);
+
+        for (int i = 0; i < columns.size(); i++) {
+          if (!(Boolean) isNullAt.invoke(row, i)) {
+            msg += apply.invoke(row, i).toString();
+          } else {
+            msg += "null";
+          }
+          if (i != columns.size() - 1) {
+            msg += "\t";
+          }
         }
+        msg += "\n";
       }
-      msg += "\n";
+    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+        | IllegalArgumentException | InvocationTargetException e) {
+      throw new InterpreterException(e);
     }
 
     if (rows.length > maxResult) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f30e08f8/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java
index e475de6..f85734c 100644
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java
+++ b/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java
@@ -10,7 +10,6 @@ import java.util.Iterator;
 
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SchemaRDD;
 import org.apache.spark.sql.hive.HiveContext;
 
 import scala.Tuple2;
@@ -49,9 +48,11 @@ public class ZeppelinContext {
   public HiveContext hiveContext;
   private GUI gui;
 
+  /* spark-1.3
   public SchemaRDD sql(String sql) {
     return sqlContext.sql(sql);
   }
+  */
 
   /**
    * Load dependency for interpreter and runtime (driver).
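
One caveat in the reflective row handling above: Scala declares take(n: Int),
isNullAt(i: Int), and apply(i: Int) with primitive parameters, so they compile to JVM
signatures taking int. Java's getMethod("take", Integer.class) matches only a parameter
typed java.lang.Integer and would throw NoSuchMethodException for take(int); such lookups
generally need int.class (equivalently Integer.TYPE). A minimal sketch, assuming dataset
is a SchemaRDD (1.1/1.2) or DataFrame (1.3); RowTaker/takeRows are hypothetical names:

    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;

    // Hypothetical helper; dataset is a SchemaRDD (spark 1.1/1.2) or DataFrame (spark 1.3).
    public class RowTaker {
      public static Object[] takeRows(Object dataset, int n)
          throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
        // take(n: Int) compiles to take(int), so look it up with int.class,
        // not Integer.class; the int argument is auto-boxed by invoke().
        Method take = dataset.getClass().getMethod("take", int.class);
        return (Object[]) take.invoke(dataset, n);
      }
    }

The same int.class substitution applies to the isNullAt and apply lookups.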

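The sc.version().startsWith(...) gating now recurs in three places (the loadFiles call,
the sqlc imports, and the progress lookup). A small helper could centralize those checks;
SparkVersionUtil and its method names below are illustrative, not part of this commit:

    // Hypothetical helper for the repeated version checks.
    public class SparkVersionUtil {
      public static boolean isSpark11Or12(String version) {
        return version.startsWith("1.1") || version.startsWith("1.2");
      }

      public static boolean isSpark13(String version) {
        return version.startsWith("1.3");
      }
    }

    // usage: if (SparkVersionUtil.isSpark13(sc.version())) { ... }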