Hi, my Spark Streaming job reading from Kafka is not picking up where it left off: when the job is stopped for about 2 hours and restarted, it always starts from the latest offsets instead of the offsets recorded in the checkpoint directory. Any suggestions on how to also process the old messages that arrived during a shutdown or planned maintenance?
val topics = config.getString(Constants.Properties.KafkaTopics)
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> config.getString(Constants.Properties.KafkaBrokerList))

val sparkSession: SparkSession = runMode match {
  case "local" => SparkSession.builder.config(sparkConfig).getOrCreate
  case "yarn"  => SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate
}

val streamingContext = new StreamingContext(sparkSession.sparkContext,
  Seconds(config.getInt(Constants.Properties.BatchInterval)))
streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))

// Kafka 0.8 direct stream; offsets are tracked by Spark through the checkpoint
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, topicsSet)
val datapointDStream = messages.map(_._2).map(TransformDatapoint.parseDataPointText)

lazy val sqlCont = sparkSession.sqlContext
hiveDBInstance = config.getString("hiveDBInstance")

TransformDatapoint.readDstreamData(sparkSession, sqlCont, datapointDStream, runMode,
  includeIndex, indexNum, datapointTmpTableName, fencedDPTmpTableName,
  fencedVINDPTmpTableName, hiveDBInstance)

streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop(stopSparkContext = true, stopGracefully = true)

Thanks,
Asmath
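P.S. From the Spark Streaming programming guide, my understanding is that offsets are only recovered when the StreamingContext is rebuilt from the checkpoint via StreamingContext.getOrCreate, with the whole DStream graph defined inside a factory function; since my code always calls new StreamingContext(...), I suspect the checkpoint is never actually read on restart. Below is a minimal sketch of that pattern as I understand it (createContext is just a placeholder name, and I have elided my transformations); is this what I should be doing?

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val checkpointDir = config.getString(Constants.Properties.CheckPointDir)

// Called only when no checkpoint exists in checkpointDir; on restart the
// context, the DStream graph, and the stored Kafka offsets are instead
// deserialized from the checkpoint, and this factory body is skipped.
def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConfig,
    Seconds(config.getInt(Constants.Properties.BatchInterval)))
  ssc.checkpoint(checkpointDir)
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicsSet)
  // ... define all transformations and output operations here, inside the
  // factory, so they become part of the checkpointed graph ...
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()

Also, in case it is relevant: since my kafkaParams only sets metadata.broker.list, I believe the direct stream falls back to auto.offset.reset = "largest" whenever there are no recovered offsets, which would also explain jumping to the latest messages. If I am reading the docs correctly, adding "auto.offset.reset" -> "smallest" would make it replay from the earliest available offsets in that case.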