Repository: incubator-zeppelin Updated Branches: refs/heads/master b15b20136 -> 6a894b09f
[ZEPPELIN-97][ZEPPELIN-134] pyspark issue with mllib api There was an issue [ZEPPELIN-97](https://issues.apache.org/jira/browse/ZEPPELIN-97) with pyspark 1.4. The reason is that, starting with pyspark 1.4, the Java gateway is created with the `auto_convert = True` option. This PR fixes the problem. This PR also handles [ZEPPELIN-134](https://issues.apache.org/jira/browse/ZEPPELIN-134), injecting sqlContext. Finally, it prints a more verbose stack trace message, for example, changing from ``` (<type 'exceptions.AttributeError'>, AttributeError("'list' object has no attribute '_get_object_id'",), <traceback object at 0x392b638>) ``` to ``` Traceback (most recent call last): File "/var/folders/zt/nd4j13y14jjg7_5pc4xgy7t80000gn/T//zeppelin_pyspark.py", line 110, in <module> eval(compiledCode) File "<string>", line 3, in <module> File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/dataframe.py", line 1200, in withColumn return self.select('*', col.alias(colName)) File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/dataframe.py", line 738, in select jdf = self._jdf.select(self._jcols(*cols)) File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/dataframe.py", line 630, in _jcols return self._jseq(cols, _to_java_column) File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/dataframe.py", line 617, in _jseq return _to_seq(self.sql_ctx._sc, cols, converter) File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/column.py", line 60, in _to_seq return sc._jvm.PythonUtils.toSeq(cols) File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 529, in __call__ [get_command_part(arg, self.pool) for arg in new_args]) File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 265, in get_command_part command_part = REFERENCE_TYPE + 
parameter._get_object_id() AttributeError: 'list' object has no attribute '_get_object_id' ``` Author: Lee moon soo <[email protected]> Closes #129 from Leemoonsoo/ZEPPELIN-97 and squashes the following commits: 1fa4bf6 [Lee moon soo] apply auto_convert for spark 1.4 bce3c1d [Lee moon soo] Print more stacktrace Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/6a894b09 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/6a894b09 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/6a894b09 Branch: refs/heads/master Commit: 6a894b09fbc599286df4db49993056b77b6bb6f6 Parents: b15b201 Author: Lee moon soo <[email protected]> Authored: Mon Jun 29 14:07:37 2015 -0700 Committer: Lee moon soo <[email protected]> Committed: Tue Jun 30 10:40:48 2015 -0700 ---------------------------------------------------------------------- .../zeppelin/spark/PySparkInterpreter.java | 1 + .../main/resources/python/zeppelin_pyspark.py | 16 +++++++++++----- .../zeppelin/rest/ZeppelinSparkClusterTest.java | 19 +++++++++++++++++++ 3 files changed, 31 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/6a894b09/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 95eefd8..092b077 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -137,6 +137,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand CommandLine cmd = CommandLine.parse(getProperty("zeppelin.pyspark.python")); 
cmd.addArgument(scriptPath, false); cmd.addArgument(Integer.toString(port), false); + cmd.addArgument(getJavaSparkContext().version(), false); executor = new DefaultExecutor(); outputStream = new ByteArrayOutputStream(); PipedOutputStream ps = new PipedOutputStream(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/6a894b09/spark/src/main/resources/python/zeppelin_pyspark.py ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index e29544e..802015d 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -32,7 +32,12 @@ from pyspark.serializers import MarshalSerializer, PickleSerializer from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row client = GatewayClient(port=int(sys.argv[1])) -gateway = JavaGateway(client) +sparkVersion = sys.argv[2] + +if sparkVersion.startswith("1.4"): + gateway = JavaGateway(client, auto_convert = True) +else: + gateway = JavaGateway(client) java_import(gateway.jvm, "org.apache.spark.SparkEnv") java_import(gateway.jvm, "org.apache.spark.SparkConf") @@ -45,15 +50,15 @@ intp.onPythonScriptInitialized() jsc = intp.getJavaSparkContext() -if jsc.version().startswith("1.2"): +if sparkVersion.startswith("1.2"): java_import(gateway.jvm, "org.apache.spark.sql.SQLContext") java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext") java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext") java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext") -elif jsc.version().startswith("1.3"): +elif sparkVersion.startswith("1.3"): java_import(gateway.jvm, "org.apache.spark.sql.*") java_import(gateway.jvm, "org.apache.spark.sql.hive.*") -elif jsc.version().startswith("1.4"): +elif sparkVersion.startswith("1.4"): java_import(gateway.jvm, "org.apache.spark.sql.*") java_import(gateway.jvm, 
"org.apache.spark.sql.hive.*") @@ -64,6 +69,7 @@ jconf = intp.getSparkConf() conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf) sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf) sqlc = SQLContext(sc, intp.getSQLContext()) +sqlContext = sqlc z = intp.getZeppelinContext() @@ -117,6 +123,6 @@ while True : excInnerError = excInnerError[innerErrorStart:] intp.setStatementsFinished(excInnerError + str(sys.exc_info()), True) except: - intp.setStatementsFinished(str(sys.exc_info()), True) + intp.setStatementsFinished(traceback.format_exc(), True) output.reset() http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/6a894b09/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 758a1e4..fd4a8b3 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -88,6 +88,25 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { } ZeppelinServer.notebook.removeNote(note.id()); } + + @Test + public void pySparkAutoConvertOptionTest() throws IOException { + // create new note + Note note = ZeppelinServer.notebook.createNote(); + + int sparkVersion = getSparkVersionNumber(note); + + if (isPyspark() && sparkVersion >= 14) { // auto_convert enabled from spark 1.4 + // run markdown paragraph, again + Paragraph p = note.addParagraph(); + p.setText("%pyspark\nfrom pyspark.sql.functions import *\n" + + "print(sqlContext.range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())"); + note.run(p.getId()); + waitForFinish(p); + assertEquals("10\n", p.getResult().message()); + } + ZeppelinServer.notebook.removeNote(note.id()); + } @Test 
public void zRunTest() throws IOException {
