[ https://issues.apache.org/jira/browse/SPARK-26718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16752353#comment-16752353 ]
Ryne Yang commented on SPARK-26718:
-----------------------------------

[~kabhwan] cool, I will make a PR soon. Thank you.

> structured streaming fetched wrong current offset from kafka
> ------------------------------------------------------------
>
>                 Key: SPARK-26718
>                 URL: https://issues.apache.org/jira/browse/SPARK-26718
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Ryne Yang
>            Priority: Major
>
> When running Spark Structured Streaming with the library `"org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"`, we keep getting an error when the current offset is fetched:
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, qa2-hdp-4.acuityads.org, executor 2): java.lang.AssertionError: assertion failed: latest offset -9223372036854775808 does not equal -1
> 	at scala.Predef$.assert(Predef.scala:170)
> 	at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange(KafkaMicroBatchReader.scala:371)
> 	at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.<init>(KafkaMicroBatchReader.scala:329)
> 	at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader(KafkaMicroBatchReader.scala:314)
> 	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:121)
> 	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> For some reason, it looks like fetchLatestOffset returned Long.MIN_VALUE for one of the partitions. I checked the structured streaming checkpoint and it was correct; it was the currentAvailableOffset that was set to Long.MIN_VALUE.
>
> Kafka broker version: 1.1.0.
> Library we used:
> {{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"}}
>
> How to reproduce:
> We started a structured streaming query subscribed to a topic with 4 partitions, then produced some messages into the topic; the job crashed and logged the stacktrace above.
> Also, the committed offsets seem fine, as we see in the logs:
> {code:java}
> === Streaming Query ===
> Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = 31878627-d473-4ee8-955d-d4d3f3f45eb9]
> Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: {"REVENUEEVENT":{"0":1}}}
> Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: {"REVENUEEVENT":{"0":-9223372036854775808}}}
> {code}
> So Spark streaming recorded the correct value for partition 0, but the current available offset returned from Kafka is showing Long.MIN_VALUE.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
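For context, -9223372036854775808 is exactly Long.MinValue (-2^63), which no Kafka broker would ever return as a real partition offset. A minimal Scala sketch of that arithmetic, plus the small negative sentinels the spark-sql-kafka-0-10 source reserves for "resolve at runtime" offsets (hedged: constant values as defined in the connector's KafkaOffsetRangeLimit; the interpretation of the failing assertion is an assumption based on the stacktrace above, not a confirmed diagnosis):

```scala
object OffsetSentinelCheck extends App {
  // The "impossible" offset from the assertion failure is exactly Long.MinValue,
  // i.e. -2^63 -- an uninitialized/propagated value, not a broker-reported offset.
  val badOffset = -9223372036854775808L
  assert(badOffset == Long.MinValue)
  assert(badOffset == -(BigInt(2).pow(63)).toLong)

  // Hedged assumption: the connector uses these sentinels (KafkaOffsetRangeLimit)
  // for offsets that should be resolved against the broker at run time.
  val LATEST   = -1L // resolve to the latest offset when the batch is planned
  val EARLIEST = -2L // resolve to the earliest offset when the batch is planned

  // The assertion "latest offset ... does not equal -1" suggests resolveRange
  // expected the sentinel LATEST but saw a value that matches neither sentinel:
  assert(badOffset != LATEST && badOffset != EARLIEST)
  println(s"badOffset == Long.MinValue: ${badOffset == Long.MinValue}")
}
```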