Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/20150

Hi Shixiong, thanks a lot for your reply. The full stack trace below can be reproduced by running the added UT against the original code base.

```
Assert on query failed: : Query [id = 3421db21-652e-47af-9d54-2b74a222abed, runId = cd8d7c94-1286-44a5-b000-a8d870aef6fa] terminated with exception: Partition topic-0-0's offset was changed from 10 to 5, some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you don't want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "false".
    org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
    org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

Caused by: Partition topic-0-0's offset was changed from 10 to 5, some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you don't want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "false".
    org.apache.spark.sql.kafka010.KafkaSource.org$apache$spark$sql$kafka010$KafkaSource$$reportDataLoss(KafkaSource.scala:332)
    org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:291)
    org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:289)
    scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
    scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
    scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
    scala.collection.AbstractTraversable.filter(Traversable.scala:104)
    org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:289)
```