Sometimes I get these messages in the logs, but the job still runs. Do you have a suggestion for how to fix this? I added the code in my earlier email.
Exception in thread "pool-22-thread-9" java.lang.NullPointerException
        at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

On Tue, Jan 16, 2018 at 3:16 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> It could be a missing persist before the checkpoint.
>
> On 16. Jan 2018, at 22:04, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com> wrote:
>
> > Hi,
> >
> > The Spark streaming job reading from Kafka is not picking up the pending messages; it always takes the latest offsets after the streaming job has been stopped for 2 hours. It does not pick up the offsets that still need to be processed from the checkpoint directory. Any suggestions on how to also process the old messages when there is a shutdown or planned maintenance?
> >
> >     val topics = config.getString(Constants.Properties.KafkaTopics)
> >     val topicsSet = topics.split(",").toSet
> >     val kafkaParams = Map[String, String]("metadata.broker.list" -> config.getString(Constants.Properties.KafkaBrokerList))
> >     val sparkSession: SparkSession = runMode match {
> >       case "local" => SparkSession.builder.config(sparkConfig).getOrCreate
> >       case "yarn"  => SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate
> >     }
> >     val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(config.getInt(Constants.Properties.BatchInterval)))
> >     streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
> >     val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsSet)
> >     val datapointDStream = messages.map(_._2).map(TransformDatapoint.parseDataPointText)
> >     lazy val sqlCont = sparkSession.sqlContext
> >
> >     hiveDBInstance = config.getString("hiveDBInstance")
> >
> >     TransformDatapoint.readDstreamData(sparkSession, sqlCont, datapointDStream, runMode, includeIndex, indexNum, datapointTmpTableName, fencedDPTmpTableName, fencedVINDPTmpTableName, hiveDBInstance)
> >
> >     // transformDstreamData(sparkSession, datapointDStream, runMode, includeIndex, indexNum, datapointTmpTableName, fencedDPTmpTableName, fencedVINDPTmpTableName);
> >     streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
> >     streamingContext.start()
> >     streamingContext.awaitTermination()
> >     streamingContext.stop(stopSparkContext = true, stopGracefully = true)
> >
> > Thanks,
> > Asmath
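One thing stands out in the quoted code: the StreamingContext is always built with new StreamingContext(...), so on a restart the checkpoint directory is never read, and the 0.8-style direct stream falls back to its default of the latest ("largest") offsets. The usual fix is StreamingContext.getOrCreate, with all of the stream setup inside the creating function. A minimal sketch of that pattern follows, assuming the same Kafka 0.8 direct API as in the quoted code; the checkpoint path, topic, broker, and batch interval are placeholder values standing in for the config lookups in the original.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object CheckpointedKafkaStream {
      // Placeholder values standing in for the config lookups in the original job.
      val checkpointDir = "hdfs:///user/app/checkpoints"
      val batchInterval = 60
      val topicsSet     = Set("datapoints")
      val kafkaParams   = Map[String, String]("metadata.broker.list" -> "broker1:9092")

      // All stream setup must live inside this function: on a cold start it is
      // called to build the context, but on recovery it is skipped entirely and
      // the context (including the stored Kafka offsets) is rebuilt from the
      // checkpoint data instead.
      def createContext(): StreamingContext = {
        val conf = new SparkConf().setAppName("checkpointed-kafka-stream")
        val ssc  = new StreamingContext(conf, Seconds(batchInterval))
        ssc.checkpoint(checkpointDir)

        val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicsSet)

        // ... the map/transform/output operations from the original job go here ...
        messages.map(_._2).print()

        ssc
      }

      def main(args: Array[String]): Unit = {
        // Cold start: calls createContext(). Restart: recovers the context and
        // resumes from the checkpointed offsets instead of the latest ones.
        val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }

One caveat with this pattern: the recovered job replays the serialized DStream graph, so changing the code between restarts generally requires deleting the checkpoint directory, which means this kind of recovery only covers restarts of the same binary.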
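As for the NullPointerException from CheckpointWriter, following Jörn's hint, here is a minimal sketch of persisting (and checkpointing) the DStream itself before it is consumed. It assumes the datapointDStream and batch interval from the quoted code; the storage level and the 5x interval multiplier are illustrative starting points, not requirements.

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.Seconds

    // Assumes datapointDStream and batchInterval from the quoted job.
    // Cache the parsed stream before it is reused downstream.
    datapointDStream.persist(StorageLevel.MEMORY_AND_DISK_SER)

    // Checkpoint the stream's data at a multiple of the batch interval;
    // 5-10x the batch interval is the commonly cited starting point.
    datapointDStream.checkpoint(Seconds(batchInterval * 5))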