Hi,

Yes, I can always reproduce the issue:
> about your workload, Spark configuration, JDK version and OS version?

I ran SparkPi 1000.

java -version
java version "1.7.0_67"
Java(TM) SE Runtime Environment (build 1.7.0_67-b01)
Java HotSpot(TM) 64-Bit Server VM (build 24.65-b04, mixed mode)

cat /etc/centos-release
CentOS release 6.5 (Final)

My Spark hive-site.xml contains the following:

<property>
  <name>hive.exec.compress.output</name>
  <value>true</value>
</property>

<property>
  <name>mapred.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>

<property>
  <name>mapred.output.compression.type</name>
  <value>BLOCK</value>
</property>

e.g.

MASTER=spark://m1:7077,m2:7077 ./bin/run-example SparkPi 1000

2014-10-22 20:23:17,033 ERROR [sparkDriver-akka.actor.default-dispatcher-18] actor.ActorSystemImpl (Slf4jLogger.scala:apply$mcV$sp(66)) - Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-2] shutting down ActorSystem [sparkDriver]
java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
        at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
        at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
        at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79)
        at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125)
        at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:207)
        at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83)
        at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
        at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
        at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:829)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2014-10-22 20:23:17,036 INFO [main] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Failed to run reduce at SparkPi.scala:35
Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
        at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
        at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
        at akka.actor.ActorCell.terminate(ActorCell.scala:338)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:240)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2014-10-22 20:23:17,038 INFO [sparkDriver-akka.actor.default-dispatcher-14] remote.RemoteActorRefProvider$RemotingTerminator (Slf4jLogger.scala:apply$mcV$sp(74)) - Shutting down remote daemon.
2014-10-22 20:23:17,039 INFO [sparkDriver-akka.actor.default-dispatcher-14] remote.RemoteActorRefProvider$RemotingTerminator (Slf4jLogger.scala:apply$mcV$sp(74)) - Remote daemon shut down; proceeding with flushing remote transports.
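The same native call can also be exercised directly in spark-shell (a minimal check of my own, not part of the failing job; it assumes the shell sees the same snappy-java jar that Spark ships):

// Snappy.maxCompressedLength delegates to the native method
// SnappyNative.maxCompressedLength, so this either prints a number
// or fails with the same UnsatisfiedLinkError as in the log above.
import org.xerial.snappy.Snappy
println(Snappy.maxCompressedLength(1024))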
Regards
Arthur

On 17 Oct, 2014, at 9:33 am, Shao, Saisai <saisai.s...@intel.com> wrote:

> Hi Arthur,
>
> I think this is a known issue in Spark; you can check
> https://issues.apache.org/jira/browse/SPARK-3958. I'm curious about it: can
> you always reproduce this issue? Is it related to some specific data sets?
> Would you mind giving me some information about your workload, Spark
> configuration, JDK version and OS version?
>
> Thanks
> Jerry
>
> From: arthur.hk.c...@gmail.com [mailto:arthur.hk.c...@gmail.com]
> Sent: Friday, October 17, 2014 7:13 AM
> To: user
> Cc: arthur.hk.c...@gmail.com
> Subject: Spark Hive Snappy Error
>
> Hi,
>
> When trying Spark with a Hive table, I got the "java.lang.UnsatisfiedLinkError:
> org.xerial.snappy.SnappyNative.maxCompressedLength(I)I" error:
>
> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
> sqlContext.sql("select count(1) from q8_national_market_share").collect().foreach(println)
>
> java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
>         at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
>         at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
>         at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79)
>         at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125)
>         at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:207)
>         at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83)
>         at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68)
>         at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36)
>         at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
>         at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>         at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809)
>         at org.apache.spark.sql.hive.HadoopTableReader.<init>(TableReader.scala:68)
>         at org.apache.spark.sql.hive.execution.HiveTableScan.<init>(HiveTableScan.scala:68)
>         at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$14.apply(HiveStrategies.scala:188)
>         at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$14.apply(HiveStrategies.scala:188)
>         at org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:364)
>         at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$.apply(HiveStrategies.scala:184)
>         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>         at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>         at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
>         at org.apache.spark.sql.execution.SparkStrategies$HashAggregation$.apply(SparkStrategies.scala:146)
>         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>         at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>         at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
>         at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
>         at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
>         at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
>         at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
>         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:15)
>         at $iwC$$iwC$$iwC.<init>(<console>:20)
>         at $iwC$$iwC.<init>(<console>:22)
>         at $iwC.<init>(<console>:24)
>         at <init>(<console>:26)
>         at .<init>(<console>:30)
>         at .<clinit>(<console>)
>         at .<init>(<console>:7)
>         at .<clinit>(<console>)
>         at $print(<console>)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
>         at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
>         at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
>         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
>         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
>         at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
>         at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
>         at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
>         at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
>         at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
>         at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
>         at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
>         at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
>         at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
>         at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
>         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
>         at org.apache.spark.repl.Main$.main(Main.scala:31)
>         at org.apache.spark.repl.Main.main(Main.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> I have already set the following in $SPARK_HOME/conf/hive-site.xml:
>
> <property>
>   <name>hive.exec.compress.output</name>
>   <value>true</value>
> </property>
>
> <property>
>   <name>mapred.output.compression.codec</name>
>   <value>org.apache.hadoop.io.compress.SnappyCodec</value>
> </property>
>
> <property>
>   <name>mapred.output.compression.type</name>
>   <value>BLOCK</value>
> </property>
>
> My questions:
> Q1) Does it mean that I need to copy snappy files to Spark or Hive?
> Q2) Or do I need to recompile Spark (maven) with extra parameters like
> "-Drequire.snappy=true -Pnative"? Or how else can this be fixed?
>
> Regards
> Arthur
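PS: Regarding my own Q1/Q2, two things I plan to try next (a sketch only; the native-library path below is a guess for my machines, and the LZF codec is just a way to take Snappy out of Spark's internal compression path while testing, not a fix):

# conf/spark-defaults.conf (sketch; paths are assumptions)
# Point the driver and executors at the Hadoop native libraries,
# which include libsnappy on this cluster:
spark.driver.extraLibraryPath    /usr/lib/hadoop/lib/native
spark.executor.extraLibraryPath  /usr/lib/hadoop/lib/native
# Alternatively, avoid Snappy for Spark-internal (broadcast/shuffle)
# compression, to confirm the UnsatisfiedLinkError disappears:
spark.io.compression.codec       org.apache.spark.io.LZFCompressionCodec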