[ https://issues.apache.org/jira/browse/SPARK-38824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-38824:
----------------------------------
    Component/s: DStreams
                     (was: Spark Core)

> Bug in async commit of Kafka offset in DirectKafkaInputDStream
> --------------------------------------------------------------
>
>                 Key: SPARK-38824
>                 URL: https://issues.apache.org/jira/browse/SPARK-38824
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.4.8, 3.0.0, 3.0.3, 3.1.0, 3.1.2, 3.2.1
>            Reporter: SOUVIK PAUL
>            Priority: Major
>
> I added a few debug statements at the following lines and found a few issues.
> 1. At line 254 of override def compute(validTime: Time): Option[KafkaRDD[K, V]]
> in DirectKafkaInputDStream.scala:
> System.out.print("Called commitAll at time " + validTime + " " + commitQueue.toArray.mkString("Array(", ", ", ")") + "\n")
> 2. At line 454 of test("offset recovery from kafka") in
> DirectKafkaStreamSuite.scala:
> print("Called commitAsync at " + time + " " + offsets.mkString("Array(", ", ", ")") + "\n")
> This shows that the commitAll call is not handled properly. Because commitAll
> is only called inside the compute function, offset ranges passed to
> commitAsync are only queued. The queue is drained when compute runs for a
> later batch. For the last RDD there is no later batch, so its offsets can be
> missed. In the current example, the commit of offset range 8 -> 10 is missed.
> Can someone confirm whether this is a design choice or a bug?
> The current log looks like this:
> Called commitAll at time 1645548063100 ms Array()
> Called commitAll at time 1645548063200 ms Array()
> Called commitAll at time 1645548063300 ms Array()
> Called commitAll at time 1645548063400 ms Array()
> Called commitAll at time 1645548063500 ms Array()
> Called commitAll at time 1645548063600 ms Array()
> Called commitAll at time 1645548063700 ms Array()
> Called commitAll at time 1645548063800 ms Array()
> Called commitAll at time 1645548063900 ms Array()
> Called commitAll at time 1645548064000 ms Array()
> Called commitAll at time 1645548064100 ms Array()
> Called commitAll at time 1645548064200 ms Array()
> Called commitAsync at 1645548063100 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [0 -> 4]))
> Called commitAsync at 1645548063200 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064300 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [0 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063300 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063400 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063500 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064400 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063600 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063700 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063800 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063900 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064500 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064000 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064100 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064200 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064300 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064600 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064400 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 8]))
> Called commitAsync at 1645548064500 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAsync at 1645548064600 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAll at time 1645548064700 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 8]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 8]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAsync at 1645548064700 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 10]))
> Regards,
> Souvik Paul
> GitHub: @paulsouri
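The small, self-contained Scala model below illustrates the mechanism described in the report. It is a simplified sketch and not Spark's actual DirectKafkaInputDStream code. The model makes two assumptions, both suggested by the debug output above:

- commitAsync only enqueues offset ranges.
- The queue is drained only when commitAll runs from compute for a later batch. Draining keeps the largest untilOffset per partition.

SimOffsetRange, CommitQueueModel and CommitQueueDemo are hypothetical names introduced for illustration. The demo replays the tail of the log. The commitAll at 1645548064700 ms drains [4 -> 8], [8 -> 8] and [8 -> 8]. The [8 -> 10] range queued afterwards is never drained, because no further batch is computed.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable

// Hypothetical stand-in for Spark's OffsetRange, with only the fields this model needs.
final case class SimOffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Simplified model of a queue-based async commit path (an assumption, not Spark's API):
// commitAsync only enqueues, and commitAll drains the queue when a later batch is computed.
final class CommitQueueModel {
  private val commitQueue = new ConcurrentLinkedQueue[SimOffsetRange]()
  // Highest offset "committed" so far, keyed by (topic, partition).
  val committed = mutable.Map.empty[(String, Int), Long]

  // Called by user code after a batch finishes; nothing is committed yet.
  def commitAsync(ranges: Seq[SimOffsetRange]): Unit =
    ranges.foreach(r => commitQueue.add(r))

  // Called from compute for a later batch: drain the queue, keep the max
  // untilOffset per partition, then record it as committed.
  def commitAll(): Unit = {
    val merged = mutable.Map.empty[(String, Int), Long]
    var r = commitQueue.poll()
    while (r != null) {
      val tp = (r.topic, r.partition)
      merged(tp) = math.max(merged.getOrElse(tp, Long.MinValue), r.untilOffset)
      r = commitQueue.poll()
    }
    merged.foreach { case (tp, off) =>
      committed(tp) = math.max(committed.getOrElse(tp, Long.MinValue), off)
    }
  }

  // Number of ranges still waiting in the queue.
  def pending: Int = commitQueue.size
}

object CommitQueueDemo {
  def main(args: Array[String]): Unit = {
    val model = new CommitQueueModel
    val t = "recoveryfromkafka"
    // Ranges queued by batches 1645548064400..1645548064600 ms.
    model.commitAsync(Seq(SimOffsetRange(t, 0, 4, 8)))
    model.commitAsync(Seq(SimOffsetRange(t, 0, 8, 8)))
    model.commitAsync(Seq(SimOffsetRange(t, 0, 8, 8)))
    // compute for the next batch drains them (commitAll at 1645548064700 ms).
    model.commitAll()
    // The last batch queues 8 -> 10, but no later compute ever drains it.
    model.commitAsync(Seq(SimOffsetRange(t, 0, 8, 10)))
    println(s"committed=${model.committed((t, 0))} pending=${model.pending}")
  }
}
```

Running CommitQueueDemo prints committed=8 pending=1. In this model, offset 10 is still waiting in the queue when the stream stops.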



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
