Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20225#discussion_r161097981 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala --- @@ -219,6 +201,42 @@ class ContinuousSuite extends ContinuousSuiteBase { StopStream) } + test("task failure kills the query") { + val df = spark.readStream + .format("rate") + .option("numPartitions", "5") + .option("rowsPerSecond", "5") + .load() + .select('value) + + // Get an arbitrary task from this query to kill. It doesn't matter which one. + var taskId: Long = -1 + val listener = new SparkListener() { + override def onTaskStart(start: SparkListenerTaskStart): Unit = { + taskId = start.taskInfo.taskId + } + } + spark.sparkContext.addSparkListener(listener) + + testStream(df, useV2Sink = true)( + StartStream(Trigger.Continuous(100)), + Execute(waitForRateSourceTriggers(_, 2)), + Execute { query => + // Wait until a task is started, then kill its first attempt. + eventually(timeout(streamingTimeout)) { assert(taskId != -1) } + spark.sparkContext.killTaskAttempt(taskId) + eventually(timeout(streamingTimeout)) { + assert(query.exception.isDefined) + } + assert( + query.exception.get.getCause != null && + query.exception.get.getCause.getCause != null && + query.exception.get.getCause.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]) + }) + + spark.sparkContext.removeSparkListener(listener) --- End diff -- put this in a finally clause.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org