I was able to resolve the serialization issue. The root cause was that I was
accessing the config values within foreachRDD{}.
The solution was to extract the values from the config outside the foreachRDD
scope and pass the values into the loop directly. Probably something obvious
in hindsight: the closure passed to foreachRDD gets serialized (and included
in the checkpoint), and the Typesafe Config object it captured is not
serializable. Mentioning it here for the benefit of anyone else stumbling
upon the same issue.
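For reference, here is a minimal, Spark-free sketch of the difference. FakeConfig and the sinkPath key are hypothetical stand-ins for the real Typesafe Config object and config key; the point is only what the closure captures:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-in for com.typesafe.config.Config: holds values but is
// NOT Serializable, which is what makes the captured reference fatal.
class FakeConfig(values: Map[String, String]) {
  def getString(key: String): String = values(key)
}

object ClosureSerializationDemo {
  // Returns true if obj survives Java serialization,
  // false when a NotSerializableException is thrown.
  def trySerialize(obj: AnyRef): Boolean =
    try {
      val oos = new ObjectOutputStream(new ByteArrayOutputStream())
      oos.writeObject(obj)
      oos.close()
      true
    } catch {
      case _: java.io.NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val custConf = new FakeConfig(Map("sinkPath" -> "/tmp/out"))

    // Broken pattern: the closure body reads custConf, so it captures the
    // whole non-serializable config object -- the same thing that happened
    // inside foreachRDD{}.
    val broken: String => String = s => custConf.getString("sinkPath") + "/" + s

    // Fixed pattern: extract the value on the driver first; the closure
    // then captures only a plain, serializable String.
    val sinkPath = custConf.getString("sinkPath")
    val fixed: String => String = s => sinkPath + "/" + s

    println("broken closure serializes: " + trySerialize(broken))  // false
    println("fixed closure serializes:  " + trySerialize(fixed))   // true
  }
}
```

The same extraction works for any non-serializable driver-side object (connections, loggers, configs): pull out the plain values before the closure, not inside it.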

regards
Sunita

On Wed, Jun 22, 2016 at 8:20 PM, Sunita Arvind <sunitarv...@gmail.com>
wrote:

> Hello Experts,
>
> I am getting this error repeatedly:
>
> 16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
> java.lang.NullPointerException
>       at com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
>       at com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
>       at com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>       at java.lang.Throwable.writeObject(Throwable.java:985)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
>       at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:141)
>       at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>       at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
>       at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:142)
>       at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:554)
>       at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
>       at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
>       at com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:73)
>       at com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:67)
>       at scala.collection.immutable.List.foreach(List.scala:318)
>       at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:67)
>       at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
>
>
> It seems to be a fairly common issue. All I am doing is shown below:
>
> object ProcessingEngine {
>
> def initializeSpark(customer:String):StreamingContext={
>   LogHandler.log.info("InitializeSpark")
>   val custConf = ConfigFactory.load(customer + ".conf").getConfig(customer).withFallback(AppConf)
>   implicit val sparkConf: SparkConf = new SparkConf().setAppName(customer)
>   val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(custConf.getLong("batchDurSec")))
>   ssc.checkpoint(custConf.getString("checkpointDir"))
>   ssc
> }
>
> def createDataStreamFromKafka(customer:String, ssc: StreamingContext):DStream[Array[Byte]]={
>   val custConf = ConfigFactory.load(customer + ".conf").getConfig(customer).withFallback(ConfigFactory.load())
>   LogHandler.log.info("createDataStreamFromKafka")
>   KafkaUtils.createDirectStream[String,
>     Array[Byte],
>     StringDecoder,
>     DefaultDecoder](
>     ssc,
>     Map[String, String]("metadata.broker.list" -> custConf.getString("brokers"), "group.id" -> custConf.getString("groupId")),
>     Set(custConf.getString("topics")))
>
> }
>
> def main(args: Array[String]): Unit = {
>   val AppConf = ConfigFactory.load()
>   LogHandler.log.info("Starting the processing Engine")
>   getListOfCustomers().foreach{cust =>
>     implicit val ssc = initializeSpark(cust)
>     val FullStream = createDataStreamFromKafka(cust,ssc)
>     ConcurrentOps.determineAllSinks(cust, FullStream)
>     FullStream.checkpoint(Duration(AppConf.getLong("batchDurSec") * 2000))
>     ssc.start()
>     ssc.awaitTermination()
>   }
>
> }
> }
>
> I also tried putting all the initialization directly in main (not using
> method calls for initializeSpark and createDataStreamFromKafka), and also
> removing the foreach and creating a single SparkContext and StreamingContext.
> However, the error persists.
>
> Appreciate any help.
>
> regards
> Sunita
>
