Hi Kidong,

There are a few potential reasons why the message counts from your Kafka
producer and your Spark Streaming consumer might not match, especially with
transactional messages and the read_committed isolation level.

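1) Check which consumer group (group.id) your Spark Streaming job and the
Kafka consumer written with raw kafka-clients each use. Every consumer group
receives its own full view of the topic, but consumers that share a group
split the partitions between them. If both run at the same time under the
same group.id, each may see only part of the messages, so give them
different groups when you compare counts.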
2) Your Spark Streaming configuration sets *enable.auto.commit=false* and
calls *commitAsync* manually. However, I noted
*spark.streaming.kafka.allowNonConsecutiveOffsets=true*, which may be
related to the problem. This setting lets Spark Streaming read partitions
whose offsets are not consecutive, that is, contain gaps. Gaps occur with
transactional writes because commit and abort markers occupy offsets that
are never delivered as records, so a count derived from offset ranges can
differ from the number of records actually read. It is worth testing with
this set to *false* to see how the counts and the exception change.
3) With transactional messages, Kafka makes records visible to
read_committed consumers *only after the transaction commits successfully*.
There can be a delay between the producer sending a message and that message
becoming visible under the read_committed isolation level, so a Spark
Streaming batch that covers this window may not yet contain messages from
transactions that are still open.
4) The exception *Lost task 0.0 in stage 324.0* points to a problem fetching
records for a specific topic partition: the poll for school-student-7
returned nothing within 120000 ms. Review how your code handles exceptions
during stream.foreachRDD processing, and add retries or other appropriate
error handling for issues with specific partitions.
5) Try different values for *spark.streaming.kafka.consumer.poll.ms* to
adjust how long each poll waits for records, which may help Spark Streaming
pick up committed messages.
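
As a rough sketch of points 1 and 2, the consumer parameters for the two
readers could be assembled as below. The class KafkaParamsSketch, the helper
buildKafkaParams, the broker address and the two group names are made up for
illustration; the property keys are the standard Kafka consumer settings from
your message, written as plain strings so that the snippet needs only the JDK.
Deserializer settings are left out.

```java
import java.util.HashMap;
import java.util.Map;

final class KafkaParamsSketch {

    // Hypothetical helper: builds the consumer properties for one independent reader.
    public static Map<String, Object> buildKafkaParams(String bootstrapServers, String groupId) {
        Map<String, Object> params = new HashMap<>();
        params.put("bootstrap.servers", bootstrapServers);
        params.put("group.id", groupId);                  // one group per independent reader
        params.put("auto.offset.reset", "earliest");
        params.put("enable.auto.commit", false);          // offsets are committed manually
        params.put("isolation.level", "read_committed");  // skip uncommitted and aborted records
        return params;
    }

    public static void main(String[] args) {
        // Assumed names: separate groups so that each reader sees every partition.
        Map<String, Object> sparkParams = buildKafkaParams("localhost:9092", "spark-count-check");
        Map<String, Object> rawParams = buildKafkaParams("localhost:9092", "raw-count-check");
        System.out.println(sparkParams.get("group.id") + " vs " + rawParams.get("group.id"));
    }
}
```

The map for the Spark job would be passed as kafkaParams to
ConsumerStrategies.Subscribe, as in your code.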

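To illustrate why offsets on a transactional topic are not consecutive: the
commit and abort markers occupy offsets in the log but are never handed to the
consumer, so a count derived from an offset range (untilOffset minus
fromOffset) can be larger than the number of records actually delivered. The
sketch below is plain Java with made-up offsets; OffsetGapSketch and its
methods are hypothetical names, not Spark or Kafka APIs.

```java
import java.util.List;

final class OffsetGapSketch {

    // Number of offsets covered by the half-open range [fromOffset, untilOffset).
    public static long offsetSpan(long fromOffset, long untilOffset) {
        return untilOffset - fromOffset;
    }

    // Offsets inside the range for which no record was delivered,
    // for example transaction markers or records of aborted transactions.
    public static long missingOffsets(List<Long> deliveredOffsets, long fromOffset, long untilOffset) {
        long delivered = deliveredOffsets.stream()
                .filter(o -> o >= fromOffset && o < untilOffset)
                .distinct()
                .count();
        return offsetSpan(fromOffset, untilOffset) - delivered;
    }

    public static void main(String[] args) {
        // Made-up batch: two transactions of three records each;
        // offsets 3 and 7 stand for the commit markers.
        List<Long> delivered = List.of(0L, 1L, 2L, 4L, 5L, 6L);
        System.out.println("offset span = " + offsetSpan(0L, 8L));
        System.out.println("records delivered = " + delivered.size());
        System.out.println("offsets without a record = " + missingOffsets(delivered, 0L, 8L));
    }
}
```

With these made-up offsets the span is 8 while only 6 records were delivered.
When comparing against the producer, it is probably safer to count the records
you actually iterate over in each batch than to sum the offset ranges.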

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer  | Generative AI
United Kingdom

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).

On Fri, 12 Apr 2024 at 21:38, Kidong Lee <mykid...@gmail.com> wrote:

> Hi,
> I have a Kafka producer which sends messages transactionally to Kafka and
> a Spark Streaming job which should consume read_committed messages from Kafka.
> But Spark Streaming has a problem consuming read_committed
> messages.
> The count of messages sent transactionally by the Kafka producer is not the
> same as the count of read_committed messages consumed by Spark
> Streaming.
> Some consumer properties of my spark streaming job are as follows.
> auto.offset.reset=earliest
> enable.auto.commit=false
> isolation.level=read_committed
> I also added the following spark streaming configuration.
> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 60 * 1000));
> My spark streaming is using DirectStream like this.
> JavaInputDStream<ConsumerRecord<Integer, GenericRecord>> stream =
>         KafkaUtils.createDirectStream(
>                 ssc,
>                 LocationStrategies.PreferConsistent(),
>                 ConsumerStrategies.<Integer, GenericRecord>Subscribe(topics, kafkaParams)
>         );
> stream.foreachRDD(rdd -> {
>     // Get the offset ranges covered by this batch.
>     OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>     // process something.
>     // Commit the offsets once processing is done.
>     ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
> });
> I have tested with a Kafka consumer written with the raw kafka-clients jar
> library, and it consumes read_committed messages correctly:
> the count of consumed read_committed messages is equal to the count of
> messages sent by the Kafka producer.
> And sometimes, I got the following exception.
> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
> (chango-private-1.chango.private executor driver):
> java.lang.IllegalArgumentException: requirement failed: Failed to get
> records for compacted spark-executor-school-student-group school-student-7
> after polling for 120000
> at scala.Predef$.require(Predef.scala:281)
> at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
> at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
> at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
> at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
> I have run Spark Streaming jobs that work fine with non-transactional
> Kafka messages, and I never encountered exceptions like the one above.
> It seems that Spark Streaming does not correctly handle Kafka
> transactions with consumer properties such as
> isolation.level=read_committed and enable.auto.commit=false.
> Any help appreciated.
> - Kidong.
> --
> *이기동 *
> *Kidong Lee*
> Email: mykid...@gmail.com
> Chango: https://cloudcheflabs.github.io/chango-private-docs
> Web Site: http://www.cloudchef-labs.com/
> Mobile: +82 10 4981 7297