This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5beaf85cd5ef [SPARK-47793][TEST][FOLLOWUP] Fix flaky test for Python data source exactly once
5beaf85cd5ef is described below

commit 5beaf85cd5ef2b84a67ebce712e8d73d1e7d41ff
Author: Chaoqin Li <chaoqin...@databricks.com>
AuthorDate: Fri May 10 08:24:42 2024 -0700

    [SPARK-47793][TEST][FOLLOWUP] Fix flaky test for Python data source exactly once
    
    ### What changes were proposed in this pull request?
    Fix the flakiness in the Python streaming source exactly-once test. The last executed batch may not be recorded in the query progress, which causes the expected row count not to match. This fix takes the uncommitted batch into account and relaxes the condition (see the sketch after the diff).
    
    ### Why are the changes needed?
    Fix flaky test.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Test change.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #46481 from chaoqin-li1123/fix_python_ds_test.
    
    Authored-by: Chaoqin Li <chaoqin...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../execution/python/PythonStreamingDataSourceSuite.scala    | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala
index 97e6467c3eaf..d1f7c597b308 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala
@@ -299,7 +299,7 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
       val checkpointDir = new File(path, "checkpoint")
       val outputDir = new File(path, "output")
       val df = spark.readStream.format(dataSourceName).load()
-      var lastBatch = 0
+      var lastBatchId = 0
       // Restart streaming query multiple times to verify exactly once guarantee.
       for (i <- 1 to 5) {
 
@@ -323,11 +323,15 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
         }
         q.stop()
         q.awaitTermination()
-        lastBatch = q.lastProgress.batchId.toInt
+        lastBatchId = q.lastProgress.batchId.toInt
       }
-      assert(lastBatch > 20)
+      assert(lastBatchId > 20)
+      val rowCount = spark.read.format("json").load(outputDir.getAbsolutePath).count()
+      // There may be one uncommitted batch that is not recorded in query progress.
+      // The number of batches can be lastBatchId + 1 or lastBatchId + 2.
+      assert(rowCount == 2 * (lastBatchId + 1) || rowCount == 2 * (lastBatchId + 2))
       checkAnswer(spark.read.format("json").load(outputDir.getAbsolutePath),
-        (0 to  2 * lastBatch + 1).map(Row(_)))
+        (0 until rowCount.toInt).map(Row(_)))
     }
   }
 

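For context, here is a minimal standalone sketch of the relaxed check (a hypothetical helper, not part of the commit; it assumes, as the test does, that every microbatch writes exactly two rows to the output directory):

    // Batches are numbered from 0, so a reported lastBatchId of N means
    // N + 1 batches appear in query progress. One more batch may have
    // committed without being recorded, so either batch count is valid.
    def isValidRowCount(rowCount: Long, lastBatchId: Int, rowsPerBatch: Int = 2): Boolean =
      rowCount == rowsPerBatch * (lastBatchId + 1) ||  // all committed batches recorded
        rowCount == rowsPerBatch * (lastBatchId + 2)   // one batch missing from progress

For example, if lastProgress reports batchId 24 and one more batch committed before the query stopped, the output directory holds 2 * 26 = 52 rows, which the relaxed assertion accepts.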
