Which Scala version / Spark release are you using ? Cheers
On Wed, Jun 22, 2016 at 8:20 PM, Sunita Arvind <sunitarv...@gmail.com> wrote: > Hello Experts, > > I am getting this error repeatedly: > > 16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the > context, marking it as stopped > java.lang.NullPointerException > at > com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202) > at > com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228) > at > com.typesafe.config.ConfigException.writeObject(ConfigException.java:58) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440) > at java.lang.Throwable.writeObject(Throwable.java:985) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at > java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:141) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251) > at > org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:142) > at > org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:554) > at > org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601) > at > org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600) > at > com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:73) > at > com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:67) > at scala.collection.immutable.List.foreach(List.scala:318) > at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:67) > at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542) > > > It seems to be a typical issue. All I am doing here is as below: > > Object ProcessingEngine{ > > def initializeSpark(customer:String):StreamingContext={ > LogHandler.log.info("InitialeSpark") > val custConf = ConfigFactory.load(customer + > ".conf").getConfig(customer).withFallback(AppConf) > implicit val sparkConf: SparkConf = new SparkConf().setAppName(customer) > val ssc: StreamingContext = new StreamingContext(sparkConf, > Seconds(custConf.getLong("batchDurSec"))) > ssc.checkpoint(custConf.getString("checkpointDir")) > ssc > } > > def createDataStreamFromKafka(customer:String, ssc: > StreamingContext):DStream[Array[Byte]]={ > val custConf = ConfigFactory.load(customer + > ".conf").getConfig(customer).withFallback(ConfigFactory.load()) > LogHandler.log.info("createDataStreamFromKafka") > KafkaUtils.createDirectStream[String, > Array[Byte], > StringDecoder, > DefaultDecoder]( > ssc, > Map[String, String]("metadata.broker.list" -> > custConf.getString("brokers"), "group.id" -> custConf.getString("groupId")), > Set(custConf.getString("topics"))) > > } > > def main(args: Array[String]): Unit = { > val AppConf = ConfigFactory.load() > LogHandler.log.info("Starting the processing Engine") > getListOfCustomers().foreach{cust => > implicit val ssc = initializeSpark(cust) > val FullStream = createDataStreamFromKafka(cust,ssc) > ConcurrentOps.determineAllSinks(cust, FullStream) > FullStream.checkpoint(Duration(AppConf.getLong("batchDurSec") * 2000)) > ssc.start() > ssc.awaitTermination() > } > > } > } > > I also tried putting all the initialization directly in main (not using > method calls for initializeSpark and createDataStreamFromKafka) and also > not putting in foreach and creating a single spark and streaming context. > However, the error persists. > > Appreciate any help. > > regards > Sunita >