Hi Kidong,

There are a few potential reasons why the message counts from your Kafka producer and your Spark Streaming consumer might not match, especially with transactional messages and the read_committed isolation level.
1) Check the consumer group configuration. If your Spark Streaming job and the Kafka consumer written with the raw kafka-clients library run at the same time with the same group.id, the topic partitions are split between them and each consumer sees only a subset of the messages. Give each consumer its own group.id when comparing counts.

2) Your Spark Streaming configuration sets enable.auto.commit=false and commits offsets manually with commitAsync, which is fine. However, I noted spark.streaming.kafka.allowNonConsecutiveOffsets=true, which may be causing the problem. This setting switches Spark Streaming onto a code path that reads offset ranges with gaps, which is what transactional topics produce. It is worth testing with this setting removed or set to false, since filtering out uncommitted messages is handled by the isolation.level consumer property, not by this flag.

3) Timing of visibility: with transactional messages, Kafka only delivers records to read_committed consumers after the transaction commits successfully. There can be a slight delay between the producer sending a message and that message becoming visible under read_committed, and Spark Streaming could appear to miss messages during this window.

4) The exception (Lost task 0.0 in stage 324.0) indicates a failure fetching records for a specific topic partition. Review how your rdd.foreachRDD code handles potential exceptions, and make sure you retry or otherwise handle failures on specific partitions.

5) Try different values for spark.streaming.kafka.consumer.poll.ms to adjust the polling timeout; your stack trace shows the fetch giving up after the configured 120000 ms.

HTH

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer | Generative AI
London, United Kingdom

View my LinkedIn profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed.
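To make the consumer settings discussed above concrete, here is a minimal sketch in plain Java (it only builds the Properties map, so it compiles and runs without the kafka-clients jar or a broker). The broker address and group id are hypothetical placeholders, not values from your setup:

```java
import java.util.Properties;

public class ReadCommittedConsumerConfig {

    // Assemble the consumer properties from the thread above.
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.setProperty("group.id", "spark-streaming-group");     // hypothetical; keep distinct
                                                                    // from any test consumer's group
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("enable.auto.commit", "false");           // commit manually via commitAsync
        props.setProperty("isolation.level", "read_committed");     // hide uncommitted/aborted records
        return props;
    }

    public static void main(String[] args) {
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These same key/value pairs would go into the kafkaParams map passed to ConsumerStrategies.Subscribe in the Spark Streaming job.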
It is essential to note that, as with any advice: "one test result is worth one-thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).

On Fri, 12 Apr 2024 at 21:38, Kidong Lee <mykid...@gmail.com> wrote:

> Hi,
>
> I have a Kafka producer which sends messages transactionally to Kafka, and
> a Spark Streaming job which should consume read_committed messages from Kafka.
> But there is a problem for Spark Streaming consuming read_committed messages:
> the count of messages sent transactionally by the Kafka producer is not the
> same as the count of read_committed messages consumed by Spark Streaming.
>
> Some consumer properties of my Spark Streaming job are as follows:
>
> auto.offset.reset=earliest
> enable.auto.commit=false
> isolation.level=read_committed
>
> I also added the following Spark Streaming configuration:
>
> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 60 * 1000));
>
> My Spark Streaming job uses a direct stream like this:
>
> JavaInputDStream<ConsumerRecord<Integer, GenericRecord>> stream =
>     KafkaUtils.createDirectStream(
>         ssc,
>         LocationStrategies.PreferConsistent(),
>         ConsumerStrategies.<Integer, GenericRecord>Subscribe(topics, kafkaParams)
>     );
>
> stream.foreachRDD(rdd -> {
>
>     // get offset ranges.
>     OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>
>     // process something.
>
>     // commit offsets.
>     ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
> });
>
> I have tested with a Kafka consumer written with the raw kafka-clients jar
> library without problem: it consumes read_committed messages correctly,
> and the count of consumed read_committed messages is equal to the count of
> messages sent by the Kafka producer.
>
> And sometimes, I got the following exception.
> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
> (chango-private-1.chango.private executor driver):
> java.lang.IllegalArgumentException: requirement failed: Failed to get
> records for compacted spark-executor-school-student-group school-student-7
> after polling for 120000
>
>     at scala.Predef$.require(Predef.scala:281)
>     at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
>     at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
>     at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
>     at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
>
> I have run Spark Streaming jobs with non-transactional Kafka messages
> without issue, and I never encountered exceptions like the above.
> It seems that Spark Streaming for Kafka transactions does not handle
> consumer properties such as isolation.level=read_committed and
> enable.auto.commit=false correctly.
>
> Any help appreciated.
>
> - Kidong.
>
> --
> *이기동*
> *Kidong Lee*
>
> Email: mykid...@gmail.com
> Chango: https://cloudcheflabs.github.io/chango-private-docs
> Web Site: http://www.cloudchef-labs.com/
> Mobile: +82 10 4981 7297
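P.S. One footnote on why counts derived from offset ranges can diverge under read_committed: each committed transaction writes a control record (the commit marker) to the partition, which occupies an offset but is never delivered to the application. A small arithmetic sketch, assuming for illustration a uniform number of records per transaction (the numbers are made up, not taken from your workload):

```java
// Illustration only, no Kafka dependency: why (endOffset - startOffset)
// over-counts deliverable records on a partition carrying transactional data.
public class TransactionalOffsetGap {

    // Records the application actually receives.
    static long visibleRecords(long transactions, long recordsPerTxn) {
        return transactions * recordsPerTxn;
    }

    // Offsets consumed on the partition: each transaction uses
    // recordsPerTxn data offsets plus one offset for its commit marker.
    static long offsetSpan(long transactions, long recordsPerTxn) {
        return transactions * (recordsPerTxn + 1);
    }

    public static void main(String[] args) {
        long txns = 10, perTxn = 100;
        System.out.println("offset span     = " + offsetSpan(txns, perTxn));     // 1010
        System.out.println("visible records = " + visibleRecords(txns, perTxn)); // 1000
    }
}
```

This gap is also why transactional topics inherently have non-consecutive offsets from a consumer's point of view, independent of log compaction.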