This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 5beaf85cd5ef [SPARK-47793][TEST][FOLLOWUP] Fix flaky test for Python data source exactly once
5beaf85cd5ef is described below

commit 5beaf85cd5ef2b84a67ebce712e8d73d1e7d41ff
Author: Chaoqin Li <chaoqin...@databricks.com>
AuthorDate: Fri May 10 08:24:42 2024 -0700

    [SPARK-47793][TEST][FOLLOWUP] Fix flaky test for Python data source exactly once

    ### What changes were proposed in this pull request?
    Fix the flakiness in the Python streaming source exactly-once test. The last executed batch may not be recorded in the query progress, which causes the expected row count not to match. This fix takes the uncommitted batch into account and relaxes the condition.

    ### Why are the changes needed?
    Fix a flaky test.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Test change.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #46481 from chaoqin-li1123/fix_python_ds_test.

    Authored-by: Chaoqin Li <chaoqin...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../execution/python/PythonStreamingDataSourceSuite.scala   | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala
index 97e6467c3eaf..d1f7c597b308 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala
@@ -299,7 +299,7 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
       val checkpointDir = new File(path, "checkpoint")
       val outputDir = new File(path, "output")
       val df = spark.readStream.format(dataSourceName).load()
-      var lastBatch = 0
+      var lastBatchId = 0
       // Restart streaming query multiple times to verify exactly once guarantee.
       for (i <- 1 to 5) {
@@ -323,11 +323,15 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
         }
         q.stop()
         q.awaitTermination()
-        lastBatch = q.lastProgress.batchId.toInt
+        lastBatchId = q.lastProgress.batchId.toInt
       }
-      assert(lastBatch > 20)
+      assert(lastBatchId > 20)
+      val rowCount = spark.read.format("json").load(outputDir.getAbsolutePath).count()
+      // There may be one uncommitted batch that is not recorded in query progress.
+      // The number of batches can be lastBatchId + 1 or lastBatchId + 2.
+      assert(rowCount == 2 * (lastBatchId + 1) || rowCount == 2 * (lastBatchId + 2))
       checkAnswer(spark.read.format("json").load(outputDir.getAbsolutePath),
-        (0 to 2 * lastBatch + 1).map(Row(_)))
+        (0 until rowCount.toInt).map(Row(_)))
     }
   }
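As an aside, the arithmetic behind the relaxed assertion can be checked in isolation. The sketch below is not part of the patch; ExactlyOnceCheck and expectedRowCounts are illustrative names, and the two-rows-per-batch assumption comes from the `2 *` factor in the test.

object ExactlyOnceCheck {
  // Assumption: each micro-batch of the test source writes exactly two rows, so
  // with lastBatchId = N the committed output covers either N + 1 batches (no
  // batch beyond the last reported progress) or N + 2 (one batch written to the
  // sink but not yet reflected in q.lastProgress).
  def expectedRowCounts(lastBatchId: Int, rowsPerBatch: Long = 2L): Set[Long] =
    Set(rowsPerBatch * (lastBatchId + 1), rowsPerBatch * (lastBatchId + 2))

  def main(args: Array[String]): Unit = {
    // For example, a last recorded batch id of 21 allows either 44 or 46 rows;
    // any other count would indicate duplicated or lost data.
    assert(expectedRowCounts(21) == Set(44L, 46L))
  }
}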