That looks like a classpath problem.  You should not have to include
the kafka_2.10 artifact in your pom at all; spark-streaming-kafka_2.10
already pulls it in as a transitive dependency.  That said, 0.8.2.1 is
the correct version for Spark 1.6, so it's a little strange that the
class is missing.
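
If you're building with Maven, one quick way to see which kafka_2.10
actually ends up on your compile classpath is the dependency tree, run
from the module that produces your application jar:

    mvn dependency:tree -Dincludes=org.apache.kafka

If something other than the 0.8.2.1 you declared wins the version
conflict, that mismatch could explain the missing
kafka.cluster.BrokerEndPoint class.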

How are you building and submitting your application?
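
For example, when running on the cluster I'd expect something roughly
like the following, with the Kafka and spark-streaming-kafka classes
either bundled into an assembly/shaded jar or passed along explicitly
(the jar names below are placeholders, not the actual artifacts from
your build):

    spark-submit --master yarn --deploy-mode client \
      --class com.edgecast.engine.ProcessingEngine \
      --jars spark-streaming-kafka_2.10-1.6.0-cdh5.7.0.jar,kafka_2.10-0.8.2.1.jar \
      processing-engine-assembly.jar

The com.intellij.rt.execution.application.AppMain frame at the bottom of
your first trace suggests that particular run was launched from IntelliJ,
in which case it's the IDE's run classpath, not the cluster's, that
matters.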

Finally, if this ends up being a CDH-related issue, you may have
better luck on the Cloudera forum.

On Thu, Jun 23, 2016 at 1:16 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
> Also, just to keep it simple, I am using 1.6.0-cdh5.7.0 in the pom.xml,
> since the cluster I am running on is CDH 5.7.0 with Spark 1.6.0.
>
> Here are the relevant dependencies from my pom:
>
>
> <cdh.spark.version>1.6.0-cdh5.7.0</cdh.spark.version>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-core_2.10</artifactId>
>     <version>${cdh.spark.version}</version>
>     <scope>compile</scope>
> </dependency>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-streaming_2.10</artifactId>
>     <version>${cdh.spark.version}</version>
>     <scope>compile</scope>
> </dependency>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-sql_2.10</artifactId>
>     <version>${cdh.spark.version}</version>
>     <scope>compile</scope>
> </dependency>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-streaming-kafka_2.10</artifactId>
>     <version>${cdh.spark.version}</version>
>     <scope>compile</scope>
> </dependency>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka_2.10</artifactId>
>     <version>0.8.2.1</version>
>     <scope>compile</scope>
> </dependency>
>
> But executing the application throws errors like the one below:
> Exception in thread "main" java.lang.NoClassDefFoundError: kafka/cluster/BrokerEndPoint
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
>     at scala.Option.map(Option.scala:145)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
>     at scala.util.Either$RightProjection.flatMap(Either.scala:523)
>     at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
>     at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
>     at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
>     at org.apache.spark.streaming.kafka.KafkaCluster.getLatestLeaderOffsets(KafkaCluster.scala:150)
>     at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:215)
>     at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
>     at scala.util.Either$RightProjection.flatMap(Either.scala:523)
>     at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
>     at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>     at com.edgecast.engine.ConcurrentOps$.createDataStreamFromKafka(ConcurrentOps.scala:68)
>     at com.edgecast.engine.ConcurrentOps$.startProcessing(ConcurrentOps.scala:32)
>     at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:33)
>     at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.lang.ClassNotFoundException: kafka.cluster.BrokerEndPoint
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>     ... 38 more
> 16/06/23 11:09:53 INFO SparkContext: Invoking stop() from shutdown hook
>
>
> I've tried Kafka versions 0.8.2.0, 0.8.2.2, and 0.9.0.0. With 0.9.0.0 the
> processing hangs much sooner.
> Can someone help with this error?
>
> regards
> Sunita
>
> On Wed, Jun 22, 2016 at 8:20 PM, Sunita Arvind <sunitarv...@gmail.com>
> wrote:
>>
>> Hello Experts,
>>
>> I am getting this error repeatedly:
>>
>> 16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
>> java.lang.NullPointerException
>>      at com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
>>      at com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
>>      at com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
>>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>      at java.lang.reflect.Method.invoke(Method.java:606)
>>      at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>>      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>      at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>>      at java.lang.Throwable.writeObject(Throwable.java:985)
>>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>      at java.lang.reflect.Method.invoke(Method.java:606)
>>      at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>>      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>      at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
>>      at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
>>      at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:141)
>>      at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>>      at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>>      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
>>      at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:142)
>>      at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:554)
>>      at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
>>      at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
>>      at com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:73)
>>      at com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:67)
>>      at scala.collection.immutable.List.foreach(List.scala:318)
>>      at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:67)
>>      at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
>>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>      at java.lang.reflect.Method.invoke(Method.java:606)
>>      at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
>>
>>
>> It seems to be a common issue. All I am doing here is the following:
>>
>> object ProcessingEngine {
>>
>>   def initializeSpark(customer: String): StreamingContext = {
>>     LogHandler.log.info("initializeSpark")
>>     val custConf = ConfigFactory.load(customer + ".conf").getConfig(customer).withFallback(AppConf)
>>     implicit val sparkConf: SparkConf = new SparkConf().setAppName(customer)
>>     val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(custConf.getLong("batchDurSec")))
>>     ssc.checkpoint(custConf.getString("checkpointDir"))
>>     ssc
>>   }
>>
>>   def createDataStreamFromKafka(customer: String, ssc: StreamingContext): DStream[Array[Byte]] = {
>>     val custConf = ConfigFactory.load(customer + ".conf").getConfig(customer).withFallback(ConfigFactory.load())
>>     LogHandler.log.info("createDataStreamFromKafka")
>>     KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>>       ssc,
>>       Map[String, String](
>>         "metadata.broker.list" -> custConf.getString("brokers"),
>>         "group.id" -> custConf.getString("groupId")),
>>       Set(custConf.getString("topics")))
>>   }
>>
>>   def main(args: Array[String]): Unit = {
>>     val AppConf = ConfigFactory.load()
>>     LogHandler.log.info("Starting the processing Engine")
>>     getListOfCustomers().foreach { cust =>
>>       implicit val ssc = initializeSpark(cust)
>>       val FullStream = createDataStreamFromKafka(cust, ssc)
>>       ConcurrentOps.determineAllSinks(cust, FullStream)
>>       FullStream.checkpoint(Duration(AppConf.getLong("batchDurSec") * 2000))
>>       ssc.start()
>>       ssc.awaitTermination()
>>     }
>>   }
>> }
>>
>> I also tried putting all the initialization directly in main (not using the
>> initializeSpark and createDataStreamFromKafka helper methods), and also
>> dropping the foreach and creating a single Spark context and streaming
>> context. However, the error persists.
>>
>> Appreciate any help.
>>
>> regards
>> Sunita
>
>
