Re: NullPointerException when starting StreamingContext

2016-06-24 Thread Sunita Arvind
I was able to resolve the serialization issue. The root cause was that I was
accessing the config values inside foreachRDD{}.
The solution was to extract the values from the config outside the foreachRDD
scope and pass those values into the loop directly. In hindsight this is
probably obvious, since we cannot nest distributed datasets. Mentioning it
here for the benefit of anyone else who stumbles upon the same issue.
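
For anyone who wants the concrete shape of the fix, here is a minimal sketch
(the "topics" key and the process helper are placeholders for illustration,
not my actual code):

import com.typesafe.config.Config
import org.apache.spark.streaming.dstream.DStream

object FixSketch {
  // Stand-in for whatever work actually happens per record.
  def process(topic: String, record: Array[Byte]): Unit = ()

  def wireUp(stream: DStream[Array[Byte]], custConf: Config): Unit = {
    // Broken pattern: custConf is captured by the foreachRDD closure, so
    // Spark tries to serialize the Config when the StreamingContext starts
    // (checkpoint serialization) -- which is where the NPE surfaced:
    //
    //   stream.foreachRDD { rdd =>
    //     val topic = custConf.getString("topics")
    //     rdd.foreach(record => process(topic, record))
    //   }

    // Fix: read the value on the driver, outside the foreachRDD scope, so
    // the closure only captures a plain serializable String.
    val topic = custConf.getString("topics")
    stream.foreachRDD { rdd =>
      rdd.foreach(record => process(topic, record))
    }
  }
}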

regards
Sunita

On Wed, Jun 22, 2016 at 8:20 PM, Sunita Arvind 
wrote:

> Hello Experts,
>
> I am getting this error repeatedly:
>
> 16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the 
> context, marking it as stopped
> java.lang.NullPointerException
>   at 
> com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
>   at 
> com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
>   at 
> com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at 
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>   at java.lang.Throwable.writeObject(Throwable.java:985)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at 
> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
>   at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:141)
>   at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>   at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
>   at 
> org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:142)
>   at 
> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:554)
>   at 
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
>   at 
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
>   at 
> com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:73)
>   at 
> com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:67)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:67)
>   at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
>
>
> It seems to be a fairly common issue. All I am doing is the following:
>
> object ProcessingEngine {
>
> def initializeSpark(customer:String):StreamingContext={
>   LogHandler.log.info("InitializeSpark")
>   val custConf = ConfigFactory.load(customer + 
> ".conf").getConfig(customer).withFallback(AppConf)
>   implicit val sparkConf: SparkConf = new SparkConf().setAppName(customer)
>   val ssc: StreamingContext = new StreamingContext(sparkConf, 
> Seconds(custConf.getLong("batchDurSec")))
>   ssc.checkpoint(custConf.getString("checkpointDir"))
>   ssc
> }
>
> def createDataStreamFromKafka(customer:String, ssc: 
> StreamingContext):DStream[Array[Byte]]={
>   val 

Re: NullPointerException when starting StreamingContext

2016-06-24 Thread Cody Koeninger
That looks like a classpath problem.  You should not have to include
the kafka_2.10 artifact in your pom, spark-streaming-kafka_2.10
already has a transitive dependency on it.  That being said, 0.8.2.1
is the correct version, so that's a little strange.
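
If it helps, a dependency section along these lines should be enough (just a
sketch, not tested against CDH; `mvn dependency:tree` will show which
kafka_2.10 version actually ends up on the classpath):

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>${cdh.spark.version}</version>
  <!-- no explicit kafka_2.10 entry needed: this artifact already
       pulls in kafka_2.10 0.8.2.1 transitively -->
</dependency>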

How are you building and submitting your application?

Finally, if this ends up being a CDH related issue, you may have
better luck on their forum.

On Thu, Jun 23, 2016 at 1:16 PM, Sunita Arvind  wrote:
> Also, just to keep it simple, I am trying to use 1.6.0-cdh5.7.0 in the
> pom.xml, as the cluster I am running on is CDH 5.7.0 with Spark 1.6.0.
>
> Here is my pom setting:
>
>
> <cdh.spark.version>1.6.0-cdh5.7.0</cdh.spark.version>
>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-core_2.10</artifactId>
>   <version>${cdh.spark.version}</version>
>   <scope>compile</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-streaming_2.10</artifactId>
>   <version>${cdh.spark.version}</version>
>   <scope>compile</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql_2.10</artifactId>
>   <version>${cdh.spark.version}</version>
>   <scope>compile</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-streaming-kafka_2.10</artifactId>
>   <version>${cdh.spark.version}</version>
>   <scope>compile</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka_2.10</artifactId>
>   <version>0.8.2.1</version>
>   <scope>compile</scope>
> </dependency>
>
> But trying to execute the application throws errors like below:
> Exception in thread "main" java.lang.NoClassDefFoundError:
> kafka/cluster/BrokerEndPoint
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
> at scala.Option.map(Option.scala:145)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
> at scala.util.Either$RightProjection.flatMap(Either.scala:523)
> at
> org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
> at
> org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
> at
> org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
> at
> org.apache.spark.streaming.kafka.KafkaCluster.getLatestLeaderOffsets(KafkaCluster.scala:150)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:215)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
> at scala.util.Either$RightProjection.flatMap(Either.scala:523)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
> at
> com.edgecast.engine.ConcurrentOps$.createDataStreamFromKafka(ConcurrentOps.scala:68)
> at
> com.edgecast.engine.ConcurrentOps$.startProcessing(ConcurrentOps.scala:32)
> at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:33)
> at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.lang.ClassNotFoundException: kafka.cluster.BrokerEndPoint
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
>   

Re: NullPointerException when starting StreamingContext

2016-06-23 Thread Sunita Arvind
Also, just to keep it simple, I am trying to use 1.6.0-cdh5.7.0 in the
pom.xml, as the cluster I am running on is CDH 5.7.0 with Spark 1.6.0.

Here is my pom setting:


<cdh.spark.version>1.6.0-cdh5.7.0</cdh.spark.version>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>${cdh.spark.version}</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>${cdh.spark.version}</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>${cdh.spark.version}</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>${cdh.spark.version}</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.1</version>
  <scope>compile</scope>
</dependency>

But trying to execute the application throws errors like below:
Exception in thread "main" java.lang.NoClassDefFoundError:
kafka/cluster/BrokerEndPoint
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:145)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
at scala.util.Either$RightProjection.flatMap(Either.scala:523)
at
org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
at
org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
at
org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
at
org.apache.spark.streaming.kafka.KafkaCluster.getLatestLeaderOffsets(KafkaCluster.scala:150)
at
org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:215)
at
org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
at scala.util.Either$RightProjection.flatMap(Either.scala:523)
at
org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
at
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at
com.edgecast.engine.ConcurrentOps$.createDataStreamFromKafka(ConcurrentOps.scala:68)
at
com.edgecast.engine.ConcurrentOps$.startProcessing(ConcurrentOps.scala:32)
at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:33)
at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.ClassNotFoundException: kafka.cluster.BrokerEndPoint
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 38 more
16/06/23 11:09:53 INFO SparkContext: Invoking stop() from shutdown hook


I've tried Kafka versions 0.8.2.0, 0.8.2.2, and 0.9.0.0. With 0.9.0.0 the
processing hangs much sooner.
Can someone help with this error?

regards
Sunita

On Wed, Jun 22, 2016 at 8:20 PM, Sunita Arvind 
wrote:

> Hello Experts,
>
> I am getting this error repeatedly:
>
> 16/06/23 03:06:59 ERROR streaming.StreamingContext: Error 

Re: NullPointerException when starting StreamingContext

2016-06-22 Thread Ted Yu
Which Scala version / Spark release are you using?

Cheers

On Wed, Jun 22, 2016 at 8:20 PM, Sunita Arvind 
wrote:

> Hello Experts,
>
> I am getting this error repeatedly:
>
> 16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the 
> context, marking it as stopped
> java.lang.NullPointerException
>   at 
> com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
>   at 
> com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
>   at 
> com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at 
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>   at java.lang.Throwable.writeObject(Throwable.java:985)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at 
> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
>   at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:141)
>   at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>   at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
>   at 
> org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:142)
>   at 
> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:554)
>   at 
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
>   at 
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
>   at 
> com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:73)
>   at 
> com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:67)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:67)
>   at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
>
>
> It seems to be a fairly common issue. All I am doing is the following:
>
> object ProcessingEngine {
>
> def initializeSpark(customer:String):StreamingContext={
>   LogHandler.log.info("InitializeSpark")
>   val custConf = ConfigFactory.load(customer + 
> ".conf").getConfig(customer).withFallback(AppConf)
>   implicit val sparkConf: SparkConf = new SparkConf().setAppName(customer)
>   val ssc: StreamingContext = new StreamingContext(sparkConf, 
> Seconds(custConf.getLong("batchDurSec")))
>   ssc.checkpoint(custConf.getString("checkpointDir"))
>   ssc
> }
>
> def createDataStreamFromKafka(customer:String, ssc: 
> StreamingContext):DStream[Array[Byte]]={
>   val custConf = ConfigFactory.load(customer + 
> ".conf").getConfig(customer).withFallback(ConfigFactory.load())
>   LogHandler.log.info("createDataStreamFromKafka")
>   KafkaUtils.createDirectStream[String,
> Array[Byte],
> StringDecoder,
> DefaultDecoder](
> ssc,
> Map[String, String]("metadata.broker.list" -> 
> 

NullPointerException when starting StreamingContext

2016-06-22 Thread Sunita Arvind
Hello Experts,

I am getting this error repeatedly:

16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the
context, marking it as stopped
java.lang.NullPointerException
at 
com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
at 
com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
at 
com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
at java.lang.Throwable.writeObject(Throwable.java:985)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:141)
at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
at 
org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:142)
at 
org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:554)
at 
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at 
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at 
com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:73)
at 
com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:67)
at scala.collection.immutable.List.foreach(List.scala:318)
at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:67)
at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)


It seems to be a fairly common issue. All I am doing is the following:

object ProcessingEngine {

def initializeSpark(customer:String):StreamingContext={
  LogHandler.log.info("InitializeSpark")
  val custConf = ConfigFactory.load(customer +
".conf").getConfig(customer).withFallback(AppConf)
  implicit val sparkConf: SparkConf = new SparkConf().setAppName(customer)
  val ssc: StreamingContext = new StreamingContext(sparkConf,
Seconds(custConf.getLong("batchDurSec")))
  ssc.checkpoint(custConf.getString("checkpointDir"))
  ssc
}

def createDataStreamFromKafka(customer:String, ssc:
StreamingContext):DStream[Array[Byte]]={
  val custConf = ConfigFactory.load(customer +
".conf").getConfig(customer).withFallback(ConfigFactory.load())
  LogHandler.log.info("createDataStreamFromKafka")
  KafkaUtils.createDirectStream[String,
Array[Byte],
StringDecoder,
DefaultDecoder](
ssc,
Map[String, String]("metadata.broker.list" ->
custConf.getString("brokers"), "group.id" ->
custConf.getString("groupId")),
Set(custConf.getString("topics")))

}

def main(args: Array[String]): Unit = {
  val AppConf = ConfigFactory.load()
  LogHandler.log.info("Starting the processing Engine")
  getListOfCustomers().foreach{cust