Sometimes I get these messages in the logs, but the job still runs. Do you have
a solution for how to fix this? I have added the code in my earlier email.

Exception in thread "pool-22-thread-9" java.lang.NullPointerException
        at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
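
Also, should I be recreating the streaming context from the checkpoint directory
with StreamingContext.getOrCreate instead of always building a new
StreamingContext, so the stored offsets are picked up after a restart? A rough
sketch of what I mean (not tested; names come from the code in my earlier email):

    // Rough sketch, not tested: recover the StreamingContext (and with it the
    // Kafka offsets) from the checkpoint directory if one exists, otherwise
    // build the context and the pipeline from scratch.
    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(sparkSession.sparkContext,
        Seconds(config.getInt(Constants.Properties.BatchInterval)))
      ssc.checkpoint(config.getString(Constants.Properties.CheckPointDir))
      // ... createDirectStream and the rest of the pipeline from my earlier email ...
      ssc
    }

    val streamingContext = StreamingContext.getOrCreate(
      config.getString(Constants.Properties.CheckPointDir), createContext _)
    streamingContext.start()
    streamingContext.awaitTermination()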

On Tue, Jan 16, 2018 at 3:16 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> It could be a missing persist before the checkpoint
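> Something along these lines, for example (a rough sketch only, one possible
> reading, assuming the direct Kafka stream from your code; the storage level
> and interval are just placeholders):
>
>     import org.apache.spark.storage.StorageLevel
>
>     // persist the stream before enabling checkpointing on it, so the data
>     // is already materialized when the checkpoint is written
>     messages.persist(StorageLevel.MEMORY_AND_DISK_SER)
>     messages.checkpoint(Seconds(60))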
>
> > On 16. Jan 2018, at 22:04, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com>
> wrote:
> >
> > Hi,
> >
> > The Spark streaming job from Kafka is not picking up the messages and always
> > takes the latest offsets when the streaming job has been stopped for 2 hours.
> > It is not picking up the offsets that need to be processed from the
> > checkpoint directory. Any suggestions on how to also process the old messages
> > when there is a shutdown or planned maintenance?
> >
> >       val topics = config.getString(Constants.Properties.KafkaTopics)
> >       val topicsSet = topics.split(",").toSet
> >       val kafkaParams = Map[String, String]("metadata.broker.list" -> config.getString(Constants.Properties.KafkaBrokerList))
> >       val sparkSession: SparkSession = runMode match {
> >         case "local" => SparkSession.builder.config(sparkConfig).getOrCreate
> >         case "yarn"  => SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate
> >       }
> >       val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(config.getInt(Constants.Properties.BatchInterval)))
> >       streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
> >       val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsSet)
> >       val datapointDStream = messages.map(_._2).map(TransformDatapoint.parseDataPointText)
> >       lazy val sqlCont = sparkSession.sqlContext
> >
> >       hiveDBInstance = config.getString("hiveDBInstance")
> >
> >       TransformDatapoint.readDstreamData(sparkSession, sqlCont, datapointDStream, runMode, includeIndex, indexNum, datapointTmpTableName, fencedDPTmpTableName, fencedVINDPTmpTableName, hiveDBInstance)
> >
> >       //transformDstreamData(sparkSession, datapointDStream, runMode, includeIndex, indexNum, datapointTmpTableName, fencedDPTmpTableName, fencedVINDPTmpTableName);
> >       streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
> >       streamingContext.start()
> >       streamingContext.awaitTermination()
> >       streamingContext.stop(stopSparkContext = true, stopGracefully = true)
> >
> > Thanks,
> > Asmath
>
