Hi,

I have a Kafka producer which sends messages transactionally to Kafka, and a Spark Streaming job which should consume the read_committed messages from Kafka. But there is a problem with Spark Streaming consuming the read_committed messages: the count of messages sent transactionally by the Kafka producer is not the same as the count of read_committed messages consumed by Spark Streaming.
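For context, the producer side is a standard transactional producer, roughly like the following sketch (the bootstrap servers, topic name, serializers and transactional.id below are placeholders, not my actual code); only records of committed transactions should be visible to a read_committed consumer.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // transactional.id must be unique per producer instance.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "school-student-tx-producer");

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("school-student", i, "student-" + i));
            }
            // only records of committed transactions are visible to read_committed consumers.
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // fatal errors: we cannot recover, so close the producer.
            producer.close();
        } catch (KafkaException e) {
            // abortable error: the aborted records are skipped by read_committed consumers.
            producer.abortTransaction();
        }
        producer.close();
    }
}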
Some consumer properties of my Spark Streaming job are as follows.

auto.offset.reset=earliest
enable.auto.commit=false
isolation.level=read_committed

I also added the following Spark Streaming configuration.

sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 60 * 1000));

My Spark Streaming job is using a direct stream like this.

JavaInputDStream<ConsumerRecord<Integer, GenericRecord>> stream =
        KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<Integer, GenericRecord>Subscribe(topics, kafkaParams)
        );

stream.foreachRDD(rdd -> {
    // get offset ranges.
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // process something.

    // commit offsets.
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});

I have also tested with a consumer written directly with the raw kafka-clients library, and it consumes the read_committed messages correctly without any problem: the count of consumed read_committed messages is equal to the count of messages sent by the Kafka producer (a rough sketch of that test consumer is in the P.S. below).

And sometimes, I get the following exception.

Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times, most recent failure: Lost task 0.0 in stage 324.0 (TID 1674) (chango-private-1.chango.private executor driver):
java.lang.IllegalArgumentException: requirement failed: Failed to get records for compacted spark-executor-school-student-group school-student-7 after polling for 120000
        at scala.Predef$.require(Predef.scala:281)
        at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)

My Spark Streaming jobs have always worked fine with non-transactional Kafka messages, and I have never encountered exceptions like the one above with them.

It seems that Spark Streaming does not handle Kafka transactions, that is, consumer properties like isolation.level=read_committed and enable.auto.commit=false, correctly.

Any help appreciated.

- Kidong.

--
이기동  Kidong Lee

Email: mykid...@gmail.com
Chango: https://cloudcheflabs.github.io/chango-private-docs
Web Site: http://www.cloudchef-labs.com/
Mobile: +82 10 4981 7297
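P.S. The plain kafka-clients consumer I used for the comparison test is roughly like the following sketch (the bootstrap servers, group id, topic and deserializers here are placeholders, not my real config): it polls with isolation.level=read_committed and simply counts the records it receives.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedCountTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "read-committed-test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // same consumer settings as the Spark Streaming job.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        long total = 0;
        try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("school-student"));
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(10));
                if (records.isEmpty()) {
                    break;                 // stop once the topic is drained.
                }
                total += records.count();
                consumer.commitSync();     // commit offsets manually.
            }
        }
        // this count matches the number of messages committed by the transactional producer.
        System.out.println("consumed read_committed records: " + total);
    }
}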