Interesting. My concern is the infinite loop in *foreachRDD*: the *while(true)* loop within foreachRDD creates an infinite loop within each Spark executor, so the foreach task never completes. This might not be the most efficient approach, especially since offsets are committed asynchronously.
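For illustration only, a minimal sketch of a bounded variant: poll once per micro-batch with a bounded timeout and commit synchronously before the task finishes. It reuses the names consumerProperties, topic and intervalMs from your snippet and is not tested or tuned; subscribing per batch also adds rebalance overhead, so this only shows the bounded-poll and synchronous-commit idea, not a drop-in replacement.

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

stream.foreachRDD(rdd -> {
    // Open a consumer for this micro-batch only; try-with-resources closes it when the task ends.
    try (KafkaConsumer<Integer, GenericRecord> consumer = new KafkaConsumer<>(consumerProperties)) {
        consumer.subscribe(Arrays.asList(topic));

        // Poll once with a bounded timeout instead of looping forever.
        ConsumerRecords<Integer, GenericRecord> records = consumer.poll(Duration.ofMillis(intervalMs));

        Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
        for (ConsumerRecord<Integer, GenericRecord> record : records) {
            // process the record, then remember the next offset to commit for its partition.
            offsetMap.put(new TopicPartition(record.topic(), record.partition()),
                          new OffsetAndMetadata(record.offset() + 1));
        }

        // Commit synchronously so the batch does not finish before the offsets are committed.
        if (!offsetMap.isEmpty()) {
            consumer.commitSync(offsetMap);
        }
    }
});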
HTH

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer | Generative AI
London, United Kingdom

View my LinkedIn profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Sun, 14 Apr 2024 at 13:40, Kidong Lee <mykid...@gmail.com> wrote:

>
> Because Spark Streaming for Kafka transactions does not work correctly to
> suit my need, I moved to another approach using a raw Kafka consumer, which
> handles read_committed messages from Kafka correctly.
>
> My code looks like the following.
>
> JavaDStream<String> stream = ssc.receiverStream(new CustomReceiver()); //
> CustomReceiver does nothing special except waking the foreach task.
>
> stream.foreachRDD(rdd -> {
>
>     KafkaConsumer<Integer, GenericRecord> consumer =
>         new KafkaConsumer<>(consumerProperties);
>
>     consumer.subscribe(Arrays.asList(topic));
>
>     while (true) {
>
>         ConsumerRecords<Integer, GenericRecord> records =
>             consumer.poll(java.time.Duration.ofMillis(intervalMs));
>
>         Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
>
>         List<String> someList = new ArrayList<>();
>
>         for (ConsumerRecord<Integer, GenericRecord> consumerRecord : records) {
>             // add something to list.
>             // put offset to offsetMap.
>         }
>
>         // process someList.
>
>         // commit offset.
>         consumer.commitAsync(offsetMap, null);
>
>     }
>
> });
>
>
> In addition, I increased max.poll.records to 100000.
>
> Even though this raw Kafka consumer approach is not so scalable, it consumes
> read_committed messages from Kafka correctly and is enough for me at the
> moment.
>
> - Kidong.
>
>
>
> On Fri, 12 Apr 2024 at 21:19, Kidong Lee <mykid...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a Kafka producer which sends messages transactionally to Kafka and
>> a Spark Streaming job which should consume read_committed messages from
>> Kafka. But there is a problem for Spark Streaming to consume read_committed
>> messages: the count of messages sent transactionally by the Kafka producer
>> is not the same as the count of read_committed messages consumed by Spark
>> Streaming.
>>
>> Some consumer properties of my Spark Streaming job are as follows.
>>
>> auto.offset.reset=earliest
>> enable.auto.commit=false
>> isolation.level=read_committed
>>
>>
>> I also added the following Spark Streaming configuration.
>>
>> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
>> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 60 * 1000));
>>
>>
>> My Spark Streaming job is using a DirectStream like this.
>>
>> JavaInputDStream<ConsumerRecord<Integer, GenericRecord>> stream =
>>     KafkaUtils.createDirectStream(
>>         ssc,
>>         LocationStrategies.PreferConsistent(),
>>         ConsumerStrategies.<Integer, GenericRecord>Subscribe(topics, kafkaParams)
>>     );
>>
>>
>> stream.foreachRDD(rdd -> {
>>
>>     // get offset ranges.
>>     OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>>
>>     // process something.
>>
>>     // commit offset.
>>     ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
>>
>> }
>> );
>>
>>
>> I have tested with a consumer written with the raw kafka-clients jar
>> library without problem: it consumes read_committed messages correctly,
>> and the count of consumed read_committed messages is equal to the count of
>> messages sent by the Kafka producer.
>>
>>
>> And sometimes, I got the following exception.
>>
>> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
>> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
>> (chango-private-1.chango.private executor driver):
>> java.lang.IllegalArgumentException: requirement failed: Failed to get
>> records for compacted spark-executor-school-student-group school-student-7
>> after polling for 120000
>>
>> at scala.Predef$.require(Predef.scala:281)
>> at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
>> at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
>> at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
>> at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
>>
>>
>> I have run Spark Streaming jobs that work fine with non-transactional
>> Kafka messages, and I never encountered exceptions like the above.
>> It seems that Spark Streaming for Kafka transactions does not handle
>> consumer properties such as isolation.level=read_committed and
>> enable.auto.commit=false correctly.
>>
>> Any help appreciated.
>>
>> - Kidong.
>>
>>
>> --
>> *이기동 *
>> *Kidong Lee*
>>
>> Email: mykid...@gmail.com
>> Chango: https://cloudcheflabs.github.io/chango-private-docs
>> Web Site: http://www.cloudchef-labs.com/
>> Mobile: +82 10 4981 7297
>>
>
>
> --
> *이기동 *
> *Kidong Lee*
>
> Email: mykid...@gmail.com
> Chango: https://cloudcheflabs.github.io/chango-private-docs
> Web Site: http://www.cloudchef-labs.com/
> Mobile: +82 10 4981 7297
>
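For reference, a minimal sketch of the kind of standalone kafka-clients consumer described above for counting read_committed messages. The bootstrap servers, group id, topic name and String deserializers are placeholders rather than the original setup, and it stops at the first empty poll, which is only reasonable for a drained test topic.

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedCounter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "read-committed-counter");    // placeholder
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // read_committed skips records from aborted transactions and transaction markers.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        long count = 0;
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("my-topic"));                       // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                if (records.isEmpty()) {
                    break;                                                       // assume the topic is drained
                }
                count += records.count();
            }
        }
        System.out.println("read_committed message count: " + count);
    }
}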