It could be a missing persist on the DStream before the checkpoint.
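
Something like this, as a minimal sketch against your snippet below
(datapointDStream, config, and Constants are taken from your code; the
storage level and checkpoint interval are just example choices):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.Seconds

    // Persist the transformed DStream before checkpointing it, so the
    // checkpointed RDDs are materialized once instead of being recomputed
    // from Kafka for every checkpoint write.
    datapointDStream.persist(StorageLevel.MEMORY_AND_DISK_SER)

    // A checkpoint interval of 5-10x the batch interval is a common choice.
    datapointDStream.checkpoint(Seconds(config.getInt(Constants.Properties.BatchInterval) * 10))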

> On 16. Jan 2018, at 22:04, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com> wrote:
> 
> Hi,
> 
> Our Spark Streaming job reading from Kafka is not picking up old messages:
> when the job is stopped for 2 hours, it always resumes from the latest
> offsets instead of the offsets recorded in the checkpoint directory. Any
> suggestions on how to also process the old messages when there is a
> shutdown or planned maintenance?
> 
>       val topics = config.getString(Constants.Properties.KafkaTopics)
>       val topicsSet = topics.split(",").toSet
>       val kafkaParams = Map[String, String]("metadata.broker.list" ->
>         config.getString(Constants.Properties.KafkaBrokerList))
>       val sparkSession: SparkSession = runMode match {
>         case "local" => SparkSession.builder.config(sparkConfig).getOrCreate
>         case "yarn"  => SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate
>       }
>       val streamingContext = new StreamingContext(sparkSession.sparkContext,
>         Seconds(config.getInt(Constants.Properties.BatchInterval)))
>       streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
>       val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
>         StringDecoder](streamingContext, kafkaParams, topicsSet)
>       val datapointDStream = messages.map(_._2).map(TransformDatapoint.parseDataPointText)
>       lazy val sqlCont = sparkSession.sqlContext
>
>       hiveDBInstance = config.getString("hiveDBInstance")
>
>       TransformDatapoint.readDstreamData(sparkSession, sqlCont, datapointDStream,
>         runMode, includeIndex, indexNum, datapointTmpTableName,
>         fencedDPTmpTableName, fencedVINDPTmpTableName, hiveDBInstance)
>
>       // transformDstreamData(sparkSession, datapointDStream, runMode, includeIndex,
>       //   indexNum, datapointTmpTableName, fencedDPTmpTableName, fencedVINDPTmpTableName);
>
>       streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
>       streamingContext.start()
>       streamingContext.awaitTermination()
>       streamingContext.stop(stopSparkContext = true, stopGracefully = true)
> 
> Thanks,
> Asmath
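
One more thing worth checking: in the snippet the StreamingContext is always
built with new, so on restart nothing ever reads the checkpoint directory
back in. Checkpoint recovery only happens when the context is obtained via
StreamingContext.getOrCreate. A minimal sketch, assuming a createContext()
factory (the name is just an example) that wraps the setup from your code:

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(sparkSession.sparkContext,
        Seconds(config.getInt(Constants.Properties.BatchInterval)))
      ssc.checkpoint(config.getString(Constants.Properties.CheckPointDir))
      // ... move the createDirectStream call and the transformations in here,
      // so they are only set up when no checkpoint exists yet ...
      ssc
    }

    // On restart this rebuilds the context, including the Kafka offsets,
    // from the checkpoint directory instead of starting at the latest offsets.
    val streamingContext = StreamingContext.getOrCreate(
      config.getString(Constants.Properties.CheckPointDir), createContext _)
    streamingContext.start()
    streamingContext.awaitTermination()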
