Repository: spark Updated Branches: refs/heads/master 6c66ab8b3 -> c17a8ff52
[SPARK-25214][SS][FOLLOWUP] Fix the issue that Kafka v2 source may return duplicated records when `failOnDataLoss=false` ## What changes were proposed in this pull request? This is a follow-up PR for #22207 to fix a potentially flaky test. `processAllAvailable` doesn't work for continuous processing, so we should not use it for a continuous query. ## How was this patch tested? Jenkins. Closes #22230 from zsxwing/SPARK-25214-2. Authored-by: Shixiong Zhu <zsxw...@gmail.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c17a8ff5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c17a8ff5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c17a8ff5 Branch: refs/heads/master Commit: c17a8ff52377871ab4ff96b648ebaf4112f0b5be Parents: 6c66ab8 Author: Shixiong Zhu <zsxw...@gmail.com> Authored: Sat Aug 25 09:17:40 2018 -0700 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Sat Aug 25 09:17:40 2018 -0700 ---------------------------------------------------------------------- .../spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c17a8ff5/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala index 0ff341c..39c4e3f 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala +++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala @@ -80,7 +80,7 @@ trait KafkaMissingOffsetsTest extends SharedSQLContext { } } -class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest { +class KafkaDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTest { import testImplicits._ @@ -165,7 +165,11 @@ class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest { .trigger(Trigger.Continuous(100)) .start() try { - query.processAllAvailable() + // `processAllAvailable` doesn't work for continuous processing, so just wait until the last + // record appears in the table. + eventually(timeout(streamingTimeout)) { + assert(spark.table(table).as[String].collect().contains("49")) + } } finally { query.stop() } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org