Thank you, Mich, for your reply.

Actually, I have already tried most of your suggestions.

When I set spark.streaming.kafka.allowNonConsecutiveOffsets=false, I got the
following error:

Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most
recent failure: Lost task 0.0 in stage 1.0 (TID 3)
(chango-private-1.chango.private executor driver):
java.lang.IllegalArgumentException: requirement failed: Got wrong record
for spark-executor-school-student-group school-student-7 even after seeking
to offset 11206961 got offset 11206962 instead. If this is a compacted
topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets

at scala.Predef$.require(Predef.scala:281)
at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:155)
at org.apache.spark.streaming.kafka010.KafkaDataConsumer.get(KafkaDataConsumer.scala:40)
at org.apache.spark.streaming.kafka010.KafkaDataConsumer.get$(KafkaDataConsumer.scala:39)
at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:219)
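
One plausible reading of this error, offered as an assumption rather than a confirmed diagnosis: on a transactional topic, each commit or abort writes a control marker that occupies an offset in the partition log but is never handed to the application, so a read_committed consumer sees gaps between data offsets. The sketch below is a simplified, self-contained model of that situation. TxnLogModel, Entry, fetchFrom, and strictGet are hypothetical names for illustration only and are not Kafka or Spark APIs.

```java
import java.util.List;
import java.util.Optional;

// Simplified model of one partition's log. This is NOT Kafka's or Spark's real code.
final class TxnLogModel {

    // One log entry: its offset and whether it is a transaction control marker.
    static final class Entry {
        final long offset;
        final boolean controlMarker;

        Entry(long offset, boolean controlMarker) {
            this.offset = offset;
            this.controlMarker = controlMarker;
        }
    }

    // Models a fetch that hides control markers: returns the first data entry
    // at or after the requested offset. Assumes the log is sorted by offset.
    static Optional<Entry> fetchFrom(List<Entry> log, long offset) {
        return log.stream()
                .filter(e -> e.offset >= offset && !e.controlMarker)
                .findFirst();
    }

    // Models a strict consecutive-offset check: the fetched record must sit
    // exactly at the requested offset, otherwise the read is rejected.
    static Entry strictGet(List<Entry> log, long offset) {
        Entry e = fetchFrom(log, offset).orElseThrow(
                () -> new IllegalArgumentException("no record at or after offset " + offset));
        if (e.offset != offset) {
            throw new IllegalArgumentException(
                    "after seeking to offset " + offset + " got offset " + e.offset + " instead");
        }
        return e;
    }

    public static void main(String[] args) {
        // Offsets 0 and 1 are data, offset 2 is a commit marker,
        // offset 3 is the first record of the next transaction.
        List<Entry> log = List.of(
                new Entry(0, false), new Entry(1, false),
                new Entry(2, true), new Entry(3, false));

        System.out.println(strictGet(log, 1).offset); // prints 1
        try {
            strictGet(log, 2); // offset 2 is a marker, so the next data record is at 3
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage()); // prints "after seeking to offset 2 got offset 3 instead"
        }
    }
}
```

In this model, seeking to the marker's offset yields the next data record, which has the same shape as "seeking to offset 11206961 got offset 11206962 instead" in the trace above.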


I also tried increasing spark.streaming.kafka.consumer.poll.ms to avoid the
exceptions, but it did not help.
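
If an awaited offset belongs to a transaction marker, no record will ever be delivered at that offset, which would explain why a longer poll timeout makes no difference. The same gaps also matter when comparing message counts: a count derived from offset arithmetic (untilOffset minus fromOffset) overstates the number of records actually delivered. The sketch below illustrates that difference under this assumption. OffsetCountModel and its methods are hypothetical names, not part of Spark or Kafka.

```java
import java.util.List;

// Simplified model comparing two ways of counting records in an offset range.
final class OffsetCountModel {

    // Count implied by offset arithmetic alone: untilOffset - fromOffset.
    static long countByRange(long fromOffset, long untilOffset) {
        return untilOffset - fromOffset;
    }

    // Count of records actually delivered within [fromOffset, untilOffset).
    static long countDelivered(List<Long> deliveredOffsets, long fromOffset, long untilOffset) {
        return deliveredOffsets.stream()
                .filter(o -> o >= fromOffset && o < untilOffset)
                .count();
    }

    public static void main(String[] args) {
        // Two transactions of two records each; offsets 2 and 5 are assumed
        // to be commit markers, so they never reach the application.
        List<Long> delivered = List.of(0L, 1L, 3L, 4L);

        System.out.println(countByRange(0, 6));              // prints 6
        System.out.println(countDelivered(delivered, 0, 6)); // prints 4
    }
}
```

When comparing producer and consumer totals, counting the records actually iterated in each batch avoids this discrepancy.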


- Kidong.




On Sun, 14 Apr 2024 at 4:25 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi Kidong,
>
> There may be a few potential reasons why the message counts from your Kafka
> producer and Spark Streaming consumer do not match, especially with
> transactional messages and the read_committed isolation level.
>
> 1) Just ensure that both your Spark Streaming job and the Kafka consumer
> written with raw kafka-clients use the same consumer group. Messages are
> delivered to specific consumer groups, and if they differ, Spark Streaming
> might miss messages consumed by the raw consumer.
> 2) Your Spark Streaming configuration sets *enable.auto.commit=false* and
> calls *commitAsync* manually. However, I noted
> *spark.streaming.kafka.allowNonConsecutiveOffsets=true*, which may be
> causing the problem. This setting allows Spark Streaming to read offsets
> that are not strictly consecutive, which can happen with transactional
> reads. It is generally recommended to set this to *false* for transactional
> reads to ensure Spark Streaming only reads committed messages.
> 3) Missed messages: with transactional messages, Kafka guarantees *delivery
> only after the transaction commits successfully*. There could be a slight
> delay between the producer sending the message and it becoming visible to
> consumers under the read_committed isolation level. Spark Streaming could
> potentially miss messages during this window.
> 4) The exception Lost task 0.0 in stage 324.0 suggests a problem fetching
> records for a specific topic partition. Review how your code handles
> potential exceptions during stream.foreachRDD processing. Ensure retries or
> appropriate error handling if you encounter issues with specific partitions.
> 5) Try different values for *spark.streaming.kafka.consumer.poll.ms* to
> adjust polling frequency and potentially improve visibility into committed
> messages.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Solutions Architect | Data Engineer  | Generative AI
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, "one test result is worth one-thousand
> expert opinions" (Wernher von Braun
> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>
>
> On Fri, 12 Apr 2024 at 21:38, Kidong Lee <mykid...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a Kafka producer which sends messages transactionally to Kafka, and
>> a Spark Streaming job which should consume read_committed messages from Kafka.
>> But Spark Streaming has a problem consuming read_committed messages:
>> the count of messages sent transactionally by the Kafka producer does not
>> match the count of read_committed messages consumed by Spark Streaming.
>>
>> Some consumer properties of my spark streaming job are as follows.
>>
>> auto.offset.reset=earliest
>> enable.auto.commit=false
>> isolation.level=read_committed
>>
>>
>> I also added the following spark streaming configuration.
>>
>> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
>> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 60 * 1000));
>>
>>
>> My spark streaming is using DirectStream like this.
>>
>> JavaInputDStream<ConsumerRecord<Integer, GenericRecord>> stream =
>>         KafkaUtils.createDirectStream(
>>                 ssc,
>>                 LocationStrategies.PreferConsistent(),
>>                 ConsumerStrategies.<Integer, GenericRecord>Subscribe(topics, kafkaParams)
>>         );
>>
>>
>> stream.foreachRDD(rdd -> {
>>     // Get the offset ranges of this batch.
>>     OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>>
>>     // Process the records.
>>
>>     // Commit the offsets back to Kafka.
>>     ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
>> });
>>
>>
>>
>> I have tested with a Kafka consumer written with the raw kafka-clients
>> library, and it consumes read_committed messages correctly: the count of
>> consumed read_committed messages equals the count of messages sent by the
>> Kafka producer.
>>
>>
>> And sometimes, I got the following exception.
>>
>> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
>> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
>> (chango-private-1.chango.private executor driver):
>> java.lang.IllegalArgumentException: requirement failed: Failed to get
>> records for compacted spark-executor-school-student-group school-student-7
>> after polling for 120000
>>
>> at scala.Predef$.require(Predef.scala:281)
>> at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
>> at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
>> at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
>> at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
>>
>>
>>
>> I have run Spark Streaming jobs that work fine with non-transactional
>> Kafka messages, and I never encountered exceptions like the ones above.
>> It seems that Spark Streaming does not correctly handle Kafka consumer
>> properties such as isolation.level=read_committed and
>> enable.auto.commit=false for transactional messages.
>>
>> Any help appreciated.
>>
>> - Kidong.
>>
>>
>> --
>> *이기동 *
>> *Kidong Lee*
>>
>> Email: mykid...@gmail.com
>> Chango: https://cloudcheflabs.github.io/chango-private-docs
>> Web Site: http://www.cloudchef-labs.com/
>> Mobile: +82 10 4981 7297
>>
>

-- 
*이기동 *
*Kidong Lee*

Email: mykid...@gmail.com
Chango: https://cloudcheflabs.github.io/chango-private-docs
Web Site: http://www.cloudchef-labs.com/
Mobile: +82 10 4981 7297
