Also, to keep it simple, I am trying to use 1.6.0-cdh5.7.0 in the pom.xml,
since the cluster I am running on is CDH 5.7.0 with Spark 1.6.0.

Here are my pom settings:


<properties>
    <cdh.spark.version>1.6.0-cdh5.7.0</cdh.spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${cdh.spark.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>${cdh.spark.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${cdh.spark.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>${cdh.spark.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.1</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
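
As a sanity check on the resolution above, mvn dependency:tree
-Dincludes=org.apache.kafka shows which Kafka artifact actually ends up on
the classpath. I also run the throwaway probe below (just a sketch, not part
of the application; the class name comes from the stack trace further down)
to confirm what the driver sees at runtime:

// Throwaway probe (sketch): is the class from the error below visible at
// runtime, and if so, which jar provides it?
object ClasspathProbe {
  def main(args: Array[String]): Unit = {
    try {
      val cls = Class.forName("kafka.cluster.BrokerEndPoint")
      // getCodeSource reveals the jar the class was loaded from
      println(s"Found in ${cls.getProtectionDomain.getCodeSource.getLocation}")
    } catch {
      case _: ClassNotFoundException =>
        println("kafka.cluster.BrokerEndPoint is NOT on the classpath")
    }
  }
}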

But executing the application throws the error below:
Exception in thread "main" java.lang.NoClassDefFoundError: kafka/cluster/BrokerEndPoint
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
    at scala.util.Either$RightProjection.flatMap(Either.scala:523)
    at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
    at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
    at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
    at org.apache.spark.streaming.kafka.KafkaCluster.getLatestLeaderOffsets(KafkaCluster.scala:150)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:215)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
    at scala.util.Either$RightProjection.flatMap(Either.scala:523)
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
    at com.edgecast.engine.ConcurrentOps$.createDataStreamFromKafka(ConcurrentOps.scala:68)
    at com.edgecast.engine.ConcurrentOps$.startProcessing(ConcurrentOps.scala:32)
    at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:33)
    at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.ClassNotFoundException: kafka.cluster.BrokerEndPoint
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 38 more
16/06/23 11:09:53 INFO SparkContext: Invoking stop() from shutdown hook


I've tried Kafka versions 0.8.2.0, 0.8.2.2, and 0.9.0.0; with 0.9.0.0 the
processing hangs much sooner.
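
My guess (unverified) is that the CDH build of spark-streaming-kafka was
compiled against a newer Kafka client line than the 0.8.2.x I am pinning,
which would explain kafka.cluster.BrokerEndPoint not being found. To see
which Kafka jar and version each run actually loads, I log this at startup
(quick sketch; getImplementationVersion may be null when the jar manifest
omits it):

// Sketch: report the Kafka client jar and manifest version actually loaded.
val kafkaClass = classOf[kafka.serializer.DefaultDecoder]
val jar = kafkaClass.getProtectionDomain.getCodeSource.getLocation
val version = Option(kafkaClass.getPackage.getImplementationVersion).getOrElse("unknown")
LogHandler.log.info(s"Kafka client jar: $jar (version $version)")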
Can someone help with this error?

regards
Sunita

On Wed, Jun 22, 2016 at 8:20 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:

> Hello Experts,
>
> I am getting this error repeatedly:
>
> 16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
> java.lang.NullPointerException
>       at com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
>       at com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
>       at com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>       at java.lang.Throwable.writeObject(Throwable.java:985)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
>       at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:141)
>       at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>       at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
>       at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:142)
>       at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:554)
>       at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
>       at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
>       at com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:73)
>       at com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:67)
>       at scala.collection.immutable.List.foreach(List.scala:318)
>       at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:67)
>       at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
>
> It seems to be a fairly common issue. All I am doing is the following:
>
> object ProcessingEngine {
>
>   val AppConf = ConfigFactory.load()
>
>   def initializeSpark(customer: String): StreamingContext = {
>     LogHandler.log.info("InitializeSpark")
>     val custConf = ConfigFactory.load(customer + ".conf").getConfig(customer).withFallback(AppConf)
>     implicit val sparkConf: SparkConf = new SparkConf().setAppName(customer)
>     val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(custConf.getLong("batchDurSec")))
>     ssc.checkpoint(custConf.getString("checkpointDir"))
>     ssc
>   }
>
>   def createDataStreamFromKafka(customer: String, ssc: StreamingContext): DStream[Array[Byte]] = {
>     val custConf = ConfigFactory.load(customer + ".conf").getConfig(customer).withFallback(ConfigFactory.load())
>     LogHandler.log.info("createDataStreamFromKafka")
>     KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>       ssc,
>       Map[String, String](
>         "metadata.broker.list" -> custConf.getString("brokers"),
>         "group.id" -> custConf.getString("groupId")),
>       Set(custConf.getString("topics")))
>   }
>
>   def main(args: Array[String]): Unit = {
>     LogHandler.log.info("Starting the processing engine")
>     getListOfCustomers().foreach { cust =>
>       implicit val ssc = initializeSpark(cust)
>       val FullStream = createDataStreamFromKafka(cust, ssc)
>       ConcurrentOps.determineAllSinks(cust, FullStream)
>       FullStream.checkpoint(Duration(AppConf.getLong("batchDurSec") * 2000))
>       ssc.start()
>       ssc.awaitTermination()
>     }
>   }
> }
>
> I also tried putting all the initialization directly in main (not using
> separate methods for initializeSpark and createDataStreamFromKafka), and
> also dropping the foreach so that only a single SparkContext and
> StreamingContext are created. However, the error persists.
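>
> Since the NullPointerException fires while the checkpoint serializes a
> ConfigException (the com.typesafe.config frames above), one direction I am
> considering is keeping Typesafe Config objects out of anything Spark
> serializes, by copying the needed values into a plain case class up front.
> A rough sketch of what I mean (hypothetical names, untested):
>
> // Sketch: extract plain, serializable values from the Config eagerly so no
> // Config or ConfigException instance gets captured by checkpointed closures.
> case class CustomerSettings(brokers: String, groupId: String, topics: String,
>                             batchDurSec: Long, checkpointDir: String)
>
> def loadSettings(customer: String): CustomerSettings = {
>   val c = ConfigFactory.load(customer + ".conf").getConfig(customer).withFallback(AppConf)
>   CustomerSettings(
>     c.getString("brokers"),
>     c.getString("groupId"),
>     c.getString("topics"),
>     c.getLong("batchDurSec"),
>     c.getString("checkpointDir"))
> }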
>
> Appreciate any help.
>
> regards
> Sunita
>
