squito commented on a change in pull request #25304: [SPARK-28570][CORE][SHUFFLE] Make UnsafeShuffleWriter use the new API.
URL: https://github.com/apache/spark/pull/25304#discussion_r311141394
 
 

 ##########
 File path: core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
 ##########
 @@ -430,54 +434,49 @@ void forceSorterToSpill() throws IOException {
   * This is only safe when the IO compression codec and serializer support concatenation of
    * serialized streams.
    *
+   * @param spills the spills to merge.
+   * @param mapWriter the map output writer to use for output.
    * @return the partition lengths in the merged file.
    */
-  private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException {
-    assert (spills.length >= 2);
+  private long[] mergeSpillsWithTransferTo(
+      SpillInfo[] spills,
+      ShuffleMapOutputWriter mapWriter) throws IOException {
     final int numPartitions = partitioner.numPartitions();
     final long[] partitionLengths = new long[numPartitions];
     final FileChannel[] spillInputChannels = new FileChannel[spills.length];
     final long[] spillInputChannelPositions = new long[spills.length];
-    FileChannel mergedFileOutputChannel = null;
 
     boolean threwException = true;
     try {
       for (int i = 0; i < spills.length; i++) {
        spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
       }
-      // This file needs to opened in append mode in order to work around a Linux kernel bug that
-      // affects transferTo; see SPARK-3948 for more details.
-      mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();
-
-      long bytesWrittenToMergedFile = 0;
       for (int partition = 0; partition < numPartitions; partition++) {
-        for (int i = 0; i < spills.length; i++) {
-          final long partitionLengthInSpill = spills[i].partitionLengths[partition];
-          final FileChannel spillInputChannel = spillInputChannels[i];
-          final long writeStartTime = System.nanoTime();
-          Utils.copyFileStreamNIO(
-            spillInputChannel,
-            mergedFileOutputChannel,
-            spillInputChannelPositions[i],
-            partitionLengthInSpill);
-          spillInputChannelPositions[i] += partitionLengthInSpill;
-          writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
-          bytesWrittenToMergedFile += partitionLengthInSpill;
-          partitionLengths[partition] += partitionLengthInSpill;
+        boolean copyThrewExecption = true;
+        ShufflePartitionWriter writer = mapWriter.getPartitionWriter(partition);
+        WritableByteChannelWrapper resolvedChannel = writer.openChannelWrapper()
+            .orElseGet(() -> new StreamFallbackChannelWrapper(openStreamUnchecked(writer)));
+        try {
+          for (int i = 0; i < spills.length; i++) {
+            long partitionLengthInSpill = 0L;
+            partitionLengthInSpill += spills[i].partitionLengths[partition];
 
 Review comment:
   These two lines can be collapsed into a single final declaration:
   final long partitionLengthInSpill = spills[i].partitionLengths[partition];
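
   For context, a minimal sketch of how the inner copy loop might read with that
   suggestion applied. This is a sketch only: the surrounding names (writer,
   resolvedChannel, spillInputChannels, spillInputChannelPositions, writeMetrics)
   are taken from the diff above, and the assumption that Utils.copyFileStreamNIO
   can write to the channel returned by resolvedChannel.channel() is not
   confirmed by the hunk quoted here.

       // Sketch, not the merged PR code: copy one partition's bytes from each spill.
       for (int i = 0; i < spills.length; i++) {
         // Suggested change: compute the spill's partition length once, as final.
         final long partitionLengthInSpill = spills[i].partitionLengths[partition];
         final long writeStartTime = System.nanoTime();
         // Assumes copyFileStreamNIO accepts the WritableByteChannel obtained from
         // the wrapper; the actual call site is outside the quoted hunk.
         Utils.copyFileStreamNIO(
             spillInputChannels[i],
             resolvedChannel.channel(),
             spillInputChannelPositions[i],
             partitionLengthInSpill);
         spillInputChannelPositions[i] += partitionLengthInSpill;
         writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
         partitionLengths[partition] += partitionLengthInSpill;
       }

   Besides dropping the redundant accumulator, the final keyword makes it clear
   that the value does not change within the loop body.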
