Thanks a lot. I will try to reproduce this in my local environment and dig
into the details. Thanks for the information.


BR
Jerry

From: arthur.hk.c...@gmail.com [mailto:arthur.hk.c...@gmail.com]
Sent: Wednesday, October 22, 2014 8:35 PM
To: Shao, Saisai
Cc: arthur.hk.c...@gmail.com; user
Subject: Re: Spark Hive Snappy Error

Hi,

Yes, I can always reproduce the issue:

> about your workload, Spark configuration, JDK version and OS version?

I ran SparkPI 1000

java -version
java version "1.7.0_67"
Java(TM) SE Runtime Environment (build 1.7.0_67-b01)
Java HotSpot(TM) 64-Bit Server VM (build 24.65-b04, mixed mode)

cat /etc/centos-release
CentOS release 6.5 (Final)

My Spark hive-site.xml contains the following:
 <property>
  <name>hive.exec.compress.output</name>
  <value>true</value>
 </property>

 <property>
  <name>mapred.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
 </property>

 <property>
  <name>mapred.output.compression.type</name>
  <value>BLOCK</value>
 </property>

e.g.
MASTER=spark://m1:7077,m2:7077 ./bin/run-example SparkPi 1000
2014-10-22 20:23:17,033 ERROR [sparkDriver-akka.actor.default-dispatcher-18] actor.ActorSystemImpl (Slf4jLogger.scala:apply$mcV$sp(66)) - Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-2] shutting down ActorSystem [sparkDriver]
java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
         at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
         at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
         at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79)
         at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125)
         at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:207)
         at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83)
         at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68)
         at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36)
         at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
         at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
         at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809)
         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:829)
         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
         at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753)
         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360)
         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2014-10-22 20:23:17,036 INFO  [main] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Failed to run reduce at SparkPi.scala:35
Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
         at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
         at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
         at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
         at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
         at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
         at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
         at akka.actor.ActorCell.terminate(ActorCell.scala:338)
         at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
         at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
         at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:240)
         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2014-10-22 20:23:17,038 INFO  [sparkDriver-akka.actor.default-dispatcher-14] remote.RemoteActorRefProvider$RemotingTerminator (Slf4jLogger.scala:apply$mcV$sp(74)) - Shutting down remote daemon.
2014-10-22 20:23:17,039 INFO  [sparkDriver-akka.actor.default-dispatcher-14] remote.RemoteActorRefProvider$RemotingTerminator (Slf4jLogger.scala:apply$mcV$sp(74)) - Remote daemon shut down; proceeding with flushing remote transports.
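
A note on the trace above: SparkPi does not touch Hive, and the failure occurs
inside org.apache.spark.io.SnappyCompressionCodec while TorrentBroadcast writes
its blocks. That suggests the codec involved is Spark's internal one
(spark.io.compression.codec), not the Hive output codec set in hive-site.xml.
One way to test that reading might be to switch the internal codec away from
Snappy and rerun. The sketch below is only an assumption-laden diagnostic: it
assumes the Spark version in use accepts the class name
org.apache.spark.io.LZFCompressionCodec for spark.io.compression.codec and
reads conf/spark-defaults.conf; the helper name set_spark_codec is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper (not part of Spark): set spark.io.compression.codec in a
# spark-defaults.conf file, replacing any existing entry for that key.
set_spark_codec() {
  conf_file="$1"   # path to spark-defaults.conf (assumed location)
  codec="$2"       # codec class name to use instead of Snappy
  touch "$conf_file"
  # Drop any previous setting for the key; grep exits non-zero when nothing
  # is left to print, which is fine here.
  grep -v '^spark\.io\.compression\.codec[[:space:]=:]' "$conf_file" > "$conf_file.tmp" || true
  mv "$conf_file.tmp" "$conf_file"
  # Append the new setting.
  printf 'spark.io.compression.codec %s\n' "$codec" >> "$conf_file"
}

# Example (not run automatically):
#   set_spark_codec "$SPARK_HOME/conf/spark-defaults.conf" \
#       org.apache.spark.io.LZFCompressionCodec
#   MASTER=spark://m1:7077,m2:7077 ./bin/run-example SparkPi 1000
```

If the job then succeeds, that would point at loading the snappy-java native
library rather than at Hive; it would be a workaround, not a fix.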


Regards
Arthur

On 17 Oct, 2014, at 9:33 am, Shao, Saisai <saisai.s...@intel.com> wrote:


Hi Arthur,

I think this is a known issue in Spark; see
https://issues.apache.org/jira/browse/SPARK-3958. I’m curious about it. Can
you always reproduce this issue? Is it related to specific data sets? Would
you mind giving me some information about your workload, Spark configuration,
JDK version and OS version?

Thanks
Jerry

From: arthur.hk.c...@gmail.com [mailto:arthur.hk.c...@gmail.com]
Sent: Friday, October 17, 2014 7:13 AM
To: user
Cc: arthur.hk.c...@gmail.com
Subject: Spark Hive Snappy Error

Hi,

When trying Spark with a Hive table, I got the “java.lang.UnsatisfiedLinkError: 
org.xerial.snappy.SnappyNative.maxCompressedLength(I)I” error:


val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("select count(1) from q8_national_market_share").collect().foreach(println)
java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
         at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
         at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
         at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79)
         at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125)
         at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:207)
         at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83)
         at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68)
         at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36)
         at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
         at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
         at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809)
         at org.apache.spark.sql.hive.HadoopTableReader.<init>(TableReader.scala:68)
         at org.apache.spark.sql.hive.execution.HiveTableScan.<init>(HiveTableScan.scala:68)
         at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$14.apply(HiveStrategies.scala:188)
         at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$14.apply(HiveStrategies.scala:188)
         at org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:364)
         at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$.apply(HiveStrategies.scala:184)
         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
         at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
         at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
         at org.apache.spark.sql.execution.SparkStrategies$HashAggregation$.apply(SparkStrategies.scala:146)
         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
         at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
         at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
         at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
         at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
         at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
         at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:15)
         at $iwC$$iwC$$iwC.<init>(<console>:20)
         at $iwC$$iwC.<init>(<console>:22)
         at $iwC.<init>(<console>:24)
         at <init>(<console>:26)
         at .<init>(<console>:30)
         at .<clinit>(<console>)
         at .<init>(<console>:7)
         at .<clinit>(<console>)
         at $print(<console>)
         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.lang.reflect.Method.invoke(Method.java:606)
         at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
         at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
         at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
         at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
         at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
         at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
         at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
         at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
         at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
         at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
         at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
         at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
         at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
         at org.apache.spark.repl.Main$.main(Main.scala:31)
         at org.apache.spark.repl.Main.main(Main.scala)
         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.lang.reflect.Method.invoke(Method.java:606)
         at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



I have already set the following in $SPARK_HOME/conf/hive-site.xml:
 <property>
  <name>hive.exec.compress.output</name>
  <value>true</value>
 </property>

 <property>
  <name>mapred.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
 </property>

 <property>
  <name>mapred.output.compression.type</name>
  <value>BLOCK</value>
 </property>


My questions:
Q1) Does it mean that I need to copy the Snappy files to Spark or Hive?
Q2) Or do I need to recompile Spark (Maven) with extra parameters like
"-Drequire.snappy=true -Pnative"?
Or is there another way to fix this?
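
On Q1, before copying files or rebuilding, it may be worth checking whether
more than one snappy-java jar is visible to the JVM, since an older copy loaded
first could plausibly lack a native method such as maxCompressedLength. That is
only a guess, not a confirmed cause for this thread. A rough sketch follows;
the helper name list_snappy_jars and the directories passed to it are
assumptions.

```shell
#!/bin/sh
# Hypothetical helper: list snappy-java jars found under the given directories
# so that duplicate or mismatched versions are easy to spot.
list_snappy_jars() {
  for dir in "$@"; do
    # Skip directories that do not exist (e.g. an unset *_HOME variable).
    [ -d "$dir" ] || continue
    find "$dir" -type f -name 'snappy-java*.jar' 2>/dev/null
  done | sort
}

# Example (not run automatically). Note that a Spark build that bundles its
# dependencies into a single assembly jar will not show a separate
# snappy-java jar under $SPARK_HOME.
#   list_snappy_jars "$SPARK_HOME" "$HIVE_HOME" "$HADOOP_HOME"
```

More than one version in the output would be a reason to look at classpath
order before trying a rebuild.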


Regards
Arthur
