GitHub user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170822587

--- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---

@@ -320,6 +321,63 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     f2.get()
   }

+  test("Interruptible iterator of shuffle reader") {
+    // In this test case, we create a Spark job of two stages. The second stage is cancelled
+    // during execution and a counter is used to make sure that the corresponding tasks are
+    // indeed cancelled.
+    import JobCancellationSuite._
+    val numSlice = 2
+    sc = new SparkContext(s"local[$numSlice]", "test")
+
+    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
+      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
+      .mapPartitions { iter =>
+        taskStartedSemaphore.release()
+        iter
+      }.foreachAsync { x =>
+        if (x._1 >= 10) {
+          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
+          // next iteration will be cancelled if the source iterator is interruptible. In that
+          // case, the maximum number of increments would be 11 (|1...10| + |N|), where N is the
+          // first element in another partition (assuming no ordering guarantee).
+          taskCancelledSemaphore.acquire()
+        }
+        executionOfInterruptibleCounter.getAndIncrement()
+      }
+
+    val sem = new Semaphore(0)
+    val taskCompletedSem = new Semaphore(0)
+    Future {
+      taskStartedSemaphore.acquire()
+      f.cancel()

--- End diff --

Line 372: `sem.acquire()` is blocked by this `Future` block, but it looks like we don't need `Future` or `sem` here. I will update the code.
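For context, here is a minimal sketch of the semaphore handshake this test relies on, stripped of Spark entirely: the "task" side signals that it has started, and the "driver" side waits for that signal before reacting (in the real test, by calling `f.cancel()`). The object name `HandshakeSketch` and the use of `scala.concurrent.Future` in place of a real Spark task are illustrative assumptions, not code from the PR.

```scala
import java.util.concurrent.Semaphore
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object HandshakeSketch {
  // Driver/task handshake: the task announces it has started, then blocks on a
  // gate that the driver opens only after observing the start signal.
  def run(): String = {
    val taskStartedSemaphore = new Semaphore(0)
    val taskCancelledSemaphore = new Semaphore(0)

    val task = Future {
      taskStartedSemaphore.release()   // task: tell the driver we are running
      taskCancelledSemaphore.acquire() // task: block until the driver reacts
      "task finished after driver signal"
    }

    taskStartedSemaphore.acquire()   // driver: wait until the task has started
    taskCancelledSemaphore.release() // driver: in the real test, f.cancel() happens here
    Await.result(task, 10.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Note that in this reduced form the driver simply blocks on `taskStartedSemaphore.acquire()` directly, which matches the review point: no extra wrapping `Future` (or additional `sem`) is needed for the handshake itself.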