Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20675#discussion_r170665692
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 ---
    @@ -219,18 +219,59 @@ class ContinuousSuite extends ContinuousSuiteBase {
         spark.sparkContext.addSparkListener(listener)
         try {
           testStream(df, useV2Sink = true)(
    -        StartStream(Trigger.Continuous(100)),
    +        StartStream(longContinuousTrigger),
    +        AwaitEpoch(0),
             Execute(waitForRateSourceTriggers(_, 2)),
    +        IncrementEpoch(),
             Execute { _ =>
               // Wait until a task is started, then kill its first attempt.
               eventually(timeout(streamingTimeout)) {
                 assert(taskId != -1)
               }
               spark.sparkContext.killTaskAttempt(taskId)
             },
    -        ExpectFailure[SparkException] { e =>
    -          e.getCause != null && 
e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
    -        })
    +        Execute(waitForRateSourceTriggers(_, 4)),
    +        IncrementEpoch(),
    +        // Check the answer exactly, if there's duplicated result, 
CheckAnserRowsContains
    +        // will also return true.
    +        CheckAnswerRowsContainsOnlyOnce(scala.Range(0, 20).map(Row(_))),
    --- End diff --
    
    Checking exact answer can just be `CheckAnswer(0 to 20: _*)`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to