[ 
https://issues.apache.org/jira/browse/SPARK-26718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16751694#comment-16751694
 ] 

Jungtaek Lim commented on SPARK-26718:
--------------------------------------

[~linehrr]
Thanks for the analysis. Allowing end users to pass Long.MaxValue is 
convenient, and users already do so, so ideally we should ensure that 'off' 
cannot overflow. This can be done by modifying the code as follows:

{code}
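val prorate = limit * (size / total)
// Round up below 1 so that small topic-partitions are not starved.
val prorateLong = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
// Clamp to Long.MaxValue instead of letting begin + prorateLong overflow.
val off = if (prorateLong > Long.MaxValue - begin) Long.MaxValue else begin + prorateLong
Math.min(end, off)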
{code}
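
To make the overflow concrete, here is a minimal standalone sketch, not the 
actual Spark source. The object name, the method names and the parameter 
list are hypothetical and exist only for illustration; it also assumes 
'begin' is a non-negative offset. With limit = Long.MaxValue and begin = 1 
(the committed offset shown in the report), the unguarded sum wraps around 
to Long.MinValue, which matches the value in the assertion error, while the 
guarded version stays at 'end'.

```scala
// Standalone illustration only; names here are hypothetical, not Spark APIs.
object RateLimitOverflowSketch {

  // Unguarded form: begin + prorateLong can wrap around for huge limits.
  def unguardedOffset(limit: Long, size: Double, total: Double,
                      begin: Long, end: Long): Long = {
    val prorate = limit * (size / total)
    val prorateLong =
      (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
    Math.min(end, begin + prorateLong)
  }

  // Guarded form: clamp to Long.MaxValue before the addition can overflow.
  // Assumes begin >= 0, so Long.MaxValue - begin cannot itself overflow.
  def guardedOffset(limit: Long, size: Double, total: Double,
                    begin: Long, end: Long): Long = {
    val prorate = limit * (size / total)
    val prorateLong =
      (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
    val off =
      if (prorateLong > Long.MaxValue - begin) Long.MaxValue
      else begin + prorateLong
    Math.min(end, off)
  }

  def main(args: Array[String]): Unit = {
    // Single partition holding all pending records, limit set to Long.MaxValue.
    println(unguardedOffset(Long.MaxValue, 4.0, 4.0, 1L, 5L)) // prints -9223372036854775808
    println(guardedOffset(Long.MaxValue, 4.0, 4.0, 1L, 5L))   // prints 5
  }
}
```

The double-to-long conversion saturates at Long.MaxValue on the JVM, so the 
only step that can wrap is the addition, which is why a single check before 
it is enough in this sketch.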

Please submit a patch if you would like to. Otherwise, let me know and I can 
take this up. Thanks!

> structured streaming fetched wrong current offset from kafka
> ------------------------------------------------------------
>
>                 Key: SPARK-26718
>                 URL: https://issues.apache.org/jira/browse/SPARK-26718
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Ryne Yang
>            Priority: Major
>
> When running Spark Structured Streaming with `"org.apache.spark" %% 
> "spark-sql-kafka-0-10" % "2.4.0"`, we keep getting an error when fetching 
> the current offset:
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in 
> stage 0.0 (TID 3, qa2-hdp-4.acuityads.org, executor 2): 
> java.lang.AssertionError: assertion failed: latest offset -9223372036854775808 does not equal -1
> at scala.Predef$.assert(Predef.scala:170)
> at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange(KafkaMicroBatchReader.scala:371)
> at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.<init>(KafkaMicroBatchReader.scala:329)
> at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader(KafkaMicroBatchReader.scala:314)
> at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> For some reason, it looks like fetchLatestOffset returned Long.MIN_VALUE for 
> one of the partitions. I checked the structured streaming checkpoint and it 
> was correct; it is currentAvailableOffset that was set to Long.MIN_VALUE.
> Kafka broker version: 1.1.0.
> lib we used:
> {{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"}}
> How to reproduce:
> We started a Structured Streaming job subscribed to a topic with 4 
> partitions, then produced some messages into the topic. The job crashed and 
> logged the stack trace above.
> The committed offsets also seem fine, as the logs show:
> {code:java}
> === Streaming Query ===
> Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = 
> 31878627-d473-4ee8-955d-d4d3f3f45eb9]
> Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
> {"REVENUEEVENT":{"0":1}}}
> Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
> {"REVENUEEVENT":{"0":-9223372036854775808}}}
> {code}
> So Spark recorded the correct committed value for partition 0, but the 
> current available offset reported for Kafka shows Long.MIN_VALUE.


