pyspark 1.3 support
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/4fdd0f2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/4fdd0f2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/4fdd0f2c

Branch: refs/heads/master
Commit: 4fdd0f2cd0be16285ef07e449561db8fbf416969
Parents: 1c2dfb8
Author: Lee moon soo <[email protected]>
Authored: Mon Mar 16 17:19:21 2015 +0900
Committer: Lee moon soo <[email protected]>
Committed: Mon Mar 16 17:19:21 2015 +0900

----------------------------------------------------------------------
 .../zeppelin/spark/PySparkInterpreter.java      |  3 ++-
 .../main/resources/python/zeppelin_pyspark.py   | 22 +++++++++++++-------
 2 files changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fdd0f2c/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java
index 5f5386e..071fcea 100644
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java
@@ -242,7 +242,8 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     }
 
     SparkInterpreter sparkInterpreter = getSparkInterpreter();
-    if (!sparkInterpreter.getSparkContext().version().startsWith("1.2")) {
+    if (!sparkInterpreter.getSparkContext().version().startsWith("1.2") &&
+        !sparkInterpreter.getSparkContext().version().startsWith("1.3")) {
       return new InterpreterResult(Code.ERROR, "pyspark "
           + sparkInterpreter.getSparkContext().version() + " is not supported");
     }
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fdd0f2c/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index eecaf1f..b822c0d 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -15,21 +15,29 @@ from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row
 
 client = GatewayClient(port=int(sys.argv[1]))
 gateway = JavaGateway(client)
+
 
 java_import(gateway.jvm, "org.apache.spark.SparkEnv")
 java_import(gateway.jvm, "org.apache.spark.SparkConf")
 java_import(gateway.jvm, "org.apache.spark.api.java.*")
 java_import(gateway.jvm, "org.apache.spark.api.python.*")
 java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
-java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
-java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
-java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
-java_import(gateway.jvm, "scala.Tuple2")
-
 intp = gateway.entry_point
-
 jsc = intp.getJavaSparkContext()
+
+if jsc.version().startswith("1.2"):
+  java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
+  java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
+  java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
+  java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
+elif jsc.version().startswith("1.3"):
+  java_import(gateway.jvm, "org.apache.spark.sql.*")
+  java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
+
+
+java_import(gateway.jvm, "scala.Tuple2")
+
+
 jconf = intp.getSparkConf()
 conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
 sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
