SilkyAlex created SPARK-39191:
---------------------------------

             Summary: KafkaDataConsumer does not support transactional producers
                 Key: SPARK-39191
                 URL: https://issues.apache.org/jira/browse/SPARK-39191
             Project: Spark
          Issue Type: Bug
          Components: Java API
    Affects Versions: 2.4.5
            Reporter: SilkyAlex


When using transactions, Kafka inserts [control 
batches|https://kafka.apache.org/documentation/#controlbatch] into the log to 
indicate whether messages were part of a transaction.

These batches are also assigned offsets, so even though I sent only a single 
record, the offset increased by 2.
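
For illustration, here is a minimal transactional producer sketch that shows the gap (the broker address, transactional.id, and topic name are placeholders, not taken from this report):

{code:scala}
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")  // placeholder broker
props.put("transactional.id", "demo-tx")          // turns on the transactional producer
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
producer.initTransactions()
producer.beginTransaction()
// One record is written, but the commit marker (a control batch) also
// occupies an offset, so the partition's log-end offset advances by 2.
producer.send(new ProducerRecord[String, String]("demo-topic", "key", "value"))
producer.commitTransaction()
producer.close()
{code}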

However, KafkaDataConsumer assumes that Kafka offsets always increase by 
exactly one, so it raises the following exception (a workaround sketch follows 
the stack trace):

Exception in thread "main" org.apache.spark.SparkException: Job aborted due 
to stage failure: Task 8 in stage 101.0 failed 4 times, most recent failure: 
Lost task 8.3 in stage 101.0 (TID 2987, executor 1): 
java.lang.IllegalArgumentException: requirement failed: Got wrong record for 
spark-executor-source_from_kafka topic-3 even after seeking to offset 
1071229367 got offset 1071229368 instead. If this is a compacted topic, 
consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:146)
    at org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:36)
    at org.apache.spark.streaming.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:218)
    at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:261)
    at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:229)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$2$$anon$1.next(InMemoryRelation.scala:116)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$2$$anon$1.next(InMemoryRelation.scala:108)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1107)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1098)
    at org.apache.spark.storage.BlockManager.a(BlockManager.scala:1033)
    at org.apache.spark.storage.BlockManager.a(BlockManager.scala:1098)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:824)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
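
For reference, a minimal sketch of the workaround the error message itself suggests, using the spark-streaming-kafka-0-10 direct stream API (the broker address, group id, and topic name are placeholders):

{code:scala}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

val conf = new SparkConf()
  .setAppName("transactional-topic-consumer")
  // Tolerate offset gaps; documented for compacted topics, but it is the
  // switch the exception message points to.
  .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")

val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",   // placeholder broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "demo-group",                // placeholder group
  // Skip aborted records on a topic written by transactional producers.
  "isolation.level" -> "read_committed"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("demo-topic"), kafkaParams)
)
{code}

Even so, allowNonConsecutiveOffsets was designed for compacted topics; offset 
gaps left by transaction control batches merely hit the same assumption in 
KafkaDataConsumer, which is the point of this report.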

 


