Ryne Yang created KAFKA-7796:
--------------------------------

             Summary: structured streaming fetched wrong current offset from 
kafka
                 Key: KAFKA-7796
                 URL: https://issues.apache.org/jira/browse/KAFKA-7796
             Project: Kafka
          Issue Type: Bug
          Components: consumer
         Environment: Linux, Centos 7
            Reporter: Ryne Yang


when running spark structured streaming using lib: `"org.apache.spark" %% 
"spark-sql-kafka-0-10" % "2.4.0"`, we keep getting error regarding current 
offset fetching:
{code:java}
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 
0.0 (TID 3, qa2-hdp-4.acuityads.org, executor 2): java.lang.AssertionError: 
assertion failed: latest offs
et -9223372036854775808 does not equal -1
at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange(KafkaMicroBatchReader.scala:371)
at 
org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.<init>(KafkaMicroBatchReader.scala:329)
at 
org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader(KafkaMicroBatchReader.scala:314)
at 
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}
for some reason, looks like fetchLatestOffset returned a Long.MIN_VALUE for one 
of the partitions. I checked the structured streaming checkpoint, that was 
correct, it's the currentAvailableOffset was set to Long.MIN_VALUE.

kafka broker version: 1.1.0.
lib we used:

{{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0" 
}}

how to reproduce:
basically we started a structured streamer and subscribed a topic of 4 
partitions. then produced some messages into topic, job crashed and logged the 
stacktrace like above.

also the committed offsets seem fine as we see in the logs: 
{code:java}
=== Streaming Query ===
Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = 
31878627-d473-4ee8-955d-d4d3f3f45eb9]
Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
{"REVENUEEVENT":{"0":1}}}
Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
{"REVENUEEVENT":{"0":-9223372036854775808}}}
{code}
so spark streaming recorded the correct value for partition: 0, but the current 
available offsets returned from kafka is showing Long.MIN_VALUE. 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to