Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/20150
  
    Hi Shixiong, thanks a lot for your reply.
    The full stack trace below can be reproduced by running the added UT against the original code base.
    ```
    Assert on query failed: : Query [id = 3421db21-652e-47af-9d54-2b74a222abed, runId = cd8d7c94-1286-44a5-b000-a8d870aef6fa] terminated with exception:
    Partition topic-0-0's offset was changed from 10 to 5, some data may have been missed.
    Some data may have been lost because they are not available in Kafka any more; either the
    data was aged out by Kafka or the topic may have been deleted before all the data in the
    topic was processed. If you don't want your streaming query to fail on such cases, set the
    source option "failOnDataLoss" to "false".

        org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
        org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

    Caused by: Partition topic-0-0's offset was changed from 10 to 5, some data may have been missed.
    Some data may have been lost because they are not available in Kafka any more; either the
    data was aged out by Kafka or the topic may have been deleted before all the data in the
    topic was processed. If you don't want your streaming query to fail on such cases, set the
    source option "failOnDataLoss" to "false".

        org.apache.spark.sql.kafka010.KafkaSource.org$apache$spark$sql$kafka010$KafkaSource$$reportDataLoss(KafkaSource.scala:332)
        org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:291)
        org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:289)
        scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
        scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
        scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
        scala.collection.AbstractTraversable.filter(Traversable.scala:104)
        org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:289)
    ```
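
    For reference, the `failOnDataLoss` option named in the error message is set on the Kafka source itself; a minimal sketch of a query that logs a warning instead of failing (broker address and topic name are placeholders, not taken from the test):

    ```scala
    import org.apache.spark.sql.SparkSession

    // Sketch: read from Kafka with failOnDataLoss disabled, so the query
    // reports a warning instead of failing when offsets become unavailable.
    // "localhost:9092" and "topic-0" are placeholder values.
    val spark = SparkSession.builder().appName("kafka-data-loss-sketch").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topic-0")
      .option("failOnDataLoss", "false") // do not fail the query on missed offsets
      .load()
    ```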

