Xianjin YE created SPARK-23040:
----------------------------------

             Summary: BlockStoreShuffleReader's returned Iterator isn't interruptible if aggregator or ordering is specified
                 Key: SPARK-23040
                 URL: https://issues.apache.org/jira/browse/SPARK-23040
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 2.2.1, 2.2.0, 2.1.2, 2.1.1, 2.1.0, 2.0.2, 2.0.1, 2.0.0
            Reporter: Xianjin YE
            Priority: Minor


For example, if ordering is specified, the returned iterator is a CompletionIterator:
{code:java}
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) =>
        // Create an ExternalSorter to sort the data.
        val sorter =
          new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
        sorter.insertAll(aggregatedIter)
        context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
        context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
        context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
        CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
      case None =>
        aggregatedIter
    }
{code}

However, the sorter consumes the aggregatedIter (which may be interruptible) inside sorter.insertAll, and then exposes an iterator over the sorted data that isn't interruptible.
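For context, "interruptible" here means wrapped in org.apache.spark.InterruptibleIterator, which checks the task's kill flag before serving each element. A paraphrased sketch of that wrapper (an illustration, not the exact Spark source), showing what the sorter's output iterator loses:
{code:java}
import org.apache.spark.{TaskContext, TaskKilledException}

// Paraphrased sketch of org.apache.spark.InterruptibleIterator: it delegates to the
// underlying iterator, but checks for task cancellation on every hasNext so that a
// killed task stops consuming records promptly.
class SketchedInterruptibleIterator[T](context: TaskContext, delegate: Iterator[T])
  extends Iterator[T] {

  override def hasNext: Boolean = {
    // Abort the task if it has been marked as killed.
    if (context.isInterrupted()) {
      throw new TaskKilledException
    }
    delegate.hasNext
  }

  override def next(): T = delegate.next()
}
{code}
Because sorter.insertAll drains the aggregatedIter eagerly, all of these per-element checks happen during insertAll; the CompletionIterator over sorter.iterator performs no such checks afterwards.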

The problem with this is that the Spark task cannot be cancelled when its stage fails (unless interruptThread is enabled, which it is not by default), so the task keeps running and wastes executor resources.
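A minimal sketch of one possible direction for a fix (an illustration, not a committed patch): have BlockStoreShuffleReader.read() re-wrap its result in an org.apache.spark.InterruptibleIterator whenever the aggregator or sorter has already consumed the interruptible one. Here resultIter stands for the value of the dep.keyOrdering match quoted above:
{code:java}
    resultIter match {
      case _: InterruptibleIterator[Product2[K, C]] =>
        // Already interruptible (no aggregation/ordering consumed it), return as-is.
        resultIter
      case _ =>
        // The aggregator and/or sorter drained the interruptible iterator eagerly, so
        // the iterator they expose no longer checks the kill flag; wrap it again so
        // downstream consumption can still be cancelled.
        new InterruptibleIterator[Product2[K, C]](context, resultIter)
    }
{code}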





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
