Hi Spark Community.

I need help with the following issue and I have been researching about it
from last 2 weeks and as a last and best resource I want to ask the Spark
community.

I am running the following code in Spark*

    *  val sparkConf = new SparkConf()*

*        .setMaster("local[*]")*

*        .setAppName("KafkaTest")*

*        .set("spark.streaming.kafka.maxRatePerPartition","10")*

*        .set("spark.default.parallelism","10")*

*        .set("spark.streaming.backpressure.enabled", "true")*

*        .set("spark.scheduler.mode", "FAIR")*



*      lazy val sparkContext = new SparkContext(sparkConf)*

*      val sparkJob = new SparkLocal*



*      val kafkaParams = Map[String, Object](*

*          "bootstrap.servers" -> "kafka-270894369.spark.google.com:9092
<http://kafka-270894369.spark.google.com:9092>",*

*          "key.deserializer" -> classOf[StringDeserializer],*

*          "value.deserializer" -> classOf[StringDeserializer],*

*          "group.id <http://group.id>" -> "stream_group1",*

*          "auto.offset.reset" -> "latest",*

*          "enable.auto.commit" -> "false",*

*          "heartbeat.interval.ms <http://heartbeat.interval.ms>" ->
"130000", //3000*

*          "request.timeout.ms <http://request.timeout.ms>" -> "150000",
//40000*

*          "session.timeout.ms <http://session.timeout.ms>" -> "140000",
//30000*

*          "max.poll.interval.ms <http://max.poll.interval.ms>" ->
"140000", //isn't a known config*

*          "max.poll.records" -> "100" //2147483647*

*        )*



*        val streamingContext = new StreamingContext(sparkContext,
Seconds(120))*



*        val topics = Array("topicname")*



*        val kafkaStream = KafkaUtils.createDirectStream[String, String](*

*          streamingContext,*

*          PreferConsistent,*

*          Subscribe[String, String](topics, kafkaParams)*

*        )*



*        def messageTuple(tuple: ConsumerRecord[String, String]): (String)
= {*

*          (null) // Removed the code*

*        }*



*        var offset : Array[OffsetRange] = null*



*        kafkaStream.foreachRDD{rdd =>*

*          val offsetRanges =
rdd.asInstanceOf[HasOffsetRanges].offsetRanges*

*          offset = offsetRanges*



*          rdd.map(row => messageTuple(row))*

*            .foreachPartition { partition =>*

*              partition.map(row => null)*

*                .foreach{ record =>*

*                  print("")*

*                  Thread.sleep(5)*

*                }*

*              }*

*
kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)*

*          }*



*        streamingContext.start()*

*        streamingContext.awaitTerminationOrTimeout(6000000)*



*        sys.ShutdownHookThread{*

*          println("Gracefully shutting down App")*

*          streamingContext.stop(true,true)*

*          println("Application stopped")*

*        }*




With the above code I am observing multiple commits are sending to Kafka
and I am not sure why ?

(Got the below info from kafka __consumer_offset topic)













*  [stream_group1,topicname,59]::OffsetAndMetadata(offset=864006531,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730000011,
expireTimestamp=Some(1577816400011))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864006531,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730000012,
expireTimestamp=Some(1577816400012))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864005827,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730000079,
expireTimestamp=Some(1577816400079))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120008,
expireTimestamp=Some(1577816520008))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120010,
expireTimestamp=Some(1577816520010))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120077,
expireTimestamp=Some(1577816520077))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240010,
expireTimestamp=Some(1577816640010))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240015,
expireTimestamp=Some(1577816640015))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240137,
expireTimestamp=Some(1577816640137))*

*    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864006531,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730000012,
expireTimestamp=Some(1577816400012))*
*    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864005827,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730000079,
expireTimestamp=Some(1577816400079))*

*    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120008,
expireTimestamp=Some(1577816520008))*
*    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120010,
expireTimestamp=Some(1577816520010))*
*    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120077,
expireTimestamp=Some(1577816520077))*

*    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240010,
expireTimestamp=Some(1577816640010))*
*    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240015,
expireTimestamp=Some(1577816640015))*
*    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240137,
expireTimestamp=Some(1577816640137))*



Ideally we should see only one commit for every 2mins based on my batch
size but in our case we are observing 3 commits.

Also during the Application restart we are loosing the data because of
above issue (Commit mismatch)

Please help me with your inputs ?

Thanks in advance,
Raghu

Reply via email to