Because Spark Streaming's Kafka integration does not handle Kafka
transactions the way I need, I moved to another approach: a raw Kafka
consumer, which handles read_committed messages from Kafka correctly.

My code looks like the following.

JavaDStream<String> stream = ssc.receiverStream(new CustomReceiver());
// CustomReceiver does nothing special except wake up the foreachRDD task.

stream.foreachRDD(rdd -> {

  KafkaConsumer<Integer, GenericRecord> consumer =
          new KafkaConsumer<>(consumerProperties);

  consumer.subscribe(Arrays.asList(topic));

  while(true){

    ConsumerRecords<Integer, GenericRecord> records =
            consumer.poll(java.time.Duration.ofMillis(intervalMs));

    Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();

    List<String> someList = new ArrayList<>();

    for (ConsumerRecord<Integer, GenericRecord> consumerRecord : records) {

      // add something to list.

      // put offset to offsetMap.

    }

    // process someList.

    // commit offset.

    consumer.commitAsync(offsetMap, null);

  }

});
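
For the "put offset to offsetMap" step above, the value committed for a partition is conventionally the offset of the last processed record plus one, that is, the position of the next record to read. The following is only a minimal sketch of that bookkeeping in plain Java. The `Rec` class and the `nextOffsets` helper are hypothetical stand-ins (for the fields of `ConsumerRecord` and for the `Map<TopicPartition, OffsetAndMetadata>`), so that the snippet runs without the kafka-clients jar; the topic name and offsets in `main` are made-up sample values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetBookkeeping {

    // Hypothetical stand-in for the ConsumerRecord fields used here.
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Returns, per "topic-partition" key, the offset to commit:
    // the highest processed offset plus one (the next record to read).
    static Map<String, Long> nextOffsets(List<Rec> batch) {
        Map<String, Long> next = new HashMap<>();
        for (Rec r : batch) {
            String key = r.topic + "-" + r.partition;
            next.merge(key, r.offset + 1, Math::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Rec> batch = new ArrayList<>();
        batch.add(new Rec("school-student", 7, 41L));
        // Offsets need not be consecutive: with transactions, a
        // commit/abort marker can occupy an offset (here 42).
        batch.add(new Rec("school-student", 7, 43L));
        batch.add(new Rec("school-student", 3, 10L));
        System.out.println(nextOffsets(batch));
    }
}
```

In the real loop, the same values would go into `offsetMap` as `new OffsetAndMetadata(offset + 1)` keyed by `new TopicPartition(topic, partition)` before calling `commitAsync`.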


In addition, I increased max.poll.records to 100000.
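
The consumer settings mentioned in this thread could be collected roughly as follows. This is a sketch under assumptions: it assumes the raw consumer reuses the properties listed in the quoted message below, the `build` helper is hypothetical, and the `bootstrap.servers` and `group.id` values are placeholders. The keys themselves are standard Kafka consumer configuration names. It uses only `java.util.Properties`, so it runs without the kafka-clients jar.

```java
import java.util.Properties;

public class ReadCommittedConsumerProps {

    // Hypothetical helper that builds the consumer properties.
    static Properties build(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("group.id", groupId);
        // Only return messages from committed transactions.
        p.setProperty("isolation.level", "read_committed");
        // Offsets are committed manually via commitAsync.
        p.setProperty("enable.auto.commit", "false");
        p.setProperty("auto.offset.reset", "earliest");
        // Upper bound on records returned by a single poll() call.
        p.setProperty("max.poll.records", "100000");
        // A real KafkaConsumer also needs key.deserializer and
        // value.deserializer; they are omitted from this sketch.
        return p;
    }

    public static void main(String[] args) {
        // Placeholder connection values, not taken from the thread.
        Properties p = build("localhost:9092", "example-group");
        p.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```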

Although this raw Kafka consumer approach is not very scalable, it consumes
read_committed messages from Kafka correctly, and that is enough for me at
the moment.

- Kidong.



On Fri, Apr 12, 2024 at 9:19 PM, Kidong Lee <mykid...@gmail.com> wrote:

> Hi,
>
> I have a Kafka producer that sends messages transactionally to Kafka, and a
> Spark Streaming job that should consume read_committed messages from Kafka.
> However, Spark Streaming has a problem consuming read_committed messages:
> the number of messages sent transactionally by the Kafka producer does not
> match the number of read_committed messages consumed by Spark Streaming.
>
> Some consumer properties of my spark streaming job are as follows.
>
> auto.offset.reset=earliest
> enable.auto.commit=false
> isolation.level=read_committed
>
>
> I also added the following spark streaming configuration.
>
> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 60 * 1000));
>
>
> My Spark Streaming job uses a direct stream like this.
>
> JavaInputDStream<ConsumerRecord<Integer, GenericRecord>> stream =
>         KafkaUtils.createDirectStream(
>                 ssc,
>                 LocationStrategies.PreferConsistent(),
>                 ConsumerStrategies.<Integer, GenericRecord>Subscribe(topics, kafkaParams)
>         );
>
>
> stream.foreachRDD(rdd -> {
>
>        // get offset ranges.
>
>        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>
>        // process something.
>
>
>        // commit offset.
>        ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
>
> }
> );
>
>
>
> I have tested a consumer written directly against the kafka-clients
> library, and it consumes read_committed messages correctly: the number of
> consumed read_committed messages equals the number of messages sent by the
> Kafka producer.
>
>
> Sometimes I also get the following exception.
>
> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
> (chango-private-1.chango.private executor driver):
> java.lang.IllegalArgumentException: requirement failed: Failed to get
> records for compacted spark-executor-school-student-group school-student-7
> after polling for 120000
>
> at scala.Predef$.require(Predef.scala:281)
> at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
> at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
> at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
> at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
>
>
>
> I have run Spark Streaming jobs against non-transactional Kafka messages
> without problems, and I never encountered exceptions like the one above.
> It seems that Spark Streaming does not correctly handle Kafka consumer
> properties such as isolation.level=read_committed and
> enable.auto.commit=false when consuming transactional messages.
>
> Any help appreciated.
>
> - Kidong.
>
>
> --
> *이기동 *
> *Kidong Lee*
>
> Email: mykid...@gmail.com
> Chango: https://cloudcheflabs.github.io/chango-private-docs
> Web Site: http://www.cloudchef-labs.com/
> Mobile: +82 10 4981 7297
> <http://www.cloudchef-labs.com/>
>


