[ https://issues.apache.org/jira/browse/SPARK-38824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun updated SPARK-38824:
----------------------------------
Component/s: DStreams (was: Spark Core)

> Bug in async commit of Kafka offset in DirectKafkaInputDStream
> --------------------------------------------------------------
>
> Key: SPARK-38824
> URL: https://issues.apache.org/jira/browse/SPARK-38824
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.4.8, 3.0.0, 3.0.3, 3.1.0, 3.1.2, 3.2.1
> Reporter: SOUVIK PAUL
> Priority: Major
>
> I added a few debug statements at the following lines and found a few issues.
>
> 1. At line 254 of override def compute(validTime: Time): Option[KafkaRDD[K, V]] in DirectKafkaInputDStream.scala:
>
> System.out.print("Called commitAll at time " + validTime + " " + commitQueue.toArray.mkString("Array(", ", ", ")") + "\n")
>
> 2. At line 454 of test("offset recovery from kafka") in DirectKafkaStreamSuite.scala:
>
> print("Called commitAsync at " + time + " " + offsets.mkString("Array(", ", ", ")") + "\n")
>
> This shows that the commitAll call is not handled properly. Because commitAll is called only inside the compute function, the offset ranges queued by commitAsync for the last RDD are committed only if compute runs again; if it does not, the last offset range is never committed. In the current example, the commit of offset range 8 -> 10 is missed.
>
> Can someone confirm whether this is a design choice or a bug?
>
> The current log looks something like this:
>
> Called commitAll at time 1645548063100 ms Array()
> Called commitAll at time 1645548063200 ms Array()
> Called commitAll at time 1645548063300 ms Array()
> Called commitAll at time 1645548063400 ms Array()
> Called commitAll at time 1645548063500 ms Array()
> Called commitAll at time 1645548063600 ms Array()
> Called commitAll at time 1645548063700 ms Array()
> Called commitAll at time 1645548063800 ms Array()
> Called commitAll at time 1645548063900 ms Array()
> Called commitAll at time 1645548064000 ms Array()
> Called commitAll at time 1645548064100 ms Array()
> Called commitAll at time 1645548064200 ms Array()
> Called commitAsync at 1645548063100 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [0 -> 4]))
> Called commitAsync at 1645548063200 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064300 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [0 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063300 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063400 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063500 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064400 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063600 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063700 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063800 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063900 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064500 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064000 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064100 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064200 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064300 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064600 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064400 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 8]))
> Called commitAsync at 1645548064500 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAsync at 1645548064600 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAll at time 1645548064700 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 8]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 8]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAsync at 1645548064700 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 10]))
>
> Regards,
> Souvik Paul
> GitHub: @paulsouri
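The failure mode described in the report can be illustrated with a small, self-contained model. This is a simplified sketch written for illustration, assuming only what the debug output above suggests: commitAsync appends offset ranges to a queue, and commitAll, which drains that queue, runs only when compute generates the next batch. SimpleOffsetRange, CommitQueueModel and CommitQueueDemo are hypothetical names; this is not Spark's implementation and it does not talk to Kafka.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable

// Hypothetical stand-in for Kafka's OffsetRange: offsets [fromOffset, untilOffset) of one partition.
final case class SimpleOffsetRange(partition: Int, fromOffset: Long, untilOffset: Long)

// Hypothetical model of the commit path described in the issue (not Spark's code).
final class CommitQueueModel {
  private val commitQueue = new ConcurrentLinkedQueue[SimpleOffsetRange]()

  // Offsets that have actually been "committed" in this model, keyed by partition.
  val committed: mutable.Map[Int, Long] = mutable.Map.empty[Int, Long]

  // Modeled on commitAsync: ranges are only queued; nothing is committed yet.
  def commitAsync(ranges: Seq[SimpleOffsetRange]): Unit =
    ranges.foreach(r => commitQueue.add(r))

  // Modeled on commitAll: drain the queue, keeping the highest untilOffset per partition.
  private def commitAll(): Unit = {
    var r = commitQueue.poll()
    while (r != null) {
      val prev = committed.getOrElse(r.partition, 0L)
      committed(r.partition) = math.max(prev, r.untilOffset)
      r = commitQueue.poll()
    }
  }

  // Modeled on compute: the queue is drained only when the next batch is generated.
  def compute(): Unit = commitAll()

  // Number of ranges queued by commitAsync but not yet drained by commitAll.
  def pending: Int = commitQueue.size
}

object CommitQueueDemo {
  def main(args: Array[String]): Unit = {
    val stream = new CommitQueueModel
    // Offset ranges taken from the log in the issue: 0 -> 4, 4 -> 8, 8 -> 10.
    stream.compute(); stream.commitAsync(Seq(SimpleOffsetRange(0, 0, 4)))
    stream.compute(); stream.commitAsync(Seq(SimpleOffsetRange(0, 4, 8)))
    stream.compute(); stream.commitAsync(Seq(SimpleOffsetRange(0, 8, 10)))
    // No further compute() call follows, so the 8 -> 10 range is never drained.
    println(s"committed=${stream.committed(0)} pending=${stream.pending}")
  }
}
```

Running CommitQueueDemo prints committed=8 pending=1: like the final commitAsync entry in the log, the 8 -> 10 range stays queued because no later compute call drains it.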