Repository: spark Updated Branches: refs/heads/branch-2.1 a7364a82e -> 08e427287
[SPARK-18868][FLAKY-TEST] Deflake StreamingQueryListenerSuite: single listener, check trigger... ## What changes were proposed in this pull request? Use `recentProgress` instead of `lastProgress` and filter out last non-zero value. Also add eventually to the latest assertQuery similar to first `assertQuery` ## How was this patch tested? Ran test 1000 times Author: Burak Yavuz <brk...@gmail.com> Closes #16287 from brkyvz/SPARK-18868. (cherry picked from commit 9c7f83b0289ba4550b156e6af31cf7c44580eb12) Signed-off-by: Shixiong Zhu <shixi...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08e42728 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08e42728 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08e42728 Branch: refs/heads/branch-2.1 Commit: 08e4272872fc17c43f0dc79d329b946e8e85694d Parents: a7364a8 Author: Burak Yavuz <brk...@gmail.com> Authored: Thu Dec 15 15:46:03 2016 -0800 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Thu Dec 15 15:46:10 2016 -0800 ---------------------------------------------------------------------- .../streaming/StreamingQueryListenerSuite.scala | 25 +++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/08e42728/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 7c6745ac..a057d1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -84,7 +84,11 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { CheckAnswer(10, 5), AssertOnQuery { query => assert(listener.progressEvents.nonEmpty) - assert(listener.progressEvents.last.json === query.lastProgress.json) + // SPARK-18868: We can't use query.lastProgress, because in progressEvents, we filter + // out non-zero input rows, but the lastProgress may be a zero input row trigger + val lastNonZeroProgress = query.recentProgress.filter(_.numInputRows > 0).lastOption + .getOrElse(fail("No progress updates received in StreamingQuery!")) + assert(listener.progressEvents.last.json === lastNonZeroProgress.json) assert(listener.terminationEvent === null) true }, @@ -109,14 +113,17 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { AdvanceManualClock(100), ExpectFailure[SparkException], AssertOnQuery { query => - assert(listener.terminationEvent !== null) - assert(listener.terminationEvent.id === query.id) - assert(listener.terminationEvent.exception.nonEmpty) - // Make sure that the exception message reported through listener - // contains the actual exception and relevant stack trace - assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException")) - assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException")) - assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite")) + eventually(Timeout(streamingTimeout)) { + assert(listener.terminationEvent !== null) + assert(listener.terminationEvent.id === query.id) + assert(listener.terminationEvent.exception.nonEmpty) + // Make sure that the exception message reported through listener + // contains the actual exception and relevant stack trace + assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException")) + assert( + listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException")) + assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite")) + } listener.checkAsyncErrors() true } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org