Hello Experts,
I am getting this error repeatedly:
16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the
context, marking it as stopped
java.lang.NullPointerException
at
com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
at
com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
at
com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
at java.lang.Throwable.writeObject(Throwable.java:985)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
at
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:141)
at
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
at
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
at
org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:142)
at
org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:554)
at
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at
com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:73)
at
com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:67)
at scala.collection.immutable.List.foreach(List.scala:318)
at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:67)
at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
This appears to be a common issue. All I am doing is the following:
object ProcessingEngine {

  // Application-wide fallback configuration, loaded once at object init.
  // FIX: this was previously a local `val` inside main(), so the reference in
  // initializeSpark() did not compile; hoisting it here puts it in scope for
  // every method while leaving main()'s behavior unchanged.
  // NOTE(review): Typesafe Config values are not Java-serializable. The NPE in
  // SerializedConfigValue.writeOrigin during checkpoint serialization strongly
  // suggests a Config (or a ConfigException) is being captured in the DStream
  // graph — never reference a Config object inside a Spark closure.
  private val AppConf = ConfigFactory.load()

  /**
   * Builds a checkpointed StreamingContext for one customer.
   *
   * @param customer customer key; also names the per-customer config file
   *                 ("&lt;customer&gt;.conf") and the Spark application.
   * @return a StreamingContext with batch duration and checkpoint directory
   *         taken from the customer config (falling back to AppConf).
   */
  def initializeSpark(customer: String): StreamingContext = {
    LogHandler.log.info("InitialeSpark")
    val custConf = ConfigFactory.load(customer + ".conf")
      .getConfig(customer)
      .withFallback(AppConf)
    implicit val sparkConf: SparkConf = new SparkConf().setAppName(customer)
    val ssc: StreamingContext =
      new StreamingContext(sparkConf, Seconds(custConf.getLong("batchDurSec")))
    ssc.checkpoint(custConf.getString("checkpointDir"))
    ssc
  }

  /**
   * Creates the raw Kafka byte stream for one customer via the direct
   * (receiverless) Kafka API.
   *
   * @param customer customer key used to resolve broker/group/topic settings.
   * @param ssc      the streaming context the stream is attached to.
   * @return a DStream of raw message payloads (Array[Byte]).
   */
  def createDataStreamFromKafka(customer: String, ssc: StreamingContext): DStream[Array[Byte]] = {
    val custConf = ConfigFactory.load(customer + ".conf")
      .getConfig(customer)
      .withFallback(ConfigFactory.load())
    LogHandler.log.info("createDataStreamFromKafka")
    // Extract plain, serializable String values up front so the
    // non-serializable Config object can never be captured in a Spark
    // closure — this is the usual cause of the checkpoint-serialization NPE.
    val brokers = custConf.getString("brokers")
    val groupId = custConf.getString("groupId")
    val topics  = custConf.getString("topics")
    KafkaUtils.createDirectStream[String,
      Array[Byte],
      StringDecoder,
      DefaultDecoder](
      ssc,
      Map[String, String]("metadata.broker.list" -> brokers, "group.id" -> groupId),
      Set(topics))
  }

  /**
   * Entry point: for each customer, wires up a streaming context, the Kafka
   * stream, and the sinks, then starts the context.
   *
   * NOTE(review): ssc.awaitTermination() blocks forever, so only the FIRST
   * customer in the list is ever processed — presumably each customer should
   * run in its own application (or contexts should be started concurrently);
   * confirm the intended deployment model.
   */
  def main(args: Array[String]): Unit = {
    LogHandler.log.info("Starting the processing Engine")
    // Checkpoint the stream at 2x the batch duration (getLong gives seconds,
    // * 2000 converts to milliseconds while doubling).
    val checkpointIntervalMs = AppConf.getLong("batchDurSec") * 2000
    getListOfCustomers().foreach { cust =>
      implicit val ssc = initializeSpark(cust)
      val fullStream = createDataStreamFromKafka(cust, ssc)
      ConcurrentOps.determineAllSinks(cust, fullStream)
      fullStream.checkpoint(Duration(checkpointIntervalMs))
      ssc.start()
      ssc.awaitTermination()
    }
  }
}
I have also tried two variations: moving all of the initialization directly
into main (i.e., not using the initializeSpark and createDataStreamFromKafka
helper methods), and removing the foreach so that only a single SparkContext
and StreamingContext are created. In both cases the error persists.
Appreciate any help.
regards
Sunita