SilkyAlex created SPARK-39191:
---------------------------------

             Summary: KafkaDataConsumer does not support transactional producers
                 Key: SPARK-39191
                 URL: https://issues.apache.org/jira/browse/SPARK-39191
             Project: Spark
          Issue Type: Bug
          Components: Java API
    Affects Versions: 2.4.5
            Reporter: SilkyAlex
When using transactions, Kafka inserts "[control batches|https://kafka.apache.org/documentation/#controlbatch]" into the log to indicate whether messages were part of a transaction. These control batches are also assigned offsets, so even though I sent only a single record, the offset increased by 2. KafkaDataConsumer, however, assumes that Kafka offsets always increase by exactly one, which causes the following exception:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 101.0 failed 4 times, most recent failure: Lost task 8.3 in stage 101.0 (TID 2987, executor 1): java.lang.IllegalArgumentException: requirement failed: Got wrong record for spark-executor-source_from_kafka topic-3 even after seeking to offset 1071229367 got offset 1071229368 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:146)
	at org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:36)
	at org.apache.spark.streaming.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:218)
	at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:261)
	at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:229)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
	at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$2$$anon$1.next(InMemoryRelation.scala:116)
	at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$2$$anon$1.next(InMemoryRelation.scala:108)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1107)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1098)
	at org.apache.spark.storage.BlockManager.a(BlockManager.scala:1033)
	at org.apache.spark.storage.BlockManager.a(BlockManager.scala:1098)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:824)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
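For reference, here is a minimal sketch (not from the original job) of how a transactional producer creates the offset gap. The broker address, serializers, transactional.id, and the reuse of the topic name from the stack trace are illustrative assumptions; the essential point is that committing a transaction appends a commit marker (control batch) after the user records:

{code:scala}
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object TransactionalOffsetGap {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    // Setting a transactional.id (name chosen here for illustration) makes this a transactional producer.
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "offset-gap-demo")

    val producer = new KafkaProducer[String, String](props)
    producer.initTransactions()

    producer.beginTransaction()
    // One user record is written at some offset N ...
    producer.send(new ProducerRecord[String, String]("source_from_kafka", "key", "value"))
    // ... but the commit appends a control batch (commit marker) at offset N+1,
    // so the next user record lands at N+2 and offsets are no longer consecutive.
    producer.commitTransaction()

    producer.close()
  }
}
{code}

As the error message itself suggests, setting spark.streaming.kafka.allowNonConsecutiveOffsets=true works around the failed require, but KafkaDataConsumer should arguably tolerate control-batch offset gaps on transactional topics without that flag.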