Interesting

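My concern is the infinite loop in *foreachRDD*: the *while(true)* loop within
foreachRDD never returns, so the first batch never completes. As far as I
know, the body of foreachRDD runs on the driver rather than on the executors,
so the loop gets no parallelism from Spark either. This might not be the most
efficient approach, especially since offsets are committed asynchronously
with a null callback.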
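
As a rough illustration of what I mean, here is a minimal sketch of a bounded
alternative: poll a limited number of times, process each batch, commit only
after processing, then return so that the foreachRDD call can finish. It
deliberately uses only the JDK so that it runs without a broker. Rec,
BoundedDrain, drain and maxPolls are names I made up for illustration. In the
real job, poll would wrap consumer.poll(...) and commit would wrap a blocking
consumer.commitSync(offsetMap). Note that the offset to commit for a partition
is the last processed offset plus one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical stand-in for a Kafka ConsumerRecord: only the fields the loop needs. */
final class Rec {
    final int partition;
    final long offset;
    final String value;

    Rec(int partition, long offset, String value) {
        this.partition = partition;
        this.offset = offset;
        this.value = value;
    }
}

final class BoundedDrain {
    /**
     * Polls at most maxPolls times and stops early on an empty poll.
     * Each non-empty batch is processed first and committed afterwards.
     * Returns the highest committed "next offset" per partition.
     */
    static Map<Integer, Long> drain(Supplier<List<Rec>> poll,
                                    Consumer<List<String>> process,
                                    Consumer<Map<Integer, Long>> commit,
                                    int maxPolls) {
        Map<Integer, Long> committed = new HashMap<>();
        for (int i = 0; i < maxPolls; i++) {
            List<Rec> batch = poll.get();
            if (batch.isEmpty()) {
                break; // nothing to read right now: return instead of spinning
            }
            List<String> values = new ArrayList<>();
            Map<Integer, Long> next = new HashMap<>();
            for (Rec r : batch) {
                values.add(r.value);
                // The committed position is the offset of the NEXT record to read.
                next.merge(r.partition, r.offset + 1, Math::max);
            }
            process.accept(values); // process before committing
            commit.accept(next);    // assumed to block, like commitSync
            committed.putAll(next);
        }
        return committed;
    }
}
```

Stopping on the first empty poll is a simplification I chose for the sketch.
A real job might prefer a time budget per micro-batch instead.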

HTH

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer  | Generative AI
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Sun, 14 Apr 2024 at 13:40, Kidong Lee <mykid...@gmail.com> wrote:

>
> Because Spark Streaming's handling of Kafka transactions does not work
> correctly for my needs, I moved to another approach using a raw Kafka
> consumer, which handles read_committed messages from Kafka correctly.
>
> My code looks like the following.
>
> // CustomReceiver does nothing special except wake up the foreachRDD task.
> JavaDStream<String> stream = ssc.receiverStream(new CustomReceiver());
>
> stream.foreachRDD(rdd -> {
>
>   KafkaConsumer<Integer, GenericRecord> consumer =
>           new KafkaConsumer<>(consumerProperties);
>
>   consumer.subscribe(Arrays.asList(topic));
>
>   while(true){
>
>     ConsumerRecords<Integer, GenericRecord> records =
>             consumer.poll(java.time.Duration.ofMillis(intervalMs));
>
>     Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
>
>     List<String> someList = new ArrayList<>();
>
>     for (ConsumerRecord<Integer, GenericRecord> consumerRecord : records) {
>
>       // add something to list.
>
>       // put offset to offsetMap.
>
>     }
>
>     // process someList.
>
>     // commit offset.
>
>     consumer.commitAsync(offsetMap, null);
>
>   }
>
> });
>
>
> In addition, I increased max.poll.records to 100000.
>
> Even though this raw Kafka consumer approach is not very scalable, it
> consumes read_committed messages from Kafka correctly and is enough for me
> at the moment.
>
> - Kidong.
>
>
>
> On Fri, 12 Apr 2024 at 21:19, Kidong Lee <mykid...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a Kafka producer that sends messages transactionally to Kafka and
>> a Spark Streaming job that should consume read_committed messages from Kafka.
>> But Spark Streaming has a problem consuming read_committed messages.
>> The count of messages sent transactionally by the Kafka producer is not the
>> same as the count of read_committed messages consumed by Spark
>> Streaming.
>>
>> Some consumer properties of my spark streaming job are as follows.
>>
>> auto.offset.reset=earliest
>> enable.auto.commit=false
>> isolation.level=read_committed
>>
>>
>> I also added the following spark streaming configuration.
>>
>> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
>> sparkConf.set("spark.streaming.kafka.consumer.poll.ms",
>>         String.valueOf(2 * 60 * 1000));
>>
>>
>> My spark streaming is using DirectStream like this.
>>
>> JavaInputDStream<ConsumerRecord<Integer, GenericRecord>> stream =
>>         KafkaUtils.createDirectStream(
>>                 ssc,
>>                 LocationStrategies.PreferConsistent(),
>>                 ConsumerStrategies.<Integer, GenericRecord>Subscribe(
>>                         topics, kafkaParams)
>>         );
>>
>>
>> stream.foreachRDD(rdd -> {
>>
>>        // get offset ranges.
>>
>>        OffsetRange[] offsetRanges =
>>                ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>>
>>        // process something.
>>
>>
>>        // commit offset.
>>        ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
>>
>> }
>> );
>>
>>
>>
>> I have tested a Kafka consumer written with the raw kafka-clients jar
>> library, and it consumes read_committed messages correctly without problems:
>> the count of consumed read_committed messages is equal to the count of
>> messages sent by the Kafka producer.
>>
>>
>> And sometimes, I got the following exception.
>>
>> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
>> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
>> (chango-private-1.chango.private executor driver):
>> java.lang.IllegalArgumentException: requirement failed: Failed to get
>> records for compacted spark-executor-school-student-group school-student-7
>> after polling for 120000
>>
>> at scala.Predef$.require(Predef.scala:281)
>> at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
>> at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
>> at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
>> at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
>>
>>
>>
>> I have experience with Spark Streaming jobs that work fine with
>> non-transactional Kafka messages, and I never encountered
>> exceptions like the one above.
>> It seems that Spark Streaming does not correctly handle Kafka consumer
>> properties such as isolation.level=read_committed and
>> enable.auto.commit=false when the messages are transactional.
>>
>> Any help appreciated.
>>
>> - Kidong.
>>
>>
>> --
>> *이기동 *
>> *Kidong Lee*
>>
>> Email: mykid...@gmail.com
>> Chango: https://cloudcheflabs.github.io/chango-private-docs
>> Web Site: http://www.cloudchef-labs.com/
>> Mobile: +82 10 4981 7297
>>
>
>
> --
> *이기동 *
> *Kidong Lee*
>
> Email: mykid...@gmail.com
> Chango: https://cloudcheflabs.github.io/chango-private-docs
> Web Site: http://www.cloudchef-labs.com/
> Mobile: +82 10 4981 7297
>
