xuanyuanking commented on a change in pull request #25341: [SPARK-28607][CORE][SHUFFLE] Don't store partition lengths twice.
URL: https://github.com/apache/spark/pull/25341#discussion_r319471170
 
 

 ##########
 File path: 
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
 ##########
 @@ -195,40 +191,36 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
    */
   private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
     // Track location of the partition starts in the output file
-    final long[] lengths = new long[numPartitions];
-    if (partitionWriters == null) {
-      // We were passed an empty iterator
-      return lengths;
-    }
-    final long writeStartTime = System.nanoTime();
-    try {
-      for (int i = 0; i < numPartitions; i++) {
-        final File file = partitionWriterSegments[i].file();
-        ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
-        if (file.exists()) {
-          if (transferToEnabled) {
-            // Using WritableByteChannelWrapper to make resource closing consistent between
-            // this implementation and UnsafeShuffleWriter.
-            Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
-            if (maybeOutputChannel.isPresent()) {
-              writePartitionedDataWithChannel(file, maybeOutputChannel.get());
+    if (partitionWriters != null) {
+      final long writeStartTime = System.nanoTime();
+      try {
+        for (int i = 0; i < numPartitions; i++) {
+          final File file = partitionWriterSegments[i].file();
+          ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
+          if (file.exists()) {
+            if (transferToEnabled) {
+              // Using WritableByteChannelWrapper to make resource closing consistent between
+              // this implementation and UnsafeShuffleWriter.
+              Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
+              if (maybeOutputChannel.isPresent()) {
+                writePartitionedDataWithChannel(file, maybeOutputChannel.get());
+              } else {
+                writePartitionedDataWithStream(file, writer);
+              }
             } else {
               writePartitionedDataWithStream(file, writer);
             }
-          } else {
-            writePartitionedDataWithStream(file, writer);
-          }
-          if (!file.delete()) {
-            logger.error("Unable to delete file for partition {}", i);
+            if (!file.delete()) {
+              logger.error("Unable to delete file for partition {}", i);
+            }
           }
         }
-        lengths[i] = writer.getNumBytesWritten();
 
 Review comment:
   Just a quick question here: after this change, is there no longer any place that calls `ShufflePartitionWriter.getNumBytesWritten()`?
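One way to picture what "don't store partition lengths twice" means for the removed `lengths[i] = writer.getNumBytesWritten()` line is sketched below. It is a minimal sketch under stated assumptions, not Spark's code: `SketchMapOutputWriter` and `SketchPartitionWriter` are hypothetical, simplified stand-ins for `ShuffleMapOutputWriter` and `ShufflePartitionWriter`, and the assumption is that the map output writer records each partition's length once (when that partition's stream is closed) and returns the array on commit, so the caller keeps no `lengths` array of its own. The per-partition `getNumBytesWritten()` is kept here only to show it still reports the same count.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical, simplified stand-in for ShufflePartitionWriter (not the real Spark interface).
interface SketchPartitionWriter {
  OutputStream openStream() throws IOException;
  long getNumBytesWritten();
}

// Hypothetical, simplified stand-in for ShuffleMapOutputWriter (not the real Spark interface).
final class SketchMapOutputWriter {
  // The single place per-partition lengths are stored.
  private final long[] partitionLengths;
  // Stands in for the single map output file that all partitions are appended to.
  private final ByteArrayOutputStream data = new ByteArrayOutputStream();
  private int lastPartitionId = -1;

  SketchMapOutputWriter(int numPartitions) {
    partitionLengths = new long[numPartitions];
  }

  SketchPartitionWriter getPartitionWriter(int reducePartitionId) {
    // Partitions are appended in order, so each id may be requested at most once.
    if (reducePartitionId <= lastPartitionId) {
      throw new IllegalArgumentException("Partitions must be requested in increasing order");
    }
    lastPartitionId = reducePartitionId;
    return new PartitionWriter(reducePartitionId);
  }

  // Partitions that were never opened keep a length of 0; returns a defensive copy.
  long[] commitAllPartitions() {
    return partitionLengths.clone();
  }

  private final class PartitionWriter implements SketchPartitionWriter {
    private final int partitionId;
    private long count;

    PartitionWriter(int partitionId) {
      this.partitionId = partitionId;
    }

    @Override
    public OutputStream openStream() {
      return new OutputStream() {
        @Override
        public void write(int b) {
          data.write(b);
          count++;
        }

        @Override
        public void write(byte[] b, int off, int len) {
          data.write(b, off, len);
          count += len;
        }

        @Override
        public void close() {
          // Record this partition's length inside the map output writer, once.
          partitionLengths[partitionId] = count;
        }
      };
    }

    @Override
    public long getNumBytesWritten() {
      return count;
    }
  }
}
```

With this shape, a caller such as `writePartitionedData` could simply return the result of `commitAllPartitions()` instead of filling its own array, which is the duplication the PR title refers to; whether the real API settles on exactly this split is an assumption of the sketch.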

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services