Hi,

My Spark Streaming job that reads from Kafka is not picking up the old messages: when the job is stopped for about 2 hours and restarted, it always starts from the latest offsets instead of the offsets recorded in the checkpoint directory, so the messages produced during the downtime are never processed. Any suggestions on how to also process those old messages after a shutdown or planned maintenance? My current code is below, followed by a sketch of the recovery pattern I suspect is missing.

    val topics = config.getString(Constants.Properties.KafkaTopics)
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> config.getString(Constants.Properties.KafkaBrokerList))

    val sparkSession: SparkSession = runMode match {
      case "local" => SparkSession.builder.config(sparkConfig).getOrCreate
      case "yarn"  => SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate
    }

    val streamingContext = new StreamingContext(sparkSession.sparkContext,
      Seconds(config.getInt(Constants.Properties.BatchInterval)))
    streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))

    // Direct stream from Kafka (0.8 direct API); offsets are tracked by Spark, not ZooKeeper
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      streamingContext, kafkaParams, topicsSet)
    val datapointDStream = messages.map(_._2).map(TransformDatapoint.parseDataPointText)

    lazy val sqlCont = sparkSession.sqlContext

    hiveDBInstance = config.getString("hiveDBInstance")

    TransformDatapoint.readDstreamData(sparkSession, sqlCont, datapointDStream, runMode,
      includeIndex, indexNum, datapointTmpTableName, fencedDPTmpTableName,
      fencedVINDPTmpTableName, hiveDBInstance)

    // transformDstreamData(sparkSession, datapointDStream, runMode, includeIndex, indexNum,
    //   datapointTmpTableName, fencedDPTmpTableName, fencedVINDPTmpTableName)

    streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
    streamingContext.start()
    streamingContext.awaitTermination()
    streamingContext.stop(stopSparkContext = true, stopGracefully = true)
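
From the Spark Streaming docs, my understanding is that for the checkpoint to be used on restart, the StreamingContext and all of the DStream setup have to be built inside a factory function passed to StreamingContext.getOrCreate, rather than constructed directly as above. A rough sketch of what I think that would look like is below; createContext is just a placeholder name, the parsing / readDstreamData calls would move inside it, and I am not sure how this interacts with the SparkSession created earlier. Is this what I am missing?

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(sparkSession.sparkContext,
        Seconds(config.getInt(Constants.Properties.BatchInterval)))
      ssc.checkpoint(config.getString(Constants.Properties.CheckPointDir))

      // All DStream definitions must be made here so they can be rebuilt from the checkpoint
      val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)
      val datapointDStream = messages.map(_._2).map(TransformDatapoint.parseDataPointText)
      // ... rest of the transformation / readDstreamData logic ...
      ssc
    }

    // On the first run this calls createContext(); after a restart it rebuilds the context,
    // including the unprocessed Kafka offsets, from the checkpoint directory instead.
    val streamingContext = StreamingContext.getOrCreate(
      config.getString(Constants.Properties.CheckPointDir), createContext _)
    streamingContext.start()
    streamingContext.awaitTermination()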

Thanks,
Asmath
