I see your test with livy via curl command, but seems you are submitting it
as batch. Could you do it via interactive livy session, this is what livy
interpreter of zeppelin does.


Mauro Schneider <maur...@gmail.com>于2017年10月1日周日 上午4:55写道:

> Hi Jeff
>
> Yes, the code work with PySpark Shell and the Spark Submit in the same
> server where is running the Zeppelin and Livy. And I did an another test, I
> executed the same code with cUrl using to Livy and work ok.
>
>
>
>
>
> Mauro Schneider
>
>
> On Fri, Sep 29, 2017 at 8:26 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> It is more likely your spark configuration issue, could you run this code
>> in pyspark shell ?
>>
>>
>>
>> Mauro Schneider <maur...@gmail.com>于2017年9月29日周五 下午11:24写道:
>>
>>>
>>> Hi
>>>
>>> I'm trying execute PySpark code with Zeppelin and Livy but without
>>> success. With Scala and Livy work well but when I execute the code below I
>>> getting a Exception from Zeppelin.
>>>
>>> <code>
>>> %livy.pyspark
>>> txtFile = sc.textFile ("/data/staging/zeppelin_test/data.txt")
>>> counts = txtFile.flatMap(lambda line: line.split(" ")).map(lambda word:
>>> (word, 1)).reduceByKey(lambda a, b: a + b)
>>> counts.collect()
>>> </code>
>>>
>>> <excpetion>
>>> Version:0.9 StartHTML:0000000168 EndHTML:0000024858
>>> StartFragment:0000000204 EndFragment:0000024822 SourceURL:
>>> http://dtbhad02p.bvs.corp:8080/#/notebook/2CSK9TFZM
>>> An error occurred while calling o47.textFile.
>>> : java.lang.UnsatisfiedLinkError:
>>> org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
>>> at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
>>> at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
>>> at
>>> org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79)
>>> at org.apache.spark.io
>>> .SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:156)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:200)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:200)
>>> at scala.Option.map(Option.scala:145)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:200)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>> at
>>> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
>>> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1334)
>>> at
>>> org.apache.spark.SparkContext$$anonfun$hadoopFile$1.apply(SparkContext.scala:1022)
>>> at
>>> org.apache.spark.SparkContext$$anonfun$hadoopFile$1.apply(SparkContext.scala:1019)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>> at org.apache.spark.SparkContext.withScope(SparkContext.scala:722)
>>> at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:1019)
>>> at
>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:840)
>>> at
>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:838)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>> at org.apache.spark.SparkContext.withScope(SparkContext.scala:722)
>>> at org.apache.spark.SparkContext.textFile(SparkContext.scala:838)
>>> at
>>> org.apache.spark.api.java.JavaSparkContext.textFile(JavaSparkContext.scala:188)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
>>> at py4j.Gateway.invoke(Gateway.java:259)
>>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>> at py4j.GatewayConnection.run(GatewayConnection.java:209)
>>> at java.lang.Thread.run(Thread.java:745)
>>> </exception>
>>>
>>> I had too tested the code below with Curl and Livy and work correctly
>>>
>>> <code /user/mulisses/test.py>
>>> import sys
>>> from pyspark import SparkContext
>>>
>>> if __name__ == "__main__":
>>>         sc = SparkContext(appName="Hello Spark")
>>>         txtFile = sc.textFile ("/data/staging/zeppelin_test/data.txt")
>>>         counts = txtFile.flatMap(lambda line: line.split("
>>> ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
>>>         counts.saveAsTextFile("test_wc_py")
>>> </code>
>>>
>>> <cUrl>
>>> curl  -i --negotiate -u : -X POST --data '{"file":
>>> "/user/mulisses/test.py"}' -H "Content-Type: application/json"
>>> dtbhad02p.bvs.corp:8998/batches
>>> </cUrl>
>>>
>>> Is anyone know how to solve it ? I had tested with Zeppelin 0.7.2 and
>>> Zeppelin 0.7.3
>>> Am I forgetting some configuration?
>>>
>>> Best regards,
>>>
>>> Mauro Schneider
>>>
>>>
>>>
>

Reply via email to