Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6423#discussion_r32189473

    --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala ---
    @@ -33,23 +34,55 @@ private[spark] class HashShuffleReader[K, C](
         "Hash shuffle currently only supports fetching one partition")

       private val dep = handle.dependency
    +  private val blockManager = SparkEnv.get.blockManager

       /** Read the combined key-values for this reduce task */
       override def read(): Iterator[Product2[K, C]] = {
    +    val blockStreams = BlockStoreShuffleFetcher.fetchBlockStreams(
    +      handle.shuffleId, startPartition, context)
    +
    +    // Wrap the streams for compression based on configuration
    +    val wrappedStreams = blockStreams.map { case (blockId, inputStream) =>
    +      blockManager.wrapForCompression(blockId, inputStream)
    +    }
    +
         val ser = Serializer.getSerializer(dep.serializer)
    -    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)
    +    val serializerInstance = ser.newInstance()
    +
    +    // Create a key/value iterator for each stream
    +    val recordIter = wrappedStreams.flatMap { wrappedStream =>
    +      serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
    +    }
    +
    +    // Update the context task metrics for each record read.
    +    val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
    +    val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
    +      recordIter.map(record => {
    +        readMetrics.incRecordsRead(1)
    +        record
    +      }),
    +      context.taskMetrics().updateShuffleReadMetrics())
    +
    +    // An interruptible iterator must be used here in order to support task cancellation
    +    val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)

         val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
           if (dep.mapSideCombine) {
    -        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
    +        // We are reading values that are already combined
    +        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
    +        dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
           } else {
    -        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
    +        // We don't know the value type, but also don't care -- the dependency *should*
    +        // have made sure its compatible w/ this aggregator, which will convert the value
    +        // type to the combined type C
    +        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
    +        dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
           }
         } else {
           require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")

           // Convert the Product2s to pairs since this is what downstream RDDs currently expect
    -      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
    +      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
    --- End diff --

so, this is now really off-topic from the purpose of this PR, but as long as we're cleaning this bit of code up ... another thing which confuses me about the types in the old code is this business about `Product2[K, C]` vs `(K, C)`.

(a) in this branch, I'm pretty sure we already have an `Iterator[(K, C)]`, so we don't need the `map` at all and can just cast directly: `interruptibleIter.asInstanceOf[Iterator[(K, C)]]`

(b) the type of `aggregatedIter` could then also be `Iterator[(K, C)]` (though I suppose it's fine as is too)

(c) what exactly is this comment referring to?

```
// Convert the Product2s to pairs since this is what downstream RDDs currently expect
```

is that still true? if it is, then don't we really have the wrong return type on `ExternalSorter#iterator` & `ShuffleReader#read`? It seems that we actually do still require it to be a `(K, C)`, not the more general `Product2[K, C]`, given the [way it's used in `ShuffledRDD`](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L91). So I guess the comment is right, and we actually do require more stringent types than what's in the interface?

anyhow, if the comment is right, then changing the types elsewhere for (c) is probably out-of-scope for this PR, but we could at least do (a) and maybe (b) as well.
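
to make the `Product2[K, C]` vs `(K, C)` distinction concrete, here is a small standalone sketch (no Spark involved). `Product2VsTuple`, `Pair`, `widen`, `toPairs` and `castToPairs` are hypothetical names made up for illustration, not anything in the codebase. It assumes only that `Tuple2` is a subtype of `Product2` and that `Iterator` is covariant, both of which hold in the Scala standard library:

```scala
object Product2VsTuple {
  // Hypothetical record type: a Product2 that is NOT a Tuple2.
  final class Pair[K, C](val _1: K, val _2: C) extends Product2[K, C] {
    def canEqual(that: Any): Boolean = that.isInstanceOf[Pair[_, _]]
  }

  // Widening needs no cast: Tuple2[K, C] <: Product2[K, C] and Iterator is covariant.
  def widen[K, C](it: Iterator[(K, C)]): Iterator[Product2[K, C]] = it

  // Safe narrowing: rebuild every record as a real tuple (what the `map` in the diff does).
  def toPairs[K, C](it: Iterator[Product2[K, C]]): Iterator[(K, C)] =
    it.map(p => (p._1, p._2))

  // Unchecked narrowing: the cast is erased, so it is only sound when every
  // element really is a Tuple2 at runtime (the assumption behind suggestion (a)).
  def castToPairs[K, C](it: Iterator[Product2[K, C]]): Iterator[(K, C)] =
    it.asInstanceOf[Iterator[(K, C)]]

  def main(args: Array[String]): Unit = {
    // Elements are real tuples, so the cast and the map give the same records.
    val viaCast = castToPairs(widen(Iterator(("a", 1), ("b", 2)))).toList
    val viaMap = toPairs(widen(Iterator(("a", 1), ("b", 2)))).toList
    println(viaCast == viaMap)

    // Elements are Pairs, so only the map-based conversion is safe here.
    println(toPairs(Iterator[Product2[String, Int]](new Pair("c", 3))).toList)
  }
}
```

so the `map` only buys us something if a non-tuple `Product2` can actually show up in the iterator; if the deserializer always hands back tuples, the direct cast in (a) is equivalent and skips an allocation per record.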