Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20675#discussion_r170665692 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala --- @@ -219,18 +219,59 @@ class ContinuousSuite extends ContinuousSuiteBase { spark.sparkContext.addSparkListener(listener) try { testStream(df, useV2Sink = true)( - StartStream(Trigger.Continuous(100)), + StartStream(longContinuousTrigger), + AwaitEpoch(0), Execute(waitForRateSourceTriggers(_, 2)), + IncrementEpoch(), Execute { _ => // Wait until a task is started, then kill its first attempt. eventually(timeout(streamingTimeout)) { assert(taskId != -1) } spark.sparkContext.killTaskAttempt(taskId) }, - ExpectFailure[SparkException] { e => - e.getCause != null && e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException] - }) + Execute(waitForRateSourceTriggers(_, 4)), + IncrementEpoch(), + // Check the answer exactly, if there's duplicated result, CheckAnserRowsContains + // will also return true. + CheckAnswerRowsContainsOnlyOnce(scala.Range(0, 20).map(Row(_))), --- End diff -- Checking exact answer can just be `CheckAnswer(0 to 20: _*)`.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org