Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20449#discussion_r170820691 --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala --- @@ -320,6 +321,63 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft f2.get() } + test("Interruptible iterator of shuffle reader") { + // In this test case, we create a Spark job of two stages. The second stage is cancelled during + // execution and a counter is used to make sure that the corresponding tasks are indeed + // cancelled. + import JobCancellationSuite._ + val numSlice = 2 + sc = new SparkContext(s"local[$numSlice]", "test") + + val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) } + .repartitionAndSortWithinPartitions(new HashPartitioner(2)) + .mapPartitions { iter => + taskStartedSemaphore.release() --- End diff -- This will be called twice because the root RDD has 2 partitions, so `f.cancel` might be called before both of these 2 partitions have finished.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org