Thank you, Mich, for your reply. I have tried most of your suggestions; the relevant part of my driver, as it looks now, is sketched below.
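This is only a minimal sketch, not the full job: the broker address and the Avro value deserializer are placeholders for my real ones, and the poll timeout shown is just one example of the larger values I tried.

// Spark side settings, changed as suggested.
sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "false");
sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(5 * 60 * 1000));

// Kafka consumer properties, unchanged from my original mail quoted below.
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "kafka-1:9092");                // placeholder
kafkaParams.put("group.id", "school-student-group");                 // executor consumers show up as spark-executor-school-student-group
kafkaParams.put("key.deserializer", IntegerDeserializer.class);
kafkaParams.put("value.deserializer", KafkaAvroDeserializer.class);  // placeholder for my GenericRecord setup
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", "false");
kafkaParams.put("isolation.level", "read_committed");

kafkaParams and topics are passed to KafkaUtils.createDirectStream exactly as in my original mail quoted below.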
When spark.streaming.kafka.allowNonConsecutiveOffsets=false, I got the following error.

Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times,
most recent failure: Lost task 0.0 in stage 1.0 (TID 3)
(chango-private-1.chango.private executor driver):
java.lang.IllegalArgumentException: requirement failed: Got wrong record for
spark-executor-school-student-group school-student-7 even after seeking to
offset 11206961 got offset 11206962 instead. If this is a compacted topic,
consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
        at scala.Predef$.require(Predef.scala:281)
        at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:155)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer.get(KafkaDataConsumer.scala:40)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer.get$(KafkaDataConsumer.scala:39)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:219)

I also tried increasing spark.streaming.kafka.consumer.poll.ms to avoid the
exceptions, but it did not help.

- Kidong.


On Sun, 14 Apr 2024 at 04:25, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi Kidong,
>
> There may be a few potential reasons why the message counts from your
> Kafka producer and Spark Streaming consumer do not match, especially with
> transactional messages and the read_committed isolation level.
>
> 1) Ensure that both your Spark Streaming job and the Kafka consumer
> written with the raw kafka-clients library use the same consumer group.
> Messages are delivered to specific consumer groups, and if they differ,
> Spark Streaming might miss messages consumed by the raw consumer.
> 2) Your Spark Streaming configuration sets enable.auto.commit=false and
> uses commitAsync manually. However, I noted
> spark.streaming.kafka.allowNonConsecutiveOffsets=true, which may be
> causing the problem. This setting allows Spark Streaming to read offsets
> that are not strictly increasing, which can happen with transactional
> reads. It is generally recommended to set this to false for transactional
> reads to ensure Spark Streaming only reads committed messages.
> 3) Missed messages: with transactional messages, Kafka guarantees
> delivery only after the transaction commits successfully. There can be a
> slight delay between the producer sending a message and it becoming
> visible to consumers under the read_committed isolation level. Spark
> Streaming could potentially miss messages during this window.
> 4) The exception "Lost task 0.0 in stage 324.0" suggests a problem
> fetching records for a specific topic partition. Review how your code
> handles exceptions during rdd.foreachRDD processing, and ensure retries
> or appropriate error handling when a specific partition runs into trouble
> (see the sketch below).
> 5) Try different values for spark.streaming.kafka.consumer.poll.ms to
> adjust the poll timeout and potentially improve visibility into committed
> messages.
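> As a rough illustration of 4), something along the following lines might
> work. It is only a sketch: it reuses the stream and offsetRanges names
> from your code quoted below, and the processing body is left to you.
>
> stream.foreachRDD(rdd -> {
>     OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>     try {
>         // per-record processing runs on the executors.
>         rdd.foreachPartition(records ->
>             records.forEachRemaining(record -> {
>                 // process record.value() ...
>             })
>         );
>         // commit only after the whole batch has been processed successfully.
>         ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
>     } catch (Exception e) {
>         // do not commit on failure; log the failing offset ranges instead.
>         e.printStackTrace();
>     }
> });
>
> The point is simply that the offsets of a failed batch are never
> committed, so after a restart the job resumes from the last successfully
> committed offsets rather than silently moving past the failure.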
> HTH
>
> Mich Talebzadeh,
> Technologist | Solutions Architect | Data Engineer | Generative AI
> London
> United Kingdom
>
> View my LinkedIn profile:
> https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> Disclaimer: The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, "one test result is worth one-thousand expert
> opinions" (Wernher von Braun,
> https://en.wikipedia.org/wiki/Wernher_von_Braun).
>
>
> On Fri, 12 Apr 2024 at 21:38, Kidong Lee <mykid...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a Kafka producer which sends messages transactionally to Kafka,
>> and a Spark Streaming job which should consume read_committed messages
>> from Kafka. But there is a problem with Spark Streaming consuming
>> read_committed messages: the count of messages sent transactionally by
>> the Kafka producer is not the same as the count of read_committed
>> messages consumed by Spark Streaming.
>>
>> Some consumer properties of my Spark Streaming job are as follows.
>>
>> auto.offset.reset=earliest
>> enable.auto.commit=false
>> isolation.level=read_committed
>>
>> I also added the following Spark Streaming configuration.
>>
>> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
>> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 60 * 1000));
>>
>> My Spark Streaming job uses a direct stream like this.
>>
>> JavaInputDStream<ConsumerRecord<Integer, GenericRecord>> stream =
>>     KafkaUtils.createDirectStream(
>>         ssc,
>>         LocationStrategies.PreferConsistent(),
>>         ConsumerStrategies.<Integer, GenericRecord>Subscribe(topics, kafkaParams)
>>     );
>>
>> stream.foreachRDD(rdd -> {
>>
>>     // get offset ranges.
>>     OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>>
>>     // process something.
>>
>>     // commit offsets.
>>     ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
>> });
>>
>> I have also tested with a Kafka consumer written directly with the raw
>> kafka-clients library (sketched at the end of this mail); it consumes
>> read_committed messages correctly, and the count of consumed
>> read_committed messages is equal to the count of messages sent by the
>> Kafka producer.
>>
>> And sometimes I get the following exception.
>>
>> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
>> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
>> (chango-private-1.chango.private executor driver):
>> java.lang.IllegalArgumentException: requirement failed: Failed to get
>> records for compacted spark-executor-school-student-group school-student-7
>> after polling for 120000
>>         at scala.Predef$.require(Predef.scala:281)
>>         at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
>>         at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
>>         at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
>>         at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
>>
>> I have run Spark Streaming jobs that work fine with non-transactional
>> Kafka messages, and I never encountered exceptions like the one above.
>> It seems that Spark Streaming does not handle Kafka transactions, that
>> is, consumer properties such as isolation.level=read_committed and
>> enable.auto.commit=false, correctly.
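>> For reference, the raw kafka-clients consumer I used for the count check
>> looks roughly like this. It is a simplified sketch; the broker address,
>> group id, topic name and the Avro value deserializer are placeholders
>> for my real ones.
>>
>> Properties props = new Properties();
>> props.put("bootstrap.servers", "kafka-1:9092");
>> props.put("group.id", "school-student-count-check");
>> props.put("auto.offset.reset", "earliest");
>> props.put("enable.auto.commit", "false");
>> props.put("isolation.level", "read_committed");
>> props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
>> props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");  // placeholder for my Avro setup
>>
>> long count = 0;
>> try (KafkaConsumer<Integer, GenericRecord> consumer = new KafkaConsumer<>(props)) {
>>     consumer.subscribe(Collections.singletonList("school-student"));
>>     while (true) {
>>         ConsumerRecords<Integer, GenericRecord> records = consumer.poll(Duration.ofSeconds(10));
>>         if (records.isEmpty()) break;   // good enough for a one-off count check
>>         count += records.count();
>>     }
>> }
>> System.out.println("read_committed count: " + count);
>>
>> This consumer reports the same count as the producer sent, while the
>> Spark Streaming job described above does not.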
>> Any help appreciated.
>>
>> - Kidong.

--
이기동
Kidong Lee

Email: mykid...@gmail.com
Chango: https://cloudcheflabs.github.io/chango-private-docs
Web Site: http://www.cloudchef-labs.com/
Mobile: +82 10 4981 7297