Hello experts, I am repeatedly getting the following error:
16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped java.lang.NullPointerException at com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202) at com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228) at com.typesafe.config.ConfigException.writeObject(ConfigException.java:58) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440) at java.lang.Throwable.writeObject(Throwable.java:985) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350) at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:141) at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141) at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251) at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:142) at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:554) at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600) at com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:73) at com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:67) at scala.collection.immutable.List.foreach(List.scala:318) at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:67) at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542) It seems to be a typical issue. 
All I am doing here is as below:

```scala
object ProcessingEngine {

  def initializeSpark(customer: String): StreamingContext = {
    LogHandler.log.info("InitialeSpark")
    val custConf = ConfigFactory.load(customer + ".conf").getConfig(customer).withFallback(AppConf)
    implicit val sparkConf: SparkConf = new SparkConf().setAppName(customer)
    val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(custConf.getLong("batchDurSec")))
    ssc.checkpoint(custConf.getString("checkpointDir"))
    ssc
  }

  def createDataStreamFromKafka(customer: String, ssc: StreamingContext): DStream[Array[Byte]] = {
    val custConf = ConfigFactory.load(customer + ".conf").getConfig(customer).withFallback(ConfigFactory.load())
    LogHandler.log.info("createDataStreamFromKafka")
    KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc,
      Map[String, String]("metadata.broker.list" -> custConf.getString("brokers"), "group.id" -> custConf.getString("groupId")),
      Set(custConf.getString("topics")))
  }

  def main(args: Array[String]): Unit = {
    val AppConf = ConfigFactory.load()
    LogHandler.log.info("Starting the processing Engine")
    getListOfCustomers().foreach { cust =>
      implicit val ssc = initializeSpark(cust)
      val FullStream = createDataStreamFromKafka(cust, ssc)
      ConcurrentOps.determineAllSinks(cust, FullStream)
      FullStream.checkpoint(Duration(AppConf.getLong("batchDurSec") * 2000))
      ssc.start()
      ssc.awaitTermination()
    }
  }
}
```

I also tried putting all of the initialization directly in `main` (not using method calls for `initializeSpark` and `createDataStreamFromKafka`), and also tried moving it out of the `foreach` and creating a single SparkConf and StreamingContext. However, the error persists. I would appreciate any help. Regards, Sunita