Re: NullPointerException when starting StreamingContext
I was able to resolve the serialization issue. The root cause was that I was accessing the config values inside foreachRDD{}. The solution was to extract the values from the config outside the foreachRDD scope and pass those values into the loop directly. Probably something obvious, as we cannot have nested distributed data sets. Mentioning it here for the benefit of anyone else stumbling upon the same issue.

regards
Sunita

On Wed, Jun 22, 2016 at 8:20 PM, Sunita Arvind wrote:
> Hello Experts,
>
> I am getting this error repeatedly:
>
> 16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the
> context, marking it as stopped
> java.lang.NullPointerException
> at com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
> [... quoted stack trace and code snipped; full text in the original message below ...]
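The fix described above can be sketched roughly as follows. This is a minimal illustration, not the poster's actual code: `stream`, `sinkDir`, and `writeRecord` are hypothetical names. The key point is that with checkpointing enabled, the foreachRDD function itself gets serialized (see Checkpoint.serialize in the trace), so it must not capture the non-serializable Config:

```scala
import com.typesafe.config.Config
import org.apache.spark.streaming.dstream.DStream

object ClosureFix {
  // BROKEN: the foreachRDD function captures custConf. With checkpointing
  // enabled, Spark serializes the DStream graph (including this function)
  // into the checkpoint, and com.typesafe.config.Config is not Serializable.
  def broken(stream: DStream[Array[Byte]], custConf: Config): Unit =
    stream.foreachRDD { rdd =>
      val sinkDir = custConf.getString("sinkDir") // forces capture of custConf
      rdd.foreach(record => writeRecord(sinkDir, record))
    }

  // FIXED: extract the plain values on the driver, outside the closure,
  // so the closure captures only a serializable String.
  def fixed(stream: DStream[Array[Byte]], custConf: Config): Unit = {
    val sinkDir: String = custConf.getString("sinkDir")
    stream.foreachRDD { rdd =>
      rdd.foreach(record => writeRecord(sinkDir, record))
    }
  }

  private def writeRecord(dir: String, record: Array[Byte]): Unit = () // placeholder sink
}
```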
Re: NullPointerException when starting StreamingContext
That looks like a classpath problem. You should not have to include the kafka_2.10 artifact in your pom; spark-streaming-kafka_2.10 already has a transitive dependency on it. That being said, 0.8.2.1 is the correct version, so that's a little strange. How are you building and submitting your application? Finally, if this ends up being a CDH-related issue, you may have better luck on their forum.

On Thu, Jun 23, 2016 at 1:16 PM, Sunita Arvind wrote:
> Also, just to keep it simple, I am trying to use 1.6.0-cdh5.7.0 in the
> pom.xml as the cluster I am trying to run on is CDH 5.7.0 with Spark 1.6.0.
>
> Here is my pom setting:
> [... quoted pom snippet and stack trace snipped; full text in the original message below ...]
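For a NoClassDefFoundError like the one quoted above, one quick diagnostic is to ask the JVM where (or whether) it can load the class at runtime. A small sketch in plain Scala (the object name ClasspathCheck is illustrative; no Spark required):

```scala
// Print which jar, if any, a class would be loaded from.
// Useful for spotting missing or conflicting kafka/spark artifacts on the
// runtime classpath (e.g. kafka.cluster.BrokerEndPoint from the trace above).
object ClasspathCheck {
  def locate(className: String): String = {
    val resource = className.replace('.', '/') + ".class"
    Option(getClass.getClassLoader.getResource(resource))
      .map(_.toString)
      .getOrElse(s"$className NOT found on classpath")
  }

  def main(args: Array[String]): Unit = {
    println(locate("kafka.cluster.BrokerEndPoint"))
    println(locate("org.apache.spark.streaming.kafka.KafkaUtils"))
  }
}
```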
Re: NullPointerException when starting StreamingContext
Also, just to keep it simple, I am trying to use 1.6.0-cdh5.7.0 in the pom.xml as the cluster I am trying to run on is CDH 5.7.0 with Spark 1.6.0.

Here is my pom setting:

<properties>
  <cdh.spark.version>1.6.0-cdh5.7.0</cdh.spark.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>${cdh.spark.version}</version>
    <scope>compile</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>${cdh.spark.version}</version>
    <scope>compile</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>${cdh.spark.version}</version>
    <scope>compile</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>${cdh.spark.version}</version>
    <scope>compile</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
    <scope>compile</scope>
  </dependency>
</dependencies>

But trying to execute the application throws errors like below:

Exception in thread "main" java.lang.NoClassDefFoundError: kafka/cluster/BrokerEndPoint
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
        at scala.Option.map(Option.scala:145)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
        at scala.util.Either$RightProjection.flatMap(Either.scala:523)
        at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLatestLeaderOffsets(KafkaCluster.scala:150)
        at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:215)
        at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
        at scala.util.Either$RightProjection.flatMap(Either.scala:523)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at com.edgecast.engine.ConcurrentOps$.createDataStreamFromKafka(ConcurrentOps.scala:68)
        at com.edgecast.engine.ConcurrentOps$.startProcessing(ConcurrentOps.scala:32)
        at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:33)
        at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.ClassNotFoundException: kafka.cluster.BrokerEndPoint
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 38 more
16/06/23 11:09:53 INFO SparkContext: Invoking stop() from shutdown hook

I've tried Kafka versions 0.8.2.0, 0.8.2.2, and 0.9.0.0. With 0.9.0.0 the processing hangs much sooner. Can someone help with this error?

regards
Sunita

On Wed, Jun 22, 2016 at 8:20 PM, Sunita Arvind wrote:
> Hello Experts,
>
> I am getting this error repeatedly:
>
> 16/06/23 03:06:59 ERROR streaming.StreamingContext: Error
> [... quoted stack trace and code snipped; full text in the original message below ...]
Re: NullPointerException when starting StreamingContext
Which Scala version / Spark release are you using?

Cheers

On Wed, Jun 22, 2016 at 8:20 PM, Sunita Arvind wrote:
> Hello Experts,
>
> I am getting this error repeatedly:
>
> 16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the
> context, marking it as stopped
> java.lang.NullPointerException
> at com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
> [... quoted stack trace and code snipped; full text in the original message below ...]
NullPointerException when starting StreamingContext
Hello Experts,

I am getting this error repeatedly:

16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
java.lang.NullPointerException
        at com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
        at com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
        at com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
        at java.lang.Throwable.writeObject(Throwable.java:985)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
        at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:141)
        at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
        at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
        at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:142)
        at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:554)
        at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
        at com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:73)
        at com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:67)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:67)
        at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)

It seems to be a typical issue. All I am doing here is as below:

object ProcessingEngine {

  def initializeSpark(customer: String): StreamingContext = {
    LogHandler.log.info("InitializeSpark")
    val custConf = ConfigFactory.load(customer + ".conf")
      .getConfig(customer)
      .withFallback(AppConf)
    implicit val sparkConf: SparkConf = new SparkConf().setAppName(customer)
    val ssc: StreamingContext =
      new StreamingContext(sparkConf, Seconds(custConf.getLong("batchDurSec")))
    ssc.checkpoint(custConf.getString("checkpointDir"))
    ssc
  }

  def createDataStreamFromKafka(customer: String, ssc: StreamingContext): DStream[Array[Byte]] = {
    val custConf = ConfigFactory.load(customer + ".conf")
      .getConfig(customer)
      .withFallback(ConfigFactory.load())
    LogHandler.log.info("createDataStreamFromKafka")
    KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc,
      Map[String, String](
        "metadata.broker.list" -> custConf.getString("brokers"),
        "group.id" -> custConf.getString("groupId")),
      Set(custConf.getString("topics")))
  }

  def main(args: Array[String]): Unit = {
    val AppConf = ConfigFactory.load()
    LogHandler.log.info("Starting the processing Engine")
    getListOfCustomers().foreach{cust
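A note on the code above: because ssc.checkpoint(...) is enabled, StreamingContext.start validates that the whole DStream graph is serializable, which is why the Config object surfaces in Checkpoint.serialize in the trace. One way to keep Config off the graph entirely is to copy the needed values into a small serializable settings class on the driver. This is a hedged sketch; JobSettings and its field names are illustrative, not the poster's code:

```scala
import com.typesafe.config.Config

// A plain, serializable bundle of just the values the streaming closures need.
// Scala case classes are Serializable by default.
final case class JobSettings(brokers: String, groupId: String, topics: String)

object JobSettings {
  // Built once on the driver; only the extracted strings travel into closures
  // and checkpoints, never the non-serializable Config itself.
  def fromConfig(conf: Config): JobSettings =
    JobSettings(
      conf.getString("brokers"),
      conf.getString("groupId"),
      conf.getString("topics"))
}
```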