mridulm commented on code in PR #44512:
URL: https://github.com/apache/spark/pull/44512#discussion_r1460207780


##########
core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala:
##########
@@ -111,31 +111,50 @@ private[spark] class BlockStoreShuffleReader[K, C](
     // An interruptible iterator must be used here in order to support task cancellation
     val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
 
-    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
+    // Sort the output if there is a sort ordering defined.
+    var aggregated = false
+    // The type of the value cannot be determined here; it may be the value type
+    // or the combined value type.
+    val sortedIter: Iterator[Product2[K, Nothing]] = dep.keyOrdering match {
+      case Some(keyOrd: Ordering[K]) =>
+        // Create an ExternalSorter to sort the data.
+        val sorter: ExternalSorter[K, _, C] = if (dep.aggregator.isDefined) {
+          aggregated = true
+          if (dep.mapSideCombine) {
+            new ExternalSorter[K, C, C](context,
+              Option(new Aggregator[K, C, C](identity,
+                dep.aggregator.get.mergeCombiners,
+                dep.aggregator.get.mergeCombiners)),
+              ordering = Some(keyOrd), serializer = dep.serializer)
+          } else {
+            new ExternalSorter[K, Nothing, C](context,
+              dep.aggregator.asInstanceOf[Option[Aggregator[K, Nothing, C]]],
+              ordering = Some(keyOrd), serializer = dep.serializer)
+          }
+        } else {
+          new ExternalSorter[K, C, C](context, ordering = Some(keyOrd),
+            serializer = dep.serializer)
+        }
+        sorter.insertAllAndUpdateMetrics(interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]).
+          asInstanceOf[Iterator[(K, Nothing)]]
+      case None =>
+        interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
+    }
+
+    val resultIter: Iterator[Product2[K, C]] = if (!aggregated && dep.aggregator.isDefined) {
       if (dep.mapSideCombine) {
         // We are reading values that are already combined
-        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
+        val combinedKeyValuesIterator = sortedIter.asInstanceOf[Iterator[(K, C)]]
         dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
       } else {
         // We don't know the value type, but also don't care -- the dependency *should*
         // have made sure it's compatible w/ this aggregator, which will convert the value
         // type to the combined type C
-        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
+        val keyValuesIterator = sortedIter.asInstanceOf[Iterator[(K, Nothing)]]
         dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
       }
     } else {
-      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
-    }

Review Comment:
   We can drop the `aggregated` flag.
   
   In other words, we can make this simply:
   
   ```
   val resultIter = {
     if (dep.keyOrdering.isDefined) {
     // whatever is in the `Some(keyOrd: Ordering[K])` clause of the match on dep.keyOrdering.
     } else if (dep.aggregator.isDefined) {
   
       // whatever is in "if (!aggregated && dep.aggregator.isDefined) {"
   
     } else {
       interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
     }
   }
   ```
   
   The proposed code is effectively this already, just spread across two blocks tied together through the `aggregated` flag.
   
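   For concreteness, a rough sketch of what the merged block could look like. This is untested and assumes the surrounding context from the diff above (`dep`, `context`, `interruptibleIter`, `ExternalSorter`, `insertAllAndUpdateMetrics`); the sorting branch always aggregates inside the sorter when an aggregator is defined, so no second aggregation pass is needed:
   
   ```
   val resultIter: Iterator[Product2[K, C]] = {
     if (dep.keyOrdering.isDefined) {
       val keyOrd = dep.keyOrdering.get
       // Sort (and, when an aggregator is defined, combine) in a single pass.
       val sorter: ExternalSorter[K, _, C] = if (dep.aggregator.isDefined) {
         if (dep.mapSideCombine) {
           // Records are already combined; merge combiners while sorting.
           new ExternalSorter[K, C, C](context,
             Option(new Aggregator[K, C, C](identity,
               dep.aggregator.get.mergeCombiners,
               dep.aggregator.get.mergeCombiners)),
             ordering = Some(keyOrd), serializer = dep.serializer)
         } else {
           // Records are raw values; combine them while sorting.
           new ExternalSorter[K, Nothing, C](context,
             dep.aggregator.asInstanceOf[Option[Aggregator[K, Nothing, C]]],
             ordering = Some(keyOrd), serializer = dep.serializer)
         }
       } else {
         // Sort only, no aggregation.
         new ExternalSorter[K, C, C](context, ordering = Some(keyOrd),
           serializer = dep.serializer)
       }
       sorter.insertAllAndUpdateMetrics(
           interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]])
         .asInstanceOf[Iterator[Product2[K, C]]]
     } else if (dep.aggregator.isDefined) {
       if (dep.mapSideCombine) {
         // We are reading values that are already combined.
         dep.aggregator.get.combineCombinersByKey(
           interruptibleIter.asInstanceOf[Iterator[(K, C)]], context)
       } else {
         dep.aggregator.get.combineValuesByKey(
           interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]], context)
       }
     } else {
       interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
     }
   }
   ```
   
   This also collapses `sortedIter`/`resultIter` into a single val with no mutable state, so each outcome (sort, aggregate, pass through) is visible in one place.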



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

