[ 
https://issues.apache.org/jira/browse/SPARK-38824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17522995#comment-17522995
 ] 

L. C. Hsieh commented on SPARK-38824:
-------------------------------------

I think I see the point you are making. commitAsync only puts the offsets to 
commit into a queue; they are picked up by commitAll during the next compute 
call in DirectKafkaInputDStream. So if the streaming job is stopped, the last 
offsets may have been processed but never committed, because no further 
compute call follows.
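
To make the sequence concrete, here is a minimal sketch of the queue-then-drain 
pattern described above. It is a simplified model, not the Spark source: 
SimpleOffsetRange, DeferredCommitter, DeferredCommitDemo and the in-memory 
"committed" map are hypothetical names introduced only for illustration, and 
the map stands in for the Kafka broker.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable

// Hypothetical stand-in for an offset range; not the Spark OffsetRange class.
final case class SimpleOffsetRange(
    topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Simplified model of deferred commits (assumption: illustration only).
final class DeferredCommitter {
  private val commitQueue = new ConcurrentLinkedQueue[SimpleOffsetRange]()

  // Highest offset "committed" per (topic, partition); stands in for Kafka.
  val committed = mutable.Map.empty[(String, Int), Long]

  // Models commitAsync: only enqueues, nothing is committed yet.
  def commitAsync(ranges: Seq[SimpleOffsetRange]): Unit =
    ranges.foreach(r => commitQueue.add(r))

  // Models commitAll: drains the queue, keeping the highest untilOffset.
  def commitAll(): Unit = {
    var r = commitQueue.poll()
    while (r != null) {
      val key = (r.topic, r.partition)
      committed(key) = math.max(committed.getOrElse(key, 0L), r.untilOffset)
      r = commitQueue.poll()
    }
  }

  // Models compute: the only place where queued offsets get drained.
  def compute(): Unit = commitAll()

  def pending: Int = commitQueue.size
}

object DeferredCommitDemo {
  // Runs three batches and then "stops" without another compute call.
  def run(): (Option[Long], Int) = {
    val c = new DeferredCommitter
    val topic = "recoveryfromkafka"
    c.compute(); c.commitAsync(Seq(SimpleOffsetRange(topic, 0, 0L, 4L)))
    c.compute(); c.commitAsync(Seq(SimpleOffsetRange(topic, 0, 4L, 8L)))
    c.compute(); c.commitAsync(Seq(SimpleOffsetRange(topic, 0, 8L, 10L)))
    // Stream stops here: the range 8 -> 10 stays in the queue.
    (c.committed.get((topic, 0)), c.pending)
  }

  def main(args: Array[String]): Unit = {
    val (lastCommitted, stillQueued) = run()
    println(s"last committed offset: $lastCommitted, still queued: $stillQueued")
  }
}
```

In this model the last committed offset is 8 and one range is still queued 
when the run ends, which mirrors the missing 8 -> 10 commit in the log quoted 
below.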



> Bug in async commit of Kafka offset in DirectKafkaInputDStream
> --------------------------------------------------------------
>
>                 Key: SPARK-38824
>                 URL: https://issues.apache.org/jira/browse/SPARK-38824
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.4.8, 3.0.0, 3.0.3, 3.1.0, 3.1.2, 3.2.1
>            Reporter: SOUVIK PAUL
>            Priority: Major
>
> I added a few debug statements at the following lines and found a few issues.
> 1. At line 254 of override def compute(validTime: Time):
> Option[KafkaRDD[K, V]] in DirectKafkaInputDStream.scala:
> System.out.print("Called commitAll at time " + validTime + " " +
> commitQueue.toArray.mkString("Array(", ", ", ")") + "\n")
> 2. At line 454 of test("offset recovery from kafka") in
> DirectKafkaStreamSuite.scala:
> print("Called commitAsync at " + time + " " +
> offsets.mkString("Array(", ", ", ")") + "\n")
> This shows that the commitAll call is not handled properly. Because it is 
> only called inside the compute function, the offsets of the last RDD may 
> never be committed. In the current example, the commit of the range 8 -> 10 
> was missed.
> Can someone confirm whether this is a design choice or a bug?
> The current log looks like this:
> Called commitAll at time 1645548063100 ms Array()
> Called commitAll at time 1645548063200 ms Array()
> Called commitAll at time 1645548063300 ms Array()
> Called commitAll at time 1645548063400 ms Array()
> Called commitAll at time 1645548063500 ms Array()
> Called commitAll at time 1645548063600 ms Array()
> Called commitAll at time 1645548063700 ms Array()
> Called commitAll at time 1645548063800 ms Array()
> Called commitAll at time 1645548063900 ms Array()
> Called commitAll at time 1645548064000 ms Array()
> Called commitAll at time 1645548064100 ms Array()
> Called commitAll at time 1645548064200 ms Array()
> Called commitAsync at 1645548063100 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [0 -> 4]))
> Called commitAsync at 1645548063200 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064300 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [0 -> 4]), OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063300 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063400 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063500 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064400 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063600 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063700 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063800 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063900 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064500 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064000 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064100 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064200 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064300 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064600 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064400 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 8]))
> Called commitAsync at 1645548064500 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAsync at 1645548064600 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAll at time 1645548064700 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [4 -> 8]), OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [8 -> 8]), OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAsync at 1645548064700 ms Array(OffsetRange(topic: 
> 'recoveryfromkafka', partition: 0, range: [8 -> 10]))
> Regards,
> Souvik Paul
> GitHub: @paulsouri



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
