Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20225#discussion_r161056545
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
    @@ -219,6 +199,37 @@ class ContinuousSuite extends ContinuousSuiteBase {
           StopStream)
       }
     
    +  test("kill task") {
    +    val df = spark.readStream
    +      .format("rate")
    +      .option("numPartitions", "5")
    +      .option("rowsPerSecond", "5")
    +      .load()
    +      .select('value)
    +
     +    // Get an arbitrary task from this query to kill. It doesn't matter which one.
    +    var taskId: Long = -1
    +    val listener = new SparkListener() {
    +      override def onTaskStart(start: SparkListenerTaskStart): Unit = {
    +        taskId = start.taskInfo.taskId
    +      }
    +    }
    +    spark.sparkContext.addSparkListener(listener)
    +
    +    testStream(df, useV2Sink = true)(
    +      StartStream(Trigger.Continuous(100)),
    +      Execute(waitForRateSourceTriggers(_, 2)),
    +      Execute { _ =>
    +        eventually(timeout(streamingTimeout)) { assert(taskId != -1) }
    +        spark.sparkContext.killTaskAttempt(taskId)
    +      },
    +      Execute(waitForRateSourceTriggers(_, 4)),
    --- End diff --
    
    It kills an arbitrary task and checks that query execution continues unaffected.
    
    I've added a check that the run ID has changed, confirming that the retry was indeed made at the ContinuousExecution level.
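    
    For reference, a minimal sketch of what that run-ID check could look like,
    building on the test body above (this assumes the running query can be cast
    to ContinuousExecution and that a `currentRunId` field is refreshed on each
    restart; the names are illustrative, not necessarily the exact code in the PR):
    
        import java.util.UUID
        import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
    
        var runIdBeforeKill: UUID = null
        testStream(df, useV2Sink = true)(
          StartStream(Trigger.Continuous(100)),
          // Capture the run ID once the query is up.
          Execute { query =>
            runIdBeforeKill = query.asInstanceOf[ContinuousExecution].currentRunId
          },
          Execute { _ =>
            eventually(timeout(streamingTimeout)) { assert(taskId != -1) }
            spark.sparkContext.killTaskAttempt(taskId)
          },
          Execute(waitForRateSourceTriggers(_, 4)),
          // A changed run ID confirms the restart happened at the
          // ContinuousExecution level, not as a task-level retry.
          AssertOnQuery(
            _.asInstanceOf[ContinuousExecution].currentRunId != runIdBeforeKill),
          StopStream)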

