It could be a missing persist before the checkpoint.

> On 16. Jan 2018, at 22:04, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com> wrote:
>
> Hi,
>
> The Spark streaming job from Kafka is not picking up the messages and always
> takes the latest offsets after the streaming job has been stopped for 2 hours.
> It is not picking up the offsets that need to be processed from the checkpoint
> directory. Any suggestions on how to also process the old messages when there
> is a shutdown or planned maintenance?
>
> val topics = config.getString(Constants.Properties.KafkaTopics)
> val topicsSet = topics.split(",").toSet
> val kafkaParams = Map[String, String]("metadata.broker.list" ->
>   config.getString(Constants.Properties.KafkaBrokerList))
> val sparkSession: SparkSession = runMode match {
>   case "local" => SparkSession.builder.config(sparkConfig).getOrCreate
>   case "yarn"  => SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate
> }
> val streamingContext = new StreamingContext(sparkSession.sparkContext,
>   Seconds(config.getInt(Constants.Properties.BatchInterval)))
>
> streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
> val messages = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsSet)
> val datapointDStream =
>   messages.map(_._2).map(TransformDatapoint.parseDataPointText)
> lazy val sqlCont = sparkSession.sqlContext
>
> hiveDBInstance = config.getString("hiveDBInstance")
>
> TransformDatapoint.readDstreamData(sparkSession, sqlCont,
>   datapointDStream, runMode, includeIndex, indexNum, datapointTmpTableName,
>   fencedDPTmpTableName, fencedVINDPTmpTableName, hiveDBInstance)
>
> // transformDstreamData(sparkSession, datapointDStream, runMode, includeIndex,
> //   indexNum, datapointTmpTableName, fencedDPTmpTableName, fencedVINDPTmpTableName);
>
> streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
> streamingContext.start()
> streamingContext.awaitTermination()
> streamingContext.stop(stopSparkContext = true, stopGracefully = true)
>
> Thanks,
> Asmath
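As a hedged sketch only: with the DStream API, the usual pattern for resuming from a checkpoint is to build the context inside a factory function and recreate it via StreamingContext.getOrCreate, so that on restart the checkpointed state (including Kafka offsets) is actually read back instead of a fresh context being constructed. The checkpoint directory, app name, master, and batch interval below are illustrative placeholders, not values from the original job:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedStreamSketch {

  // Hypothetical checkpoint location; in the original code this would come
  // from config.getString(Constants.Properties.CheckPointDir).
  val checkpointDir = "/tmp/stream-checkpoint"

  // Factory that builds the full streaming graph. It runs only when no
  // checkpoint exists; on restart the graph is deserialized from the
  // checkpoint instead.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("checkpoint-demo").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)
    // ... define the KafkaUtils.createDirectStream and all transformations
    // here, inside the factory, so they are part of the checkpointed graph ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Fresh start: calls createContext(). Restart: rebuilds the context
    // (and the stored Kafka offsets) from checkpointDir.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Defining the DStream outside the factory, as in the quoted code, means the offsets stored in the checkpoint directory are never consulted on restart, which would match the observed "always takes the latest offsets" behaviour.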
--------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org