Because Spark Streaming does not handle Kafka transactions correctly enough
for my needs, I moved to another approach using a raw Kafka consumer, which
handles read_committed messages from Kafka correctly.

My code looks like the following.

JavaDStream<String> stream = ssc.receiverStream(new CustomReceiver());
// CustomReceiver does nothing special except wake up the foreachRDD task.

stream.foreachRDD(rdd -> {

  KafkaConsumer<Integer, GenericRecord> consumer = new ...

  ConsumerRecords<Integer, GenericRecord> records = ...

  Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
  List<String> someList = new ArrayList<>();

  for (ConsumerRecord<Integer, GenericRecord> consumerRecord : records) {
    // add something to list.
    // put offset to offsetMap.
  }

  // process someList.

  // commit offset.
  consumer.commitAsync(offsetMap, null);
});
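
The "put offset to offsetMap" step has one detail that is easy to get wrong: Kafka expects the committed offset to be the position of the next record to read, that is, the last processed offset plus one. Below is a minimal sketch of that bookkeeping. It uses plain Java collections as stand-ins for TopicPartition and OffsetAndMetadata so that it runs without the kafka-clients jar. The class name OffsetTracker, its methods, and the "topic-partition" string key are hypothetical and are not taken from the code above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: tracks, per partition, the offset that should be committed.
public class OffsetTracker {
    // Key: "topic-partition" (stand-in for TopicPartition).
    // Value: next offset to read (stand-in for OffsetAndMetadata).
    private final Map<String, Long> nextOffsets = new HashMap<>();

    // Record that the message at the given offset has been processed.
    public void markProcessed(String topic, int partition, long offset) {
        // Commit position = last processed offset + 1. Math::max keeps the
        // highest position even if records are marked out of order.
        nextOffsets.merge(topic + "-" + partition, offset + 1, Math::max);
    }

    // Snapshot of the positions to commit after the batch has been processed.
    public Map<String, Long> offsetsToCommit() {
        return new HashMap<>(nextOffsets);
    }
}
```

With the real client, the equivalent line inside the loop would be roughly offsetMap.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1)). With read_committed, offsets within a partition are not necessarily consecutive, so the position should be derived from the last record actually seen rather than from a record count.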



In addition, I increased max.poll.records to 100000.
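
As a rough illustration, the consumer settings mentioned in this thread could be collected as shown below. The class and method names are hypothetical. enable.auto.commit=false is an assumption based on the manual commitAsync call, and connection settings such as bootstrap.servers, group.id and the deserializers are left out.

```java
import java.util.Properties;

// Hypothetical helper that gathers the consumer settings discussed in this thread.
public class ReadCommittedConsumerProps {
    public static Properties build() {
        Properties props = new Properties();
        // Only return messages from committed transactions.
        props.setProperty("isolation.level", "read_committed");
        // Start from the earliest offset when no committed offset exists.
        props.setProperty("auto.offset.reset", "earliest");
        // Upper bound on the number of records returned by a single poll().
        props.setProperty("max.poll.records", "100000");
        // Assumption: auto-commit is off because offsets are committed manually.
        props.setProperty("enable.auto.commit", "false");
        return props;
    }
}
```

The resulting Properties object would be passed to the KafkaConsumer constructor together with the connection settings.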

Although this raw Kafka consumer approach is not very scalable, it consumes
read_committed messages from Kafka correctly and is enough for me at the
moment.

- Kidong.

> Hi,
> I have a Kafka producer which sends messages transactionally to Kafka and
> a Spark Streaming job which should consume read_committed messages from Kafka.
> However, Spark Streaming does not consume the read_committed messages
> correctly: the count of messages sent transactionally by the Kafka producer
> does not match the count of read_committed messages consumed by Spark
> Streaming.
> Some consumer properties of my Spark Streaming job are as follows.
> auto.offset.reset=earliest
> isolation.level=read_committed
> I also added the following spark streaming configuration.
> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
> sparkConf.set("", String.valueOf(2 * 60 * 1000));
> My Spark Streaming job uses DirectStream like this.
> JavaInputDStream<ConsumerRecord<Integer, GenericRecord>> stream =
>         KafkaUtils.createDirectStream(
>                 ssc,
>                 LocationStrategies.PreferConsistent(),
>                 ConsumerStrategies.<Integer, GenericRecord>Subscribe(topics, kafkaParams)
>         );
> stream.foreachRDD(rdd -> {
>        // get offset ranges.
>        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>        // process something.
>        // commit offset.
>        ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
> });
> I have tested a Kafka consumer written with the raw kafka-clients library,
> and it consumes read_committed messages correctly: the count of consumed
> read_committed messages equals the count of messages sent by the Kafka
> producer.
> Sometimes I also get the following exception.
> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
> (chango-private-1.chango.private executor driver):
> java.lang.IllegalArgumentException: requirement failed: Failed to get
> records for compacted spark-executor-school-student-group school-student-7
> after polling for 120000
> at scala.Predef$.require(Predef.scala:281)
> at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
> at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
> at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
> at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
> I have run Spark Streaming jobs that work fine with non-transactional Kafka
> messages, and I never encountered exceptions like the one above.
> It seems that Spark Streaming does not correctly handle Kafka consumer
> properties such as isolation.level=read_committed for transactional messages.
> Any help would be appreciated.
> - Kidong.
