Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-14 Thread Kidong Lee
Thanks, Mich for your reply.

I agree that it is not very scalable or efficient, but it works correctly
for kafka transactions, and committing offsets to kafka asynchronously has
not been a problem so far.

Let me give you some more details about my streaming job.
The CustomReceiver does not receive anything from outside; it just forwards
a notice message to trigger the task on an executor, in which the kafka
consumer will run. See my CustomReceiver below.

private static class CustomReceiver extends Receiver<String> {

    public CustomReceiver() {
        super(StorageLevel.MEMORY_AND_DISK_2());
    }

    @Override
    public void onStart() {
        new Thread(this::receive).start();
    }

    private void receive() {
        // Store a single dummy message; its only purpose is to trigger the foreachRDD task.
        String input = "receiver input " + UUID.randomUUID().toString();
        store(input);
    }

    @Override
    public void onStop() {
        // nothing to clean up
    }
}


Actually, just one kafka consumer runs, consuming committed messages
directly from kafka (which, I admit, is not very scalable). But the main
point of this approach, and what I need, is that the spark session is used
to save the consumed messages to an iceberg table: the consumed messages
are parallelized into a spark rdd, which is then written to the iceberg
table through the spark session, roughly as sketched below.

I have tested this spark streaming job with transactional producers that
send several million messages, and they were all consumed and saved to the
iceberg tables correctly.
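
For reference, the test producers follow the standard kafka transactional pattern, roughly like this (again only a sketch; the topic, serializers, transactional.id and batch size here are placeholders, not my actual settings):

// Sketch of the transactional producer pattern (placeholders only).
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");        // placeholder
props.put("transactional.id", "my-tx-producer");       // placeholder
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    for (int i = 0; i < 1000; i++) {                    // placeholder batch size
        producer.send(new ProducerRecord<>(topic, "key-" + i, "value-" + i));
    }
    producer.commitTransaction();                       // only committed messages are visible to read_committed consumers
} catch (KafkaException e) {
    producer.abortTransaction();                        // aborted messages are skipped by read_committed consumers
    throw e;
} finally {
    producer.close();
}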

- Kidong.



On Sun, Apr 14, 2024 at 11:05 PM, Mich Talebzadeh wrote:

> Interesting
>
> My concern is the infinite loop in *foreachRDD*: the *while(true)* loop
> within foreachRDD creates an infinite loop within each Spark executor. This
> might not be the most efficient approach, especially since offsets are
> committed asynchronously.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Solutions Architect | Data Engineer  | Generative AI
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions" (Werner Von Braun).
>
>
> On Sun, 14 Apr 2024 at 13:40, Kidong Lee  wrote:
>
>>
>> Because spark streaming for kafka transactions does not work correctly for
>> my needs, I moved to another approach using a raw kafka consumer, which
>> handles read_committed messages from kafka correctly.
>>
>> My codes look like the following.
>>
>> JavaDStream<String> stream = ssc.receiverStream(new CustomReceiver()); //
>> CustomReceiver does nothing special except waking up the foreachRDD task.
>>
>> stream.foreachRDD(rdd -> {
>>
>>   KafkaConsumer consumer = new 
>> KafkaConsumer<>(consumerProperties);
>>
>>   consumer.subscribe(Arrays.asList(topic));
>>
>>   while(true){
>>
>> ConsumerRecords records =
>> consumer.poll(java.time.Duration.ofMillis(intervalMs));
>>
>> Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
>>
>> List someList = new ArrayList<>();
>>
>> for (ConsumerRecord consumerRecord : records) {
>>
>>   // add something to list.
>>
>>   // put offset to offsetMap.
>>
>> }
>>
>> // process someList.
>>
>> // commit offset.
>>
>> consumer.commitAsync(offsetMap, null);
>>
>>   }
>>
>> });
>>
>>
>> In addition, I increased max.poll.records to 10.
>>
>> Even though this raw kafka consumer approach is not very scalable, it
>> consumes read_committed messages from kafka correctly, which is enough for
>> me at the moment.
>>
>> - Kidong.
>>
>>
>>
>> On Fri, Apr 12, 2024 at 9:19 PM, Kidong Lee wrote:
>>
>>> Hi,
>>>
>>> I have a kafka producer which sends messages transactionally to kafka,
>>> and a spark streaming job which should consume the read_committed messages
>>> from kafka.
>>> But spark streaming has a problem consuming the read_committed messages:
>>> the count of messages sent transactionally by the kafka producer is not
>>> the same as the count of read_committed messages consumed by spark
>>> streaming.
>>>
>>> Some consumer properties of my spark streaming job are as follows.
>>>
>>> auto.offset.reset=earliest
>>> enable.auto.commit=false
>>> isolation.level=read_committed
>>>
>>>
>>> I also added the following spark streaming configuration.
>>>
>>> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
>>> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 
>>> 60 * 1000));
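
(For context, the consumer properties above go into the kafkaParams map used in the createDirectStream call below, roughly like this; the brokers, group id and deserializers here are placeholders, not my actual values.)

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "broker1:9092");            // placeholder
kafkaParams.put("group.id", "spark-streaming-group");            // placeholder
kafkaParams.put("key.deserializer", StringDeserializer.class);   // placeholder
kafkaParams.put("value.deserializer", StringDeserializer.class); // placeholder; the real job produces GenericRecord values
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
kafkaParams.put("isolation.level", "read_committed");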
>>>
>>>
>>> My spark streaming is using DirectStream like this.
>>>
>>> JavaInputDStream stream =
>>>     KafkaUtils.createDirectStream(
>>>         ssc,
>>>         LocationStrategies.PreferConsistent(),
>>>         ConsumerStrategies.Subscribe(topics, kafkaParams) // values are GenericRecord
>>>     );
>>>
>>>
>>> stream.foreachRDD(rdd -> {
>>>
>>> 

Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.

2024-04-14 Thread Mich Talebzadeh
Interesting

My concern is the infinite loop in *foreachRDD*: the *while(true)* loop within
foreachRDD creates an infinite loop within each Spark executor. This might
not be the most efficient approach, especially since offsets are committed
asynchronously.
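
Something along these lines might be worth trying instead: poll a bounded number of times per micro-batch and close the consumer at the end of each batch (just a rough sketch, untested; the types, the bound and the variables are placeholders):

// Sketch: drain a bounded number of polls per micro-batch instead of looping forever.
stream.foreachRDD(rdd -> {
    int maxPollsPerBatch = 10;  // placeholder bound; tune to the expected backlog per batch
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
        consumer.subscribe(Arrays.asList(topic));
        for (int i = 0; i < maxPollsPerBatch; i++) {
            ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(intervalMs));
            if (records.isEmpty()) {
                break;                      // nothing left to drain for this batch
            }
            // process records ...
            consumer.commitSync();          // synchronous commit keeps offsets in step with processing
        }
    }                                       // consumer is closed at the end of every batch
});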

HTH

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer  | Generative AI
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions" (Werner Von Braun).


On Sun, 14 Apr 2024 at 13:40, Kidong Lee  wrote:

>
> Because spark streaming for kafka transactions does not work correctly for
> my needs, I moved to another approach using a raw kafka consumer, which
> handles read_committed messages from kafka correctly.
>
> My codes look like the following.
>
> JavaDStream<String> stream = ssc.receiverStream(new CustomReceiver()); //
> CustomReceiver does nothing special except waking up the foreachRDD task.
>
> stream.foreachRDD(rdd -> {
>
>   KafkaConsumer consumer = new 
> KafkaConsumer<>(consumerProperties);
>
>   consumer.subscribe(Arrays.asList(topic));
>
>   while(true){
>
> ConsumerRecords records =
> consumer.poll(java.time.Duration.ofMillis(intervalMs));
>
> Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
>
> List someList = new ArrayList<>();
>
> for (ConsumerRecord consumerRecord : records) {
>
>   // add something to list.
>
>   // put offset to offsetMap.
>
> }
>
> // process someList.
>
> // commit offset.
>
> consumer.commitAsync(offsetMap, null);
>
>   }
>
> });
>
>
> In addition, I increased max.poll.records to 10.
>
> Even though this raw kafka consumer approach is not very scalable, it
> consumes read_committed messages from kafka correctly, which is enough for
> me at the moment.
>
> - Kidong.
>
>
>
> On Fri, Apr 12, 2024 at 9:19 PM, Kidong Lee wrote:
>
>> Hi,
>>
>> I have a kafka producer which sends messages transactionally to kafka, and
>> a spark streaming job which should consume the read_committed messages from
>> kafka.
>> But spark streaming has a problem consuming the read_committed messages:
>> the count of messages sent transactionally by the kafka producer is not the
>> same as the count of read_committed messages consumed by spark
>> streaming.
>>
>> Some consumer properties of my spark streaming job are as follows.
>>
>> auto.offset.reset=earliest
>> enable.auto.commit=false
>> isolation.level=read_committed
>>
>>
>> I also added the following spark streaming configuration.
>>
>> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
>> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 
>> 60 * 1000));
>>
>>
>> My spark streaming is using DirectStream like this.
>>
>> JavaInputDStream stream =
>>     KafkaUtils.createDirectStream(
>>         ssc,
>>         LocationStrategies.PreferConsistent(),
>>         ConsumerStrategies.Subscribe(topics, kafkaParams) // values are GenericRecord
>>     );
>>
>>
>> stream.foreachRDD(rdd -> {
>>
>>// get offset ranges.
>>
>>OffsetRange[] offsetRanges = ((HasOffsetRanges) 
>> rdd.rdd()).offsetRanges();
>>
>>// process something.
>>
>>
>>// commit offset.
>>((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
>>
>> }
>> );
>>
>>
>>
>> I have also tested with a kafka consumer written against the raw
>> kafka-clients library, and it consumes read_committed messages correctly
>> without any problem; the count of consumed read_committed messages is equal
>> to the count of messages sent by the kafka producer.
>>
>>
>> And sometimes, I got the following exception.
>>
>> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
>> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
>> (chango-private-1.chango.private executor driver):
>> java.lang.IllegalArgumentException: requirement failed: Failed to get
>> records for compacted spark-executor-school-student-group school-student-7
>> after polling for 12
>>
>> at scala.Predef$.require(Predef.scala:281)
>>
>> at
>> org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
>>
>> at
>> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
>>
>> at
>> org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
>>
>> at
>> org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
>>
>>
>>
>> I have experience with spark streaming jobs that work fine with
>> non-transactional kafka messages, and I