Thanks, Mich for your reply.

I agree, it is not so scalable and efficient. But it works correctly for
kafka transaction, and there is no problem with committing offset to kafka
async for now.

I try to tell you some more details about my streaming job.
CustomReceiver does not receive anything from outside and just forward
notice message to run an executor in which kafka consumer will be run.
See my CustomReceiver.

private static class CustomReceiver extends Receiver<String> {

    public CustomReceiver() {
        super(StorageLevel.MEMORY_AND_DISK_2());
    }

    @Override
    public void onStart() {
        new Thread(this::receive).start();
    }

    private void receive() {
        String input = "receiver input " + UUID.randomUUID().toString();
        store(input);
    }

    @Override
    public void onStop() {

    }
}


Actually, just one Kafka consumer will be run which consumes committed
messages from kafka directly(, which is not so scalable, I think.).
But the main point of this approach which I need is that spark
session needs to be used to save rdd(parallelized consumed messages) to
iceberg table.
Consumed messages will be converted to spark rdd which will be saved to
iceberg table using spark session.

I have tested this spark streaming job with transactional producers which
send several millions of messages. Correctly consumed and saved to iceberg
tables correctly.

- Kidong.



2024년 4월 14일 (일) 오후 11:05, Mich Talebzadeh <mich.talebza...@gmail.com>님이 작성:

> Interesting
>
> My concern is infinite Loop in* foreachRDD*: The *while(true)* loop
> within foreachRDD creates an infinite loop within each Spark executor. This
> might not be the most efficient approach, especially since offsets are
> committed asynchronously.?
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Solutions Architect | Data Engineer  | Generative AI
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Sun, 14 Apr 2024 at 13:40, Kidong Lee <mykid...@gmail.com> wrote:
>
>>
>> Because spark streaming for kafk transaction does not work correctly to
>> suit my need, I moved to another approach using raw kafka consumer which
>> handles read_committed messages from kafka correctly.
>>
>> My codes look like the following.
>>
>> JavaDStream<String> stream = ssc.receiverStream(new CustomReceiver()); // 
>> CustomReceiver does nothing special except awaking foreach task.
>>
>> stream.foreachRDD(rdd -> {
>>
>>   KafkaConsumer<Integer, GenericRecord> consumer = new 
>> KafkaConsumer<>(consumerProperties);
>>
>>   consumer.subscribe(Arrays.asList(topic));
>>
>>   while(true){
>>
>>     ConsumerRecords<Integer, GenericRecord> records =
>>             consumer.poll(java.time.Duration.ofMillis(intervalMs));
>>
>>     Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
>>
>>     List<String> someList = new ArrayList<>();
>>
>>     for (ConsumerRecord<Integer, GenericRecord> consumerRecord : records) {
>>
>>       // add something to list.
>>
>>       // put offset to offsetMap.
>>
>>     }
>>
>>     // process someList.
>>
>>         // commit offset.
>>
>>     consumer.commitAsync(offsetMap, null);
>>
>>   }
>>
>> });
>>
>>
>> In addition, I increased max.poll.records to 100000.
>>
>> Even if this raw kafka consumer approach is not so scalable, it consumes
>> read_committed messages from kafka correctly and is enough for me at the
>> moment.
>>
>> - Kidong.
>>
>>
>>
>> 2024년 4월 12일 (금) 오후 9:19, Kidong Lee <mykid...@gmail.com>님이 작성:
>>
>>> Hi,
>>>
>>> I have a kafka producer which sends messages transactionally to kafka
>>> and spark streaming job which should consume read_committed messages from
>>> kafka.
>>> But there is a problem for spark streaming to consume read_committed
>>> messages.
>>> The count of messages sent by kafka producer transactionally is not the
>>> same to the count of the read_committed messages consumed by spark
>>> streaming.
>>>
>>> Some consumer properties of my spark streaming job are as follows.
>>>
>>> auto.offset.reset=earliest
>>> enable.auto.commit=false
>>> isolation.level=read_committed
>>>
>>>
>>> I also added the following spark streaming configuration.
>>>
>>> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
>>> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 
>>> 60 * 1000));
>>>
>>>
>>> My spark streaming is using DirectStream like this.
>>>
>>> JavaInputDStream<ConsumerRecord<Integer, GenericRecord>> stream =
>>>         KafkaUtils.createDirectStream(
>>>                 ssc,
>>>                 LocationStrategies.PreferConsistent(),
>>>                 ConsumerStrategies.<Integer, 
>>> GenericRecord>Subscribe(topics, kafkaParams)
>>>         );
>>>
>>>
>>> stream.foreachRDD(rdd -> O
>>>
>>>        // get offset ranges.
>>>
>>>        OffsetRange[] offsetRanges = ((HasOffsetRanges) 
>>> rdd.rdd()).offsetRanges();
>>>
>>>        // process something.
>>>
>>>
>>>        // commit offset.
>>>        ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
>>>
>>> }
>>> );
>>>
>>>
>>>
>>> I have tested with a kafka consumer written with raw kafka-clients jar
>>> library without problem that it consumes read_committed messages correctly,
>>> and the count of consumed read_committed messages is equal to the count of
>>> messages sent by kafka producer.
>>>
>>>
>>> And sometimes, I got the following exception.
>>>
>>> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
>>> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
>>> (chango-private-1.chango.private executor driver):
>>> java.lang.IllegalArgumentException: requirement failed: Failed to get
>>> records for compacted spark-executor-school-student-group school-student-7
>>> after polling for 120000
>>>
>>> at scala.Predef$.require(Predef.scala:281)
>>>
>>> at
>>> org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
>>>
>>> at
>>> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
>>>
>>> at
>>> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
>>>
>>> at
>>> org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
>>>
>>>
>>>
>>> I have experienced spark streaming job which works fine with kafka
>>> messages which are non-transactional, and I never encountered the
>>> exceptions like above.
>>> It seems that spark streaming for kafka transaction does not handle such
>>> as kafka consumer properties like isolation.level=read_committed and
>>> enable.auto.commit=false correctly.
>>>
>>> Any help appreciated.
>>>
>>> - Kidong.
>>>
>>>
>>> --
>>> *이기동 *
>>> *Kidong Lee*
>>>
>>> Email: mykid...@gmail.com
>>> Chango: https://cloudcheflabs.github.io/chango-private-docs
>>> Web Site: http://www.cloudchef-labs.com/
>>> Mobile: +82 10 4981 7297
>>> <http://www.cloudchef-labs.com/>
>>>
>>
>>
>> --
>> *이기동 *
>> *Kidong Lee*
>>
>> Email: mykid...@gmail.com
>> Chango: https://cloudcheflabs.github.io/chango-private-docs
>> Web Site: http://www.cloudchef-labs.com/
>> Mobile: +82 10 4981 7297
>> <http://www.cloudchef-labs.com/>
>>
>

-- 
*이기동 *
*Kidong Lee*

Email: mykid...@gmail.com
Chango: https://cloudcheflabs.github.io/chango-private-docs
Web Site: http://www.cloudchef-labs.com/
Mobile: +82 10 4981 7297
<http://www.cloudchef-labs.com/>

Reply via email to