That looks like a classpath problem. You shouldn't need to include the kafka_2.10 artifact in your pom at all; spark-streaming-kafka_2.10 already has a transitive dependency on it. That said, 0.8.2.1 is the correct version of that artifact for Spark 1.6, so it's a little strange that you're hitting this.
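For example, a minimal sketch of that part of the pom (reusing the cdh.spark.version property you already define below; untested, the point is simply that the explicit kafka_2.10 entry is gone):

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>${cdh.spark.version}</version>
    </dependency>
    <!-- no explicit org.apache.kafka:kafka_2.10 dependency here;
         the transitive one from spark-streaming-kafka is used -->

If you later need to override the kafka version deliberately, the safer pattern is an <exclusion> on spark-streaming-kafka_2.10 plus your own kafka_2.10 entry, but I'd try the simple form first.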
How are you building and submitting your application? Finally, if this turns out to be a CDH-specific issue, you may have better luck on their forum.

On Thu, Jun 23, 2016 at 1:16 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
> Also, just to keep it simple, I am trying to use 1.6.0-cdh5.7.0 in the
> pom.xml, as the cluster I am trying to run on is CDH 5.7.0 with Spark 1.6.0.
>
> Here are my pom settings:
>
> <cdh.spark.version>1.6.0-cdh5.7.0</cdh.spark.version>
>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-core_2.10</artifactId>
>   <version>${cdh.spark.version}</version>
>   <scope>compile</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-streaming_2.10</artifactId>
>   <version>${cdh.spark.version}</version>
>   <scope>compile</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql_2.10</artifactId>
>   <version>${cdh.spark.version}</version>
>   <scope>compile</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-streaming-kafka_2.10</artifactId>
>   <version>${cdh.spark.version}</version>
>   <scope>compile</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka_2.10</artifactId>
>   <version>0.8.2.1</version>
>   <scope>compile</scope>
> </dependency>
>
> But trying to execute the application throws errors like the one below:
>
> Exception in thread "main" java.lang.NoClassDefFoundError: kafka/cluster/BrokerEndPoint
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
>     at scala.Option.map(Option.scala:145)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
>     at scala.util.Either$RightProjection.flatMap(Either.scala:523)
>     at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
>     at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
>     at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
>     at org.apache.spark.streaming.kafka.KafkaCluster.getLatestLeaderOffsets(KafkaCluster.scala:150)
>     at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:215)
>     at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
>     at scala.util.Either$RightProjection.flatMap(Either.scala:523)
>     at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
>     at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>     at com.edgecast.engine.ConcurrentOps$.createDataStreamFromKafka(ConcurrentOps.scala:68)
>     at com.edgecast.engine.ConcurrentOps$.startProcessing(ConcurrentOps.scala:32)
>     at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:33)
>     at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.lang.ClassNotFoundException: kafka.cluster.BrokerEndPoint
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>     ... 38 more
> 16/06/23 11:09:53 INFO SparkContext: Invoking stop() from shutdown hook
>
> I've tried Kafka versions 0.8.2.0, 0.8.2.2, and 0.9.0.0. With 0.9.0.0 the
> processing hangs much sooner.
> Can someone help with this error?
>
> regards
> Sunita
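Two things jump out of that trace, for what it's worth. The last frame is com.intellij.rt.execution.application.AppMain, so this particular run was launched from IntelliJ rather than via spark-submit, and the classpath the IDE builds from your pom can differ from what the cluster gives you. Also, as far as I remember, kafka.cluster.BrokerEndPoint only exists in the Kafka 0.9 client; the 0.8.x client has kafka.cluster.Broker instead, so an error like this usually means the spark-streaming-kafka jar being loaded was compiled against a newer Kafka client than the kafka jar that won on your runtime classpath. You can see exactly which kafka artifact Maven resolves with the standard dependency-tree goal (nothing CDH-specific about this):

    mvn dependency:tree -Dincludes=org.apache.kafka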
>
> On Wed, Jun 22, 2016 at 8:20 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
>>
>> Hello Experts,
>>
>> I am getting this error repeatedly:
>>
>> 16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
>> java.lang.NullPointerException
>>     at com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
>>     at com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
>>     at com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>>     at java.lang.Throwable.writeObject(Throwable.java:985)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
>>     at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:141)
>>     at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>>     at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
>>     at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:142)
>>     at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:554)
>>     at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
>>     at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
>>     at com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:73)
>>     at com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:67)
>>     at scala.collection.immutable.List.foreach(List.scala:318)
>>     at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:67)
>>     at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>     at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
>>
>> It seems to be a typical issue. All I am doing here is as below:
>>
>> object ProcessingEngine {
>>
>>   def initializeSpark(customer: String): StreamingContext = {
>>     LogHandler.log.info("initializeSpark")
>>     // load <customer>.conf and fall back to the application-wide config
>>     val custConf = ConfigFactory.load(customer + ".conf").getConfig(customer).withFallback(AppConf)
>>     implicit val sparkConf: SparkConf = new SparkConf().setAppName(customer)
>>     val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(custConf.getLong("batchDurSec")))
>>     ssc.checkpoint(custConf.getString("checkpointDir"))
>>     ssc
>>   }
>>
>>   def createDataStreamFromKafka(customer: String, ssc: StreamingContext): DStream[Array[Byte]] = {
>>     val custConf = ConfigFactory.load(customer + ".conf").getConfig(customer).withFallback(ConfigFactory.load())
>>     LogHandler.log.info("createDataStreamFromKafka")
>>     KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>>       ssc,
>>       Map[String, String](
>>         "metadata.broker.list" -> custConf.getString("brokers"),
>>         "group.id" -> custConf.getString("groupId")),
>>       Set(custConf.getString("topics")))
>>   }
>>
>>   def main(args: Array[String]): Unit = {
>>     val AppConf = ConfigFactory.load()
>>     LogHandler.log.info("Starting the processing Engine")
>>     getListOfCustomers().foreach { cust =>
>>       implicit val ssc = initializeSpark(cust)
>>       val FullStream = createDataStreamFromKafka(cust, ssc)
>>       ConcurrentOps.determineAllSinks(cust, FullStream)
>>       // checkpoint at 2x the batch duration (batchDurSec is seconds, Duration takes ms)
>>       FullStream.checkpoint(Duration(AppConf.getLong("batchDurSec") * 2000))
>>       ssc.start()
>>       ssc.awaitTermination()
>>     }
>>   }
>> }
>>
>> I also tried putting all the initialization directly in main (not using
>> method calls for initializeSpark and createDataStreamFromKafka), and also
>> not putting it in foreach and instead creating a single Spark and streaming
>> context. However, the error persists.
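On the NullPointerException: the trace shows it happening while the StreamingContext serializes its checkpoint, and the thing that blows up is writing out a com.typesafe.config ConfigException. In other words, something from the Typesafe Config library ends up reachable from the checkpointed DStream graph, and Config objects don't survive Java serialization well. A common workaround is to read the values you need out of the Config on the driver and close over only plain serializable fields. A rough sketch of that idea in Scala (CustomerSettings and fromConfig are hypothetical names I made up, not part of your code):

    import com.typesafe.config.{Config, ConfigFactory}

    // Plain, serializable snapshot of the per-customer settings.
    // Case classes are Serializable by default, so this is safe to capture.
    case class CustomerSettings(
        brokers: String,
        groupId: String,
        topics: String,
        batchDurSec: Long,
        checkpointDir: String)

    object CustomerSettings {
      // Runs only on the driver; no com.typesafe.config object escapes.
      def fromConfig(customer: String, fallback: Config): CustomerSettings = {
        val c = ConfigFactory.load(customer + ".conf")
          .getConfig(customer)
          .withFallback(fallback)
        CustomerSettings(
          brokers       = c.getString("brokers"),
          groupId       = c.getString("groupId"),
          topics        = c.getString("topics"),
          batchDurSec   = c.getLong("batchDurSec"),
          checkpointDir = c.getString("checkpointDir"))
      }
    }

initializeSpark and createDataStreamFromKafka would then take a CustomerSettings instead of re-loading the Config, so nothing non-serializable can be captured by the time the checkpoint is written. This is a sketch of the general technique, not a guarantee it's your exact root cause.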
>>
>> Appreciate any help.
>>
>> regards
>> Sunita

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org