Which Scala version / Spark release are you using?

Cheers

On Wed, Jun 22, 2016 at 8:20 PM, Sunita Arvind <sunitarv...@gmail.com>
wrote:

> Hello Experts,
>
> I am getting this error repeatedly:
>
> 16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the 
> context, marking it as stopped
> java.lang.NullPointerException
>       at 
> com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
>       at 
> com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
>       at 
> com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>       at 
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>       at java.lang.Throwable.writeObject(Throwable.java:985)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>       at 
> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
>       at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:141)
>       at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>       at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
>       at 
> org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:142)
>       at 
> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:554)
>       at 
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
>       at 
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
>       at 
> com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:73)
>       at 
> com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:67)
>       at scala.collection.immutable.List.foreach(List.scala:318)
>       at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:67)
>       at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
>
>
> It seems to be a typical issue. All I am doing here is as below:
>
> Object ProcessingEngine{
>
> def initializeSpark(customer:String):StreamingContext={
>   LogHandler.log.info("InitialeSpark")
>   val custConf = ConfigFactory.load(customer + 
> ".conf").getConfig(customer).withFallback(AppConf)
>   implicit val sparkConf: SparkConf = new SparkConf().setAppName(customer)
>   val ssc: StreamingContext = new StreamingContext(sparkConf, 
> Seconds(custConf.getLong("batchDurSec")))
>   ssc.checkpoint(custConf.getString("checkpointDir"))
>   ssc
> }
>
> def createDataStreamFromKafka(customer:String, ssc: 
> StreamingContext):DStream[Array[Byte]]={
>   val custConf = ConfigFactory.load(customer + 
> ".conf").getConfig(customer).withFallback(ConfigFactory.load())
>   LogHandler.log.info("createDataStreamFromKafka")
>   KafkaUtils.createDirectStream[String,
>     Array[Byte],
>     StringDecoder,
>     DefaultDecoder](
>     ssc,
>     Map[String, String]("metadata.broker.list" -> 
> custConf.getString("brokers"), "group.id" -> custConf.getString("groupId")),
>     Set(custConf.getString("topics")))
>
> }
>
> def main(args: Array[String]): Unit = {
>   val AppConf = ConfigFactory.load()
>   LogHandler.log.info("Starting the processing Engine")
>   getListOfCustomers().foreach{cust =>
>     implicit val ssc = initializeSpark(cust)
>     val FullStream = createDataStreamFromKafka(cust,ssc)
>     ConcurrentOps.determineAllSinks(cust, FullStream)
>     FullStream.checkpoint(Duration(AppConf.getLong("batchDurSec") * 2000))
>     ssc.start()
>     ssc.awaitTermination()
>   }
>
> }
> }
>
> I also tried putting all the initialization directly in main (without the
> separate initializeSpark and createDataStreamFromKafka methods), and also
> dropping the foreach and creating a single Spark and streaming context.
> However, the error persists.
>
> Appreciate any help.
>
> regards
> Sunita
>

Reply via email to