This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 526490f [SPARK-28247][SS][TEST] Fix flaky test "query without test harness" on ContinuousSuite 526490f is described below commit 526490fc2562b93046214e3f90eecee08722cd33 Author: Shixiong Zhu <zsxw...@gmail.com> AuthorDate: Tue Apr 27 08:07:09 2021 +0900 [SPARK-28247][SS][TEST] Fix flaky test "query without test harness" on ContinuousSuite ### What changes were proposed in this pull request? This is another attempt to fix the flaky test "query without test harness" on ContinuousSuite. `query without test harness` is flaky because it starts a continuous query with two partitions but assumes they will run at the same speed. In this test, 0 and 2 will be written to partition 0, and 1 and 3 will be written to partition 1. It assumes that by the time we see 3, 2 has already been written to the memory sink, but this is not guaranteed. We can add `if (currentValue == 2) Thread.sleep(5000)` at this line https://github.com/apache/spark/blob/b2a2b5d8206b7c09b180b8b6363f73c6c3fdb1d8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala#L135 to reproduce the failure: `Result set Set [...]` The fix is to change `waitForRateSourceCommittedValue` to wait until all partitions reach the desired values before stopping the query. ### Why are the changes needed? Fix a flaky test. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests. Manually verified that the reproduction I mentioned above doesn't fail after this change. Closes #32316 from zsxwing/SPARK-28247-fix. 
Authored-by: Shixiong Zhu <zsxw...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> (cherry picked from commit 0df3b501aeb2a88997e5a68a6a8f8e7f5196342c) Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/streaming/continuous/ContinuousSuite.scala | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala index 02f9139..0e2fcfb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala @@ -54,27 +54,31 @@ class ContinuousSuiteBase extends StreamTest { protected def waitForRateSourceCommittedValue( query: ContinuousExecution, - desiredValue: Long, + partitionIdToDesiredValue: Map[Int, Long], maxWaitTimeMs: Long): Unit = { - def readHighestCommittedValue(c: ContinuousExecution): Option[Long] = { + def readCommittedValues(c: ContinuousExecution): Option[Map[Int, Long]] = { c.committedOffsets.lastOption.map { case (_, offset) => offset match { case o: RateStreamOffset => - o.partitionToValueAndRunTimeMs.map { - case (_, ValueRunTimeMsPair(value, _)) => value - }.max + o.partitionToValueAndRunTimeMs.mapValues(_.value).toMap } } } + def reachDesiredValues: Boolean = { + val committedValues = readCommittedValues(query).getOrElse(Map.empty) + partitionIdToDesiredValue.forall { case (key, value) => + committedValues.contains(key) && committedValues(key) > value + } + } + val maxWait = System.currentTimeMillis() + maxWaitTimeMs - while (System.currentTimeMillis() < maxWait && - readHighestCommittedValue(query).getOrElse(Long.MinValue) < desiredValue) { + while (System.currentTimeMillis() < maxWait && !reachDesiredValues) { Thread.sleep(100) } if (System.currentTimeMillis() > maxWait) { 
logWarning(s"Couldn't reach desired value in $maxWaitTimeMs milliseconds!" + - s"Current highest committed value is ${readHighestCommittedValue(query)}") + s"Current committed values is ${readCommittedValues(query)}") } } @@ -264,7 +268,7 @@ class ContinuousSuite extends ContinuousSuiteBase { val expected = Set(0, 1, 2, 3) val continuousExecution = query.asInstanceOf[StreamingQueryWrapper].streamingQuery.asInstanceOf[ContinuousExecution] - waitForRateSourceCommittedValue(continuousExecution, expected.max, 20 * 1000) + waitForRateSourceCommittedValue(continuousExecution, Map(0 -> 2, 1 -> 3), 20 * 1000) query.stop() val results = spark.read.table("noharness").collect() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org