mccheah commented on a change in pull request #25304: [SPARK-28570][CORE][SHUFFLE] Make UnsafeShuffleWriter use the new API.
URL: https://github.com/apache/spark/pull/25304#discussion_r321948056
 
 

 ##########
 File path: core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
 ##########
 @@ -420,64 +423,54 @@ void forceSorterToSpill() throws IOException {
       for (InputStream stream : spillInputStreams) {
         Closeables.close(stream, threwException);
       }
-      Closeables.close(mergedFileOutputStream, threwException);
     }
-    return partitionLengths;
   }
 
   /**
   * Merges spill files by using NIO's transferTo to concatenate spill partitions' bytes.
   * This is only safe when the IO compression codec and serializer support concatenation of
    * serialized streams.
    *
+   * @param spills the spills to merge.
+   * @param mapWriter the map output writer to use for output.
    * @return the partition lengths in the merged file.
    */
-  private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException {
-    assert (spills.length >= 2);
+  private void mergeSpillsWithTransferTo(
+      SpillInfo[] spills,
+      ShuffleMapOutputWriter mapWriter) throws IOException {
     final int numPartitions = partitioner.numPartitions();
-    final long[] partitionLengths = new long[numPartitions];
     final FileChannel[] spillInputChannels = new FileChannel[spills.length];
     final long[] spillInputChannelPositions = new long[spills.length];
-    FileChannel mergedFileOutputChannel = null;
 
     boolean threwException = true;
     try {
       for (int i = 0; i < spills.length; i++) {
        spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
       }
-      // This file needs to be opened in append mode in order to work around a Linux kernel bug that
-      // affects transferTo; see SPARK-3948 for more details.
-      mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();
-
-      long bytesWrittenToMergedFile = 0;
       for (int partition = 0; partition < numPartitions; partition++) {
-        for (int i = 0; i < spills.length; i++) {
-          final long partitionLengthInSpill = spills[i].partitionLengths[partition];
-          final FileChannel spillInputChannel = spillInputChannels[i];
-          final long writeStartTime = System.nanoTime();
-          Utils.copyFileStreamNIO(
-            spillInputChannel,
-            mergedFileOutputChannel,
-            spillInputChannelPositions[i],
-            partitionLengthInSpill);
-          spillInputChannelPositions[i] += partitionLengthInSpill;
-          writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
-          bytesWrittenToMergedFile += partitionLengthInSpill;
-          partitionLengths[partition] += partitionLengthInSpill;
+        boolean copyThrewExecption = true;
+        ShufflePartitionWriter writer = mapWriter.getPartitionWriter(partition);
+        WritableByteChannelWrapper resolvedChannel = writer.openChannelWrapper()
+            .orElseGet(() -> new StreamFallbackChannelWrapper(openStreamUnchecked(writer)));
 
 Review comment:
   The tricky part here is that we only know whether the channel wrapper is supported once we've started iterating through the partitions in this loop, which would mean we would have to abort the iteration early and switch the merge strategy. Can we leave it as-is?
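
   For illustration only, here is a minimal, self-contained sketch of the per-partition fallback pattern the new code relies on: ask the writer for a channel first and fall back to a stream-backed channel otherwise. The `PartitionWriter` and `ChannelWrapper` interfaces below are simplified stand-ins for `ShufflePartitionWriter` and `WritableByteChannelWrapper`, not the real shuffle API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Optional;

// Simplified stand-ins for Spark's ShufflePartitionWriter / WritableByteChannelWrapper;
// illustrative only, not the real org.apache.spark.shuffle.api interfaces.
interface ChannelWrapper extends AutoCloseable {
  WritableByteChannel channel();
}

interface PartitionWriter {
  Optional<ChannelWrapper> openChannelWrapper() throws IOException; // may be empty
  OutputStream openStream() throws IOException;                     // always available
}

public class PartitionFallbackSketch {

  // Resolve a writable channel for one partition: use the writer's own channel when it
  // offers one, otherwise wrap its OutputStream so the copy loop can stay channel-based.
  static ChannelWrapper resolveChannel(PartitionWriter writer) throws IOException {
    Optional<ChannelWrapper> maybeChannel = writer.openChannelWrapper();
    if (maybeChannel.isPresent()) {
      return maybeChannel.get();
    }
    WritableByteChannel fallback = Channels.newChannel(writer.openStream());
    return new ChannelWrapper() {
      @Override public WritableByteChannel channel() { return fallback; }
      @Override public void close() throws IOException { fallback.close(); }
    };
  }

  public static void main(String[] args) throws Exception {
    // Toy writer that only supports stream output, forcing the fallback path.
    PartitionWriter streamOnly = new PartitionWriter() {
      @Override public Optional<ChannelWrapper> openChannelWrapper() { return Optional.empty(); }
      @Override public OutputStream openStream() { return new ByteArrayOutputStream(); }
    };
    try (ChannelWrapper wrapper = resolveChannel(streamOnly)) {
      System.out.println("fallback channel open: " + wrapper.channel().isOpen());
    }
  }
}
```

   Per partition, the diff takes the same shape via `openChannelWrapper().orElseGet(...)`, so the copy loop never has to commit to a single merge strategy up front.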

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

