Hi,

I have a kafka producer which sends messages transactionally to kafka and
spark streaming job which should consume read_committed messages from kafka.
But there is a problem for spark streaming to consume read_committed
messages.
The count of messages sent by kafka producer transactionally is not the
same to the count of the read_committed messages consumed by spark
streaming.

Some consumer properties of my spark streaming job are as follows.

auto.offset.reset=earliest
enable.auto.commit=false
isolation.level=read_committed


I also added the following spark streaming configuration.

sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
sparkConf.set("spark.streaming.kafka.consumer.poll.ms",
String.valueOf(2 * 60 * 1000));


My spark streaming is using DirectStream like this.

JavaInputDStream<ConsumerRecord<Integer, GenericRecord>> stream =
        KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<Integer,
GenericRecord>Subscribe(topics, kafkaParams)
        );


stream.foreachRDD(rdd -> O

       // get offset ranges.

       OffsetRange[] offsetRanges = ((HasOffsetRanges)
rdd.rdd()).offsetRanges();

       // process something.


       // commit offset.
       ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);

}
);



I have tested with a kafka consumer written with raw kafka-clients jar
library without problem that it consumes read_committed messages correctly,
and the count of consumed read_committed messages is equal to the count of
messages sent by kafka producer.


And sometimes, I got the following exception.

Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
(chango-private-1.chango.private executor driver):
java.lang.IllegalArgumentException: requirement failed: Failed to get
records for compacted spark-executor-school-student-group school-student-7
after polling for 120000

at scala.Predef$.require(Predef.scala:281)

at
org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)

at
org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)



I have experienced spark streaming job which works fine with kafka messages
which are non-transactional, and I never encountered the exceptions like
above.
It seems that spark streaming for kafka transaction does not handle such as
kafka consumer properties like isolation.level=read_committed and
enable.auto.commit=false correctly.

Any help appreciated.

- Kidong.


-- 
*이기동 *
*Kidong Lee*

Email: mykid...@gmail.com
Chango: https://cloudcheflabs.github.io/chango-private-docs
Web Site: http://www.cloudchef-labs.com/
Mobile: +82 10 4981 7297
<http://www.cloudchef-labs.com/>

Reply via email to