GitHub user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20225#discussion_r161056545

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
@@ -219,6 +199,37 @@ class ContinuousSuite extends ContinuousSuiteBase {
       StopStream)
   }
 
+  test("kill task") {
+    val df = spark.readStream
+      .format("rate")
+      .option("numPartitions", "5")
+      .option("rowsPerSecond", "5")
+      .load()
+      .select('value)
+
+    // Get an arbitrary task from this query to kill. It doesn't matter which one.
+    var taskId: Long = -1
+    val listener = new SparkListener() {
+      override def onTaskStart(start: SparkListenerTaskStart): Unit = {
+        taskId = start.taskInfo.taskId
+      }
+    }
+    spark.sparkContext.addSparkListener(listener)
+
+    testStream(df, useV2Sink = true)(
+      StartStream(Trigger.Continuous(100)),
+      Execute(waitForRateSourceTriggers(_, 2)),
+      Execute { _ =>
+        eventually(timeout(streamingTimeout)) { assert(taskId != -1) }
+        spark.sparkContext.killTaskAttempt(taskId)
+      },
+      Execute(waitForRateSourceTriggers(_, 4)),
--- End diff ---

It kills an arbitrary task and checks that query execution continues unaffected. I've added a check that the run ID has changed, confirming that the retry was indeed made at the ContinuousExecution level.
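For context, here is a minimal sketch of what such a run-ID check could look like, reusing the `df` and `taskId` setup from the test above. The placement of the assertion and its exact wording are assumptions, not the literal code added in this PR; `AssertOnQuery` and `runId` are existing StreamTest/StreamExecution facilities.

```scala
// Hypothetical sketch, not the PR's literal code: capture the query's run ID
// before killing the task, then assert it changed afterwards. A new run ID
// means ContinuousExecution tore the query down and restarted it, rather
// than the task being retried within the same run.
import java.util.UUID

var initialRunId: UUID = null

testStream(df, useV2Sink = true)(
  StartStream(Trigger.Continuous(100)),
  // Record the run ID of the first execution.
  Execute { query => initialRunId = query.runId },
  Execute(waitForRateSourceTriggers(_, 2)),
  Execute { _ =>
    eventually(timeout(streamingTimeout)) { assert(taskId != -1) }
    spark.sparkContext.killTaskAttempt(taskId)
  },
  Execute(waitForRateSourceTriggers(_, 4)),
  // A changed run ID confirms the retry happened at the
  // ContinuousExecution level, not inside the original run.
  AssertOnQuery(_.runId != initialRunId, "expected a new run after killing a task"),
  StopStream)
```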