Spark Streaming from Kafka
Hi,

Just wondering if you've seen the following error when reading from Kafka:

ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
    at kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
    at kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
    at kafka.utils.Logging$class.$init$(Logging.scala:29)
    at kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:24)
    at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:78)
    at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:96)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 18 more

Thanks,
Harold
Re: Spark Streaming from Kafka
Looks like the Kafka jar that you are using isn't compatible with your Scala version.

Thanks
Best Regards

On Wed, Oct 29, 2014 at 11:50 AM, Harold Nguyen har...@nexgate.com wrote:
> Hi, Just wondering if you've seen the following error when reading from Kafka:
> ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
> [...]
Re: Spark Streaming from Kafka
Thanks! How do I find out which Kafka jar to use for Scala 2.10.4?

On Wed, Oct 29, 2014 at 12:26 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
> Looks like the Kafka jar that you are using isn't compatible with your Scala version.
> [...]
Re: Spark Streaming from Kafka
I'm using kafka_2.10-1.1.0.jar on Spark 1.1.0.

On Wed, Oct 29, 2014 at 12:31 AM, Harold Nguyen har...@nexgate.com wrote:
> Thanks! How do I find out which Kafka jar to use for Scala 2.10.4?
> [...]
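For reference: with sbt, the usual way to keep the Kafka artifact in step with the project's Scala version is the %% operator, which appends the Scala binary version to the artifact name. A minimal sketch (coordinates illustrative, matching the sbt files later in this thread):

    scalaVersion := "2.10.4"

    // "%%" resolves to kafka_2.10-0.8.0.jar here, matching Scala 2.10.x
    libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0"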
Re: Spark Streaming with Kafka, building project with 'sbt assembly' is extremely slow
I came across this: https://github.com/xerial/sbt-pack

Until I found this, I was simply using the sbt-assembly plugin (sbt clean assembly).

mn

On Sep 4, 2014, at 2:46 PM, Aris arisofala...@gmail.com wrote:
> Thanks for answering Daniil - I have SBT version 0.13.5, is that an old version? Seems pretty up-to-date.
> [...]
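For anyone curious, wiring in sbt-pack is roughly the sketch below; the plugin version is illustrative for the sbt 0.13.x era, and older releases of the plugin also expect packAutoSettings in build.sbt:

    // project/plugins.sbt
    addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.6.1")

Running `sbt pack` then collects the dependency jars under target/pack/lib instead of merging everything into a single uber jar, which sidesteps the slow assembly step described below.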
Re: Spark Streaming with Kafka, building project with 'sbt assembly' is extremely slow
Thanks for answering Daniil - I have SBT version 0.13.5, is that an old version? Seems pretty up-to-date.

It turns out I figured out a way around this entire problem: just use 'sbt package', and when using bin/spark-submit, pass it the --jars option and give it all the JARs from the local ivy2 cache. Pretty inelegant, but at least I am able to develop, and when I want to make a super JAR with sbt assembly I can use the stupidly slow method.

Here is the important snippet for grabbing all the JARs from the local ivy2 cache:

    --jars $(find ~/.ivy2/cache/ -iname '*.jar' | tr '\n' ,)

Here's the entire running command:

    bin/spark-submit --master local[*] --jars $(find /home/data/.ivy2/cache/ -iname '*.jar' | tr '\n' ,) --class KafkaStreamConsumer ~/code_host/data/scala/streamingKafka/target/scala-2.10/streamingkafka_2.10-1.0.jar node1:2181 my-consumer-group aris-topic 1

This is fairly bad, but it works around sbt assembly being incredibly slow.

On Tue, Sep 2, 2014 at 2:13 PM, Daniil Osipov daniil.osi...@shazam.com wrote:
> What version of sbt are you using? There is a bug in early versions of 0.13 that causes assembly to be extremely slow - make sure you're using the latest one.
> [...]
Spark Streaming with Kafka, building project with 'sbt assembly' is extremely slow
Hi folks,

I am trying to use Kafka with Spark Streaming, and it appears I cannot do the normal 'sbt package' as I do with other Spark applications, such as Spark alone or Spark with MLlib. I learned I have to build with the sbt-assembly plugin.

OK, so here is my build.sbt file for my extremely simple test Kafka/Spark Streaming project. It takes almost 30 minutes to build! This is a CentOS Linux machine on SSDs with 4GB of RAM; it's never been slow for me. To compare, sbt assembly for the entire Spark project itself takes less than 10 minutes.

At the bottom of this file I am trying to play with 'cacheOutput' options, because I read online that maybe I am calculating SHA-1 for all the *.class files in this super JAR. I also copied the mergeStrategy from Spark contributor TD's Spark Streaming tutorial from Spark Summit 2014.

Again, is there some better way to build this JAR file than just using sbt package? The process is working, but very slow. Any help with speeding up this compilation is really appreciated!

Aris

-----

import AssemblyKeys._ // put this at the top of the file

name := "streamingKafka"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.0.1"
)

assemblySettings

jarName in assembly := "streamingkafka-assembly.jar"

mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case "log4j.properties" => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

assemblyOption in assembly ~= { _.copy(cacheOutput = false) }
spark streaming - saving kafka DStream into hadoop throws exception
...
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
    at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:185)
    at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Re: spark streaming - saving kafka DStream into hadoop throws exception
Look, this is the whole program. I am not trying to serialize the JobConf.

def main(args: Array[String]) {
  try {
    val properties = getProperties("settings.properties")
    StreamingExamples.setStreamingLogLevels()
    val zkQuorum = properties.get("zookeeper.list").toString()
    val topic = properties.get("topic.name").toString()
    val group = properties.get("group.name").toString()
    val threads = properties.get("consumer.threads").toString()
    val topicpMap = Map(topic -> threads.toInt)
    val hdfsNameNodeUrl = properties.get("hdfs.namenode.url").toString()
    val hdfsCheckPointUrl = hdfsNameNodeUrl + properties.get("hdfs.checkpoint.path").toString()
    val hdfsDataUrl = hdfsNameNodeUrl + properties.get("hdfs.data.path").toString()
    val checkPointInterval = properties.get("spark.streaming.checkpoint.interval").toString().toInt
    val sparkConf = new SparkConf().setAppName("KafkaMessageReceiver")
    println("===")
    println("kafka configuration: zk: " + zkQuorum + "; topic: " + topic + "; group: " + group + "; threads: " + threads)
    println("===")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(hdfsCheckPointUrl)
    val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
    dStream.checkpoint(Seconds(checkPointInterval))
    dStream.saveAsNewAPIHadoopFiles(hdfsDataUrl, "csv", classOf[String], classOf[String], classOf[TextOutputFormat[String, String]], ssc.sparkContext.hadoopConfiguration)
    val eventData = dStream.map(_._2).map(_.split(",")).map(data => DataObject(data(0), data(1), data(2), data(3), data(4), data(5), data(6), data(7), data(8).toLong, data(9), data(10), data(11), data(12).toLong, data(13), data(14)))
    val count = eventData.filter(_.state == "COMPLETE").countByWindow(Minutes(15), Seconds(1))
    count.map(cnt => "the Total count of calls in complete state in the last 15 minutes is: " + cnt).print()
    ssc.start()
    ssc.awaitTermination()
  } catch {
    case e: Exception => println("exception caught: " + e)
  }
}
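The trace in the original message fails while serializing the DStream graph for checkpointing. One workaround people use, shown here as a sketch rather than the poster's actual fix, assumes the Hadoop Configuration captured by saveAsNewAPIHadoopFiles is the non-serializable part: drive the write from foreachRDD and build the Configuration inside the closure, reusing dStream and hdfsDataUrl from the program above:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
    import org.apache.spark.SparkContext._  // pair-RDD functions in Spark 1.x

    dStream.foreachRDD { (rdd, time) =>
      // The Configuration is created per batch inside the closure, so it is
      // never part of the checkpointed DStream graph.
      val hadoopConf = new Configuration()
      rdd.saveAsNewAPIHadoopFile(
        hdfsDataUrl + "-" + time.milliseconds + ".csv",
        classOf[String],
        classOf[String],
        classOf[TextOutputFormat[String, String]],
        hadoopConf)
    }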
Re: spark streaming - saving kafka DStream into hadoop throws exception
If I reduce the app to the following code then I don't see the exception. It creates the Hadoop files, but they are empty! The DStream doesn't get written out to the files!

def main(args: Array[String]) {
  try {
    val properties = getProperties("settings.properties")
    StreamingExamples.setStreamingLogLevels()
    val zkQuorum = properties.get("zookeeper.list").toString()
    val topic = properties.get("topic.name").toString()
    val group = properties.get("group.name").toString()
    val threads = properties.get("consumer.threads").toString()
    val topicpMap = Map(topic -> threads.toInt)
    val hdfsNameNodeUrl = properties.get("hdfs.namenode.url").toString()
    val hdfsCheckPointUrl = hdfsNameNodeUrl + properties.get("hdfs.checkpoint.path").toString()
    val hdfsDataUrl = hdfsNameNodeUrl + properties.get("hdfs.data.path").toString()
    val checkPointInterval = properties.get("spark.streaming.checkpoint.interval").toString().toInt
    val sparkConf = new SparkConf().setAppName("KafkaMessageReceiver")
    println("===")
    println("kafka configuration: zk: " + zkQuorum + "; topic: " + topic + "; group: " + group + "; threads: " + threads)
    println("===")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
    dStream.saveAsNewAPIHadoopFiles(hdfsDataUrl, "csv", classOf[String], classOf[String], classOf[TextOutputFormat[String, String]], ssc.sparkContext.hadoopConfiguration)
    ssc.start()
    ssc.awaitTermination()
  } catch {
    case e: Exception => println("exception caught: " + e)
  }
}
Re: Using Spark Streaming with Kafka 0.7.2
Hi,

For testing you could also just use the Kafka 0.7.2 console consumer, pipe its output to netcat (nc), and process that as in the example https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala

That worked for me. Backporting to the older Kafka version seems tricky due to all the protocol changes.

Andre

On 07/26/2014 12:56 AM, Tathagata Das wrote:
> Spark Streaming is built as part of the whole Spark repository. Hence follow Spark's building instructions (http://spark.apache.org/docs/latest/building-with-maven.html) to build Spark Streaming along with Spark.
> Spark Streaming 0.8.1 was built with Kafka 0.7.2. You can take a look. If necessary, I recommend modifying the current Kafka Receiver based on the 0.8.1 Kafka Receiver: https://github.com/apache/spark/blob/v0.8.1-incubating/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
> TD
> On Fri, Jul 25, 2014 at 10:16 AM, maddenpj madde...@gmail.com wrote:
>> Hi all, Currently we have Kafka 0.7.2 running in production and can't upgrade for external reasons [...]
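Concretely, the bridge looks something like the sketch below; the Kafka CLI invocation is an assumption about a typical 0.7.x setup, and the host/topic names are placeholders:

    # shell: pipe the console consumer into netcat on port 9999
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic | nc -lk 9999

    // Scala: read the piped lines as a plain socket text stream
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext(new SparkConf().setAppName("KafkaViaNetcat"), Seconds(1))
    ssc.socketTextStream("localhost", 9999).print()
    ssc.start()
    ssc.awaitTermination()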
Using Spark Streaming with Kafka 0.7.2
Hi all,

Currently we have Kafka 0.7.2 running in production and can't upgrade for external reasons; however, Spark Streaming (1.0.1) was built with Kafka 0.8.0. What is the best way to use Spark Streaming with older versions of Kafka? Currently I'm investigating trying to build Spark Streaming myself, but I can't find any documentation specifically for building Spark Streaming.
Re: Spark Streaming with Kafka NoClassDefFoundError
In case you still have issues with duplicate files in the uber jar, here is a reference sbt file with the assembly plugin that deals with duplicates: https://github.com/databricks/training/blob/sparkSummit2014/streaming/scala/build.sbt

On Fri, Jul 11, 2014 at 10:06 AM, Bill Jay bill.jaypeter...@gmail.com wrote:
> You may try to use this one: https://github.com/sbt/sbt-assembly
> [...]
Re: Spark Streaming with Kafka NoClassDefFoundError
Also, the reason spark-streaming-kafka is not included in the Spark assembly is that we do not want dependencies of external systems like Kafka (which itself probably has a complex dependency tree) to cause conflicts with core Spark's functionality and stability.

TD

On Sun, Jul 13, 2014 at 5:48 PM, Tathagata Das tathagata.das1...@gmail.com wrote:
> In case you still have issues with duplicate files in the uber jar, here is a reference sbt file with the assembly plugin that deals with duplicates: https://github.com/databricks/training/blob/sparkSummit2014/streaming/scala/build.sbt
> [...]
Re: Spark Streaming with Kafka NoClassDefFoundError
The easiest fix would be adding the Kafka jars to the SparkContext while creating it.

Thanks
Best Regards

On Fri, Jul 11, 2014 at 4:39 AM, Dilip dilip_ram...@hotmail.com wrote:
> Hi, I am trying to run a program with spark streaming using Kafka on a stand alone system. These are my details: Spark 1.0.0 hadoop2, Scala 2.10.3.
> [...]
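What that can look like in code, as a sketch with placeholder jar paths rather than the poster's actual layout:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("KafkaStreamingApp")
      .setJars(Seq(
        "/path/to/spark-streaming-kafka_2.10-1.0.0.jar",  // placeholder path
        "/path/to/kafka_2.10-0.8.0.jar"))                 // placeholder path
    val sc = new SparkContext(conf)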
Re: Spark Streaming with Kafka NoClassDefFoundError
Hi Akhil,

Can you please guide me through this? Because the code I am running already has this in it:

[java]
SparkContext sc = new SparkContext();
sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar");

Is there something I am missing?

Thanks,
Dilip

On Friday 11 July 2014 12:02 PM, Akhil Das wrote:
> Easiest fix would be adding the kafka jars to the SparkContext while creating it.
> [...]
Re: Spark Streaming with Kafka NoClassDefFoundError
I have met similar issues. The reason is probably that spark-streaming-kafka is not included in the Spark assembly. Currently, I am using Maven to generate a shaded package with all the dependencies. You may try to use sbt assembly to include the dependencies in your jar file.

Bill

On Thu, Jul 10, 2014 at 11:48 PM, Dilip dilip_ram...@hotmail.com wrote:
> Hi Akhil, Can you please guide me through this? Because the code I am running already has this in it:
> [...]
Re: Spark Streaming with Kafka NoClassDefFoundError
A simple sbt assembly is not working. Is there any other way to include particular jars with the assembly command?

Regards,
Dilip

On Friday 11 July 2014 12:45 PM, Bill Jay wrote:
> I have met similar issues. The reason is probably that spark-streaming-kafka is not included in the Spark assembly.
> [...]
Re: Spark Streaming with Kafka NoClassDefFoundError
You may try to use this one: https://github.com/sbt/sbt-assembly

I had an issue of duplicate files in the uber jar file, but I think this library will assemble dependencies into a single jar file.

Bill

On Fri, Jul 11, 2014 at 1:34 AM, Dilip dilip_ram...@hotmail.com wrote:
> A simple sbt assembly is not working. Is there any other way to include particular jars with the assembly command?
> [...]
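If you go the sbt-assembly route, the wiring is roughly the sketch below; the plugin version is a guess at the sbt 0.13-era line, not prescriptive:

    // project/plugins.sbt
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

Then run `sbt assembly` and submit the single jar it produces under target/scala-2.10/.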
Spark Streaming with Kafka NoClassDefFoundError
Hi,

I am trying to run a program with spark streaming using Kafka on a stand alone system. These are my details:

Spark 1.0.0 hadoop2
Scala 2.10.3

I am trying a simple program using my custom sbt project but this is the error I am getting:

Exception in thread "main" java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder
    at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94)
    at org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
    at SimpleJavaApp.main(SimpleJavaApp.java:40)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 11 more

here is my .sbt file:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.3"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.0"

libraryDependencies += "org.apache.spark" %% "spark-examples" % "1.0.0"

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0"

libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

resolvers += "Maven Repository" at "http://central.maven.org/maven2/"

sbt package was successful. I also tried sbt ++2.10.3 package to build it for my scala version. Problem remains the same.

Can anyone help me out here? I've been stuck on this for quite some time now.

Thank You,
Dilip
Re: How to shut down Spark Streaming with Kafka properly?
I closed that PR for other reasons. This change is still proposed by itself:

https://issues.apache.org/jira/browse/SPARK-2034
https://github.com/apache/spark/pull/980

On Fri, Jun 6, 2014 at 1:35 AM, Tobias Pfeiffer t...@preferred.jp wrote:
> Sean, your patch fixes the issue, thank you so much!
> [...]
How to shut down Spark Streaming with Kafka properly?
Hi,

I am trying to use Spark Streaming with Kafka, which works like a charm -- except for shutdown. When I run my program with "sbt run-main", sbt will never exit, because there are two non-daemon threads left that don't die.

I created a minimal example at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-kafkadoesntshutdown-scala. It starts a StreamingContext and does nothing more than connecting to a Kafka server and printing what it receives. Using the `future { ... }` construct, I shut down the StreamingContext after some seconds and then print the difference between the threads at start time and at end time. The output can be found at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output1. There are a number of threads remaining that will prevent sbt from exiting.

When I replace `KafkaUtils.createStream(...)` with a call that does exactly the same, except that it calls `consumerConnector.shutdown()` in `KafkaReceiver.onStop()` (which it should, IMO), the output is as shown at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output2.

Does anyone have *any* idea what is going on here and why the program doesn't shut down properly? The behavior is the same with both kafka 0.8.0 and 0.8.1.1, by the way.

Thanks
Tobias
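For reference, the generic shutdown pattern in the 1.x API looks like the sketch below. This alone does not fix the leaked Kafka consumer threads described here (that needed the receiver-side consumerConnector.shutdown() change); the quorum/group/topic values are placeholders:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("KafkaShutdownSketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    KafkaUtils.createStream(ssc, "localhost:2181", "my-group", Map("my-topic" -> 1)).print()
    ssc.start()
    ssc.awaitTermination(30000)  // return after at most 30 seconds
    // Stop streaming and the underlying SparkContext; process data already
    // received before shutting down.
    ssc.stop(stopSparkContext = true, stopGracefully = true)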
Re: How to shut down Spark Streaming with Kafka properly?
Sean, your patch fixes the issue, thank you so much! (This is the second time within one week I run into network libraries not shutting down threads properly, I'm really glad your code fixes the issue.) I saw your pull request is closed, but not merged yet. Can I do anything to get your fix into Spark? Open an issue, send a pull request myself etc.? Thanks Tobias
Spark Streaming with Kafka | Check if DStream is Empty | HDFS Write
Hi All

I am using Spark Streaming with Kafka. I receive messages and, after minor processing, I write them to HDFS; as of now I am using the saveAsTextFiles() / saveAsHadoopFiles() Java methods.

- Is there some default way of writing the stream to Hadoop, like the HDFS sink concept in Flume? I mean, is there some configurable way of writing at the Spark Streaming end after processing the DStream?

- How can I check if a DStream is empty so that I can skip the HDFS write if no message is present (I am pulling the Kafka topic every 1 sec)? Sometimes it writes empty files to HDFS due to unavailability of messages.

Please suggest. TIA

--
Anish Sneh
"Experience is the best teacher."
+91-99718-55883
http://in.linkedin.com/in/anishsneh
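A hedged sketch of one way to address both points: drive the write from foreachRDD (the general-purpose output operation) and skip batches with no data. Early Spark 1.x RDDs may not have isEmpty(), so the guard below uses take(1); the stream name and HDFS path are placeholders:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.Time

    // `processed` stands for the DStream[String] after the minor processing step
    processed.foreachRDD { (rdd: RDD[String], time: Time) =>
      // take(1) avoids a full count(); RDD.isEmpty only appeared in later releases
      if (rdd.take(1).nonEmpty) {
        rdd.saveAsTextFile("hdfs://namenode/data/events-" + time.milliseconds)
      }
    }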