Because spark streaming's kafka transaction support does not work correctly for
my needs, I moved to another approach using a raw kafka consumer, which
handles read_committed messages from kafka correctly.
My code looks like the following.
JavaDStream<String> stream = ssc.receiverStream(new CustomReceiver());
// CustomReceiver does nothing special except waking up the foreachRDD task.
stream.foreachRDD(rdd -> {
    KafkaConsumer<Integer, GenericRecord> consumer =
            new KafkaConsumer<>(consumerProperties);
    consumer.subscribe(Arrays.asList(topic));
    while (true) {
        ConsumerRecords<Integer, GenericRecord> records =
                consumer.poll(java.time.Duration.ofMillis(intervalMs));
        Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
        List<String> someList = new ArrayList<>();
        for (ConsumerRecord<Integer, GenericRecord> consumerRecord : records) {
            // add something to the list.
            // put the record's offset into offsetMap.
        }
        // process someList.
        // commit offsets.
        consumer.commitAsync(offsetMap, null);
    }
});
In addition, I increased max.poll.records to 100000.
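For reference, consumerProperties above is set up roughly like this sketch; the
bootstrap servers, group id, and the Avro deserializer / schema registry below
are placeholders, not the exact values:

Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers", "kafka-broker:9092");        // placeholder
consumerProperties.put("group.id", "my-consumer-group");                 // placeholder
consumerProperties.put("key.deserializer", IntegerDeserializer.class.getName());
consumerProperties.put("value.deserializer", KafkaAvroDeserializer.class.getName()); // assuming Confluent Avro for GenericRecord
consumerProperties.put("schema.registry.url", "http://schema-registry:8081");        // placeholder
consumerProperties.put("auto.offset.reset", "earliest");
consumerProperties.put("enable.auto.commit", "false");
consumerProperties.put("isolation.level", "read_committed");
consumerProperties.put("max.poll.records", "100000");
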
Even though this raw kafka consumer approach is not very scalable, it consumes
read_committed messages from kafka correctly, and it is enough for me at the
moment.
- Kidong.
On Fri, Apr 12, 2024 at 9:19 PM, Kidong Lee <[email protected]> wrote:
> Hi,
>
> I have a kafka producer which sends messages transactionally to kafka, and
> a spark streaming job which should consume the read_committed messages from
> kafka. But there is a problem with spark streaming consuming the
> read_committed messages: the count of messages sent transactionally by the
> kafka producer is not the same as the count of read_committed messages
> consumed by spark streaming.
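>
> For context, the producer side follows the standard kafka transactional
> pattern, roughly like this sketch (bootstrap servers, transactional.id,
> serializers, the topic name and the batch variable are placeholders):
>
> Properties producerProps = new Properties();
> producerProps.put("bootstrap.servers", "kafka-broker:9092");   // placeholder
> producerProps.put("transactional.id", "school-student-tx");    // placeholder
> producerProps.put("key.serializer", IntegerSerializer.class.getName());
> producerProps.put("value.serializer", KafkaAvroSerializer.class.getName()); // assuming Avro
>
> KafkaProducer<Integer, GenericRecord> producer = new KafkaProducer<>(producerProps);
> producer.initTransactions();
> try {
>     producer.beginTransaction();
>     for (GenericRecord record : batch) {                        // batch: records to send in one transaction
>         producer.send(new ProducerRecord<>("school-student", record)); // placeholder topic
>     }
>     producer.commitTransaction();
> } catch (KafkaException e) {
>     producer.abortTransaction();
> }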
>
> Some consumer properties of my spark streaming job are as follows.
>
> auto.offset.reset=earliest
> enable.auto.commit=false
> isolation.level=read_committed
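>
> These are passed to the stream through kafkaParams, roughly like this sketch
> (bootstrap servers, group id and the deserializer classes are placeholders):
>
> Map<String, Object> kafkaParams = new HashMap<>();
> kafkaParams.put("bootstrap.servers", "kafka-broker:9092");   // placeholder
> kafkaParams.put("group.id", "school-student-group");         // placeholder
> kafkaParams.put("key.deserializer", IntegerDeserializer.class);
> kafkaParams.put("value.deserializer", KafkaAvroDeserializer.class); // assuming Avro GenericRecord
> kafkaParams.put("auto.offset.reset", "earliest");
> kafkaParams.put("enable.auto.commit", false);
> kafkaParams.put("isolation.level", "read_committed");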
>
>
> I also added the following spark streaming configuration.
>
> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 60 * 1000));
>
>
> My spark streaming is using DirectStream like this.
>
> JavaInputDStream<ConsumerRecord<Integer, GenericRecord>> stream =
>         KafkaUtils.createDirectStream(
>                 ssc,
>                 LocationStrategies.PreferConsistent(),
>                 ConsumerStrategies.<Integer, GenericRecord>Subscribe(topics, kafkaParams)
>         );
>
>
> stream.foreachRDD(rdd -> {
>
>     // get offset ranges.
>     OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>
>     // process something.
>
>     // commit offsets.
>     ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
> });
>
>
>
> I have tested with a kafka consumer written directly with the raw
> kafka-clients library, and it consumes read_committed messages correctly
> without any problem: the count of consumed read_committed messages is equal
> to the count of messages sent by the kafka producer.
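>
> That verification consumer is essentially just a counting loop, roughly like
> this sketch (same consumerProperties as above; the topic name is a placeholder):
>
> KafkaConsumer<Integer, GenericRecord> consumer = new KafkaConsumer<>(consumerProperties);
> consumer.subscribe(Arrays.asList("school-student"));   // placeholder topic name
> long count = 0;
> while (true) {
>     ConsumerRecords<Integer, GenericRecord> records =
>             consumer.poll(java.time.Duration.ofMillis(1000));
>     count += records.count();
>     consumer.commitAsync();
>     System.out.println("consumed so far: " + count);
> }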
>
>
> And sometimes, I got the following exception.
>
> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
> (chango-private-1.chango.private executor driver):
> java.lang.IllegalArgumentException: requirement failed: Failed to get
> records for compacted spark-executor-school-student-group school-student-7
> after polling for 120000
>     at scala.Predef$.require(Predef.scala:281)
>     at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
>     at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
>     at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
>     at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
>
>
>
> My spark streaming jobs have always worked fine with non-transactional kafka
> messages, and I have never encountered exceptions like the one above with
> them.
> It seems that spark streaming for kafka transactions does not correctly
> handle kafka consumer properties such as isolation.level=read_committed and
> enable.auto.commit=false.
>
> Any help appreciated.
>
> - Kidong.
>
>
> --
> *이기동 *
> *Kidong Lee*
>
> Email: [email protected]
> Chango: https://cloudcheflabs.github.io/chango-private-docs
> Web Site: http://www.cloudchef-labs.com/
> Mobile: +82 10 4981 7297
>
--
*이기동 *
*Kidong Lee*
Email: [email protected]
Chango: https://cloudcheflabs.github.io/chango-private-docs
Web Site: http://www.cloudchef-labs.com/
Mobile: +82 10 4981 7297