Because Spark Streaming's Kafka transaction support does not work correctly for my needs, I moved to another approach using a raw Kafka consumer, which handles read_committed messages from Kafka correctly.
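For reference, the consumerProperties passed to that raw consumer are plain Kafka consumer properties with read_committed isolation. A minimal sketch, assuming a Confluent Avro deserializer for the GenericRecord values; the bootstrap servers, group id and schema registry URL are placeholders, while the offset, commit, isolation and max.poll.records settings are the ones described in this thread:

Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers", "broker1:9092");   // placeholder
consumerProperties.put("group.id", "my-consumer-group");       // placeholder
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
consumerProperties.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); // assumption: Confluent Avro deserializer for GenericRecord values
consumerProperties.put("schema.registry.url", "http://schema-registry:8081"); // placeholder, only needed for the Avro deserializer
consumerProperties.put("auto.offset.reset", "earliest");
consumerProperties.put("enable.auto.commit", "false");
consumerProperties.put("isolation.level", "read_committed");
consumerProperties.put("max.poll.records", "100000");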
My code looks like the following.

JavaDStream<String> stream = ssc.receiverStream(new CustomReceiver()); // CustomReceiver does nothing special except waking up the foreach task.

stream.foreachRDD(rdd -> {
    KafkaConsumer<Integer, GenericRecord> consumer = new KafkaConsumer<>(consumerProperties);
    consumer.subscribe(Arrays.asList(topic));
    while (true) {
        ConsumerRecords<Integer, GenericRecord> records =
            consumer.poll(java.time.Duration.ofMillis(intervalMs));

        Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
        List<String> someList = new ArrayList<>();
        for (ConsumerRecord<Integer, GenericRecord> consumerRecord : records) {
            // add something to the list.
            // put the offset into offsetMap.
        }

        // process someList.

        // commit offsets.
        consumer.commitAsync(offsetMap, null);
    }
});

In addition, I increased max.poll.records to 100000.

Even though this raw Kafka consumer approach is not very scalable, it consumes read_committed messages from Kafka correctly and is enough for me at the moment.

- Kidong.


On Fri, Apr 12, 2024 at 9:19 PM, Kidong Lee <mykid...@gmail.com> wrote:

> Hi,
>
> I have a Kafka producer which sends messages transactionally to Kafka, and a
> Spark Streaming job which should consume the read_committed messages from
> Kafka. But there is a problem: the count of messages sent transactionally by
> the Kafka producer is not the same as the count of read_committed messages
> consumed by Spark Streaming.
>
> Some consumer properties of my Spark Streaming job are as follows.
>
> auto.offset.reset=earliest
> enable.auto.commit=false
> isolation.level=read_committed
>
> I also added the following Spark Streaming configuration.
>
> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 60 * 1000));
>
> My Spark Streaming job uses a DirectStream like this.
>
> JavaInputDStream<ConsumerRecord<Integer, GenericRecord>> stream =
>     KafkaUtils.createDirectStream(
>         ssc,
>         LocationStrategies.PreferConsistent(),
>         ConsumerStrategies.<Integer, GenericRecord>Subscribe(topics, kafkaParams)
>     );
>
> stream.foreachRDD(rdd -> {
>
>     // get offset ranges.
>     OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>
>     // process something.
>
>     // commit offsets.
>     ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
> });
>
> I have tested a Kafka consumer written with the raw kafka-clients jar library
> without any problem: it consumes read_committed messages correctly, and the
> count of consumed read_committed messages is equal to the count of messages
> sent by the Kafka producer.
>
> And sometimes, I got the following exception.
>
> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times, most
> recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
> (chango-private-1.chango.private executor driver):
> java.lang.IllegalArgumentException: requirement failed: Failed to get
> records for compacted spark-executor-school-student-group school-student-7
> after polling for 120000
>     at scala.Predef$.require(Predef.scala:281)
>     at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
>     at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
>     at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
>     at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
>
> I have run Spark Streaming jobs that work fine with non-transactional Kafka
> messages, and I never encountered exceptions like the one above. It seems
> that Spark Streaming's Kafka integration does not correctly handle consumer
> properties such as isolation.level=read_committed and
> enable.auto.commit=false for transactional messages.
>
> Any help appreciated.
>
> - Kidong.


--
이기동
Kidong Lee

Email: mykid...@gmail.com
Chango: https://cloudcheflabs.github.io/chango-private-docs
Web Site: http://www.cloudchef-labs.com/
Mobile: +82 10 4981 7297