Hi Jeff,

Yes, the code works with the PySpark shell and with spark-submit on the same server where Zeppelin and Livy are running. I also ran another test: I executed the same code against Livy with cURL and it worked fine.
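For reference, the shell check was roughly the following (the same word-count snippet as in the notebook paragraph, typed into the interactive PySpark shell on that host; the spark-submit test used the test.py script quoted below):

<code>
# Roughly what was run in the interactive PySpark shell ("sc" is provided by the shell):
txtFile = sc.textFile("/data/staging/zeppelin_test/data.txt")
counts = txtFile.flatMap(lambda line: line.split(" ")) \
                .map(lambda word: (word, 1)) \
                .reduceByKey(lambda a, b: a + b)
counts.collect()
</code>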
Mauro Schneider

On Fri, Sep 29, 2017 at 8:26 PM, Jeff Zhang <zjf...@gmail.com> wrote:
> It is more likely a Spark configuration issue on your side. Could you run this code in the pyspark shell?
>
> Mauro Schneider <maur...@gmail.com> wrote on Friday, September 29, 2017 at 11:24 PM:
>>
>> Hi
>>
>> I'm trying to execute PySpark code with Zeppelin and Livy, but without
>> success. Scala with Livy works well, but when I execute the code below I
>> get an exception from Zeppelin.
>>
>> <code>
>> %livy.pyspark
>> txtFile = sc.textFile("/data/staging/zeppelin_test/data.txt")
>> counts = txtFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
>> counts.collect()
>> </code>
>>
>> <exception>
>> An error occurred while calling o47.textFile.
>> : java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
>> at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
>> at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
>> at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79)
>> at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:156)
>> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:200)
>> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:200)
>> at scala.Option.map(Option.scala:145)
>> at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:200)
>> at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
>> at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
>> at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>> at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
>> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1334)
>> at org.apache.spark.SparkContext$$anonfun$hadoopFile$1.apply(SparkContext.scala:1022)
>> at org.apache.spark.SparkContext$$anonfun$hadoopFile$1.apply(SparkContext.scala:1019)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>> at org.apache.spark.SparkContext.withScope(SparkContext.scala:722)
>> at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:1019)
>> at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:840)
>> at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:838)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>> at org.apache.spark.SparkContext.withScope(SparkContext.scala:722)
>> at org.apache.spark.SparkContext.textFile(SparkContext.scala:838)
>> at org.apache.spark.api.java.JavaSparkContext.textFile(JavaSparkContext.scala:188)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
>> at py4j.Gateway.invoke(Gateway.java:259)
>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at py4j.GatewayConnection.run(GatewayConnection.java:209)
>> at java.lang.Thread.run(Thread.java:745)
>> </exception>
>>
>> I have also tested the code below with cURL and Livy, and it works correctly.
>>
>> <code /user/mulisses/test.py>
>> import sys
>> from pyspark import SparkContext
>>
>> if __name__ == "__main__":
>>     sc = SparkContext(appName="Hello Spark")
>>     txtFile = sc.textFile("/data/staging/zeppelin_test/data.txt")
>>     counts = txtFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
>>     counts.saveAsTextFile("test_wc_py")
>> </code>
>>
>> <cUrl>
>> curl -i --negotiate -u : -X POST --data '{"file": "/user/mulisses/test.py"}' -H "Content-Type: application/json" dtbhad02p.bvs.corp:8998/batches
>> </cUrl>
>>
>> Does anyone know how to solve it? I have tested with Zeppelin 0.7.2 and
>> Zeppelin 0.7.3. Am I forgetting some configuration?
>>
>> Best regards,
>>
>> Mauro Schneider