Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20449#discussion_r172414393 --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala --- @@ -104,9 +104,16 @@ private[spark] class BlockStoreShuffleReader[K, C]( context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled) context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes) + // Use completion callback to stop sorter if task was finished/cancelled. + context.addTaskCompletionListener(_ => { + sorter.stop() + }) CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop()) case None => aggregatedIter } + // Use another interruptible iterator here to support task cancellation as aggregator or(and) + // sorter may have consumed previous interruptible iterator. + new InterruptibleIterator[Product2[K, C]](context, resultIter) --- End diff -- there is a chance that `resultIter` is already an `InterruptibleIterator`, and we should not double wrap it. Can you send a followup PR to fix this? then we can backport them to 2.3 together.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org