Repository: spark
Updated Branches:
  refs/heads/master 6c66ab8b3 -> c17a8ff52


[SPARK-25214][SS][FOLLOWUP] Fix the issue that Kafka v2 source may return duplicated records when `failOnDataLoss=false`

## What changes were proposed in this pull request?

This is a follow-up PR for #22207 to fix a potentially flaky test: `processAllAvailable` doesn't work for continuous processing, so we should not use it for a continuous query.
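
For reference, the replacement pattern looks roughly like the sketch below. This is a minimal illustration rather than the exact test code: it assumes a suite that mixes in ScalaTest's `Eventually` and defines `streamingTimeout` (as Spark's `StreamTest` trait does), with `table` holding the memory-sink table name.

```scala
// Sketch: poll the sink instead of calling processAllAvailable(),
// which only completes for micro-batch queries, not continuous ones.
// `streamingTimeout`, `table`, and the sentinel record "49" come from
// the test context and are assumptions of this illustration.
eventually(timeout(streamingTimeout)) {
  // Re-read the in-memory sink table until the last expected record shows up.
  assert(spark.table(table).as[String].collect().contains("49"))
}
```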

## How was this patch tested?

Jenkins.

Closes #22230 from zsxwing/SPARK-25214-2.

Authored-by: Shixiong Zhu <zsxw...@gmail.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c17a8ff5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c17a8ff5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c17a8ff5

Branch: refs/heads/master
Commit: c17a8ff52377871ab4ff96b648ebaf4112f0b5be
Parents: 6c66ab8
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Sat Aug 25 09:17:40 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Sat Aug 25 09:17:40 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala    | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c17a8ff5/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
index 0ff341c..39c4e3f 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
@@ -80,7 +80,7 @@ trait KafkaMissingOffsetsTest extends SharedSQLContext {
   }
 }
 
-class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {
+class KafkaDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTest {
 
   import testImplicits._
 
@@ -165,7 +165,11 @@ class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {
         .trigger(Trigger.Continuous(100))
         .start()
       try {
-        query.processAllAvailable()
+        // `processAllAvailable` doesn't work for continuous processing, so just wait until the last
+        // record appears in the table.
+        eventually(timeout(streamingTimeout)) {
+          assert(spark.table(table).as[String].collect().contains("49"))
+        }
       } finally {
         query.stop()
       }

