Hi Spark Community. I need help with the following issue. I have been researching it for the last two weeks, and as a last resort I want to ask the Spark community.
I am running the following code in Spark:

    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("KafkaTest")
      .set("spark.streaming.kafka.maxRatePerPartition", "10")
      .set("spark.default.parallelism", "10")
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.scheduler.mode", "FAIR")

    lazy val sparkContext = new SparkContext(sparkConf)
    val sparkJob = new SparkLocal

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafka-270894369.spark.google.com:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream_group1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> "false",
      "heartbeat.interval.ms" -> "130000", // default 3000
      "request.timeout.ms" -> "150000",    // default 40000
      "session.timeout.ms" -> "140000",    // default 30000
      "max.poll.interval.ms" -> "140000",  // isn't a known config
      "max.poll.records" -> "100"          // default 2147483647
    )

    val streamingContext = new StreamingContext(sparkContext, Seconds(120))
    val topics = Array("topicname")

    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    def messageTuple(tuple: ConsumerRecord[String, String]): (String) = {
      (null) // Removed the code
    }

    var offset: Array[OffsetRange] = null

    kafkaStream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offset = offsetRanges
      rdd.map(row => messageTuple(row))
        .foreachPartition { partition =>
          partition.map(row => null)
            .foreach { record =>
              print("")
              Thread.sleep(5)
            }
        }
      kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    streamingContext.start()
    streamingContext.awaitTerminationOrTimeout(6000000)

    sys.ShutdownHookThread {
      println("Gracefully shutting down App")
      streamingContext.stop(true, true)
      println("Application stopped")
    }

With the above code I am observing multiple commits being sent to Kafka, and I am not sure why. (The entries below are from the Kafka __consumer_offsets topic.)

    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864006531, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730000011, expireTimestamp=Some(1577816400011))
    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864006531, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730000012, expireTimestamp=Some(1577816400012))
    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864005827, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730000079, expireTimestamp=Some(1577816400079))
    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120008, expireTimestamp=Some(1577816520008))
    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120010, expireTimestamp=Some(1577816520010))
    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120077, expireTimestamp=Some(1577816520077))
    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240010, expireTimestamp=Some(1577816640010))
    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240015, expireTimestamp=Some(1577816640015))
    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240137, expireTimestamp=Some(1577816640137))

Ideally we should see only one commit every 2 minutes, matching the batch interval, but in our case we are observing 3 commits per batch. Also, during application restart we are losing data because of this issue (commit mismatch). Please help me with your inputs.

Thanks in advance,
Raghu
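For what it's worth, bucketing the commitTimestamp values from the log above into 120-second windows (a quick sketch in plain Scala, using only the timestamps shown) reproduces the pattern I am describing: three commits land in every batch window.

```scala
object CommitWindows extends App {
  // commitTimestamp values (epoch millis) copied from the __consumer_offsets entries above
  val ts = Seq(
    1577730000011L, 1577730000012L, 1577730000079L,
    1577730120008L, 1577730120010L, 1577730120077L,
    1577730240010L, 1577730240015L, 1577730240137L
  )

  // Bucket each commit into its 120-second batch window (Seconds(120) batch interval)
  val commitsPerBatch = ts.groupBy(t => t / 120000L).toSeq.sortBy(_._1)

  commitsPerBatch.foreach { case (window, commits) =>
    println(s"batch window starting ${window * 120000L}: ${commits.size} commits")
  }
}
```

Each of the three batch windows shows exactly three commits, and within a window the commits are only tens of milliseconds apart.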