Hello Experts,

I am getting this error repeatedly:

16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the
context, marking it as stopped
java.lang.NullPointerException
        at 
com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
        at 
com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
        at 
com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
        at java.lang.Throwable.writeObject(Throwable.java:985)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at 
java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
        at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:141)
        at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
        at 
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
        at 
org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:142)
        at 
org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:554)
        at 
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
        at 
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
        at 
com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:73)
        at 
com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:67)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:67)
        at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)


It seems to be a typical issue. All I am doing here is as below:

Object ProcessingEngine{

def initializeSpark(customer:String):StreamingContext={
  LogHandler.log.info("InitialeSpark")
  val custConf = ConfigFactory.load(customer +
".conf").getConfig(customer).withFallback(AppConf)
  implicit val sparkConf: SparkConf = new SparkConf().setAppName(customer)
  val ssc: StreamingContext = new StreamingContext(sparkConf,
Seconds(custConf.getLong("batchDurSec")))
  ssc.checkpoint(custConf.getString("checkpointDir"))
  ssc
}

def createDataStreamFromKafka(customer:String, ssc:
StreamingContext):DStream[Array[Byte]]={
  val custConf = ConfigFactory.load(customer +
".conf").getConfig(customer).withFallback(ConfigFactory.load())
  LogHandler.log.info("createDataStreamFromKafka")
  KafkaUtils.createDirectStream[String,
    Array[Byte],
    StringDecoder,
    DefaultDecoder](
    ssc,
    Map[String, String]("metadata.broker.list" ->
custConf.getString("brokers"), "group.id" ->
custConf.getString("groupId")),
    Set(custConf.getString("topics")))

}

def main(args: Array[String]): Unit = {
  val AppConf = ConfigFactory.load()
  LogHandler.log.info("Starting the processing Engine")
  getListOfCustomers().foreach{cust =>
    implicit val ssc = initializeSpark(cust)
    val FullStream = createDataStreamFromKafka(cust,ssc)
    ConcurrentOps.determineAllSinks(cust, FullStream)
    FullStream.checkpoint(Duration(AppConf.getLong("batchDurSec") * 2000))
    ssc.start()
    ssc.awaitTermination()
  }

}
}

I also tried putting all the initialization directly in main (not using
method calls for initializeSpark and createDataStreamFromKafka) and also
not putting in foreach and creating a single spark and streaming context.
However, the error persists.

Appreciate any help.

regards
Sunita

Reply via email to