GitHub user heary-cao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23225#discussion_r239305307
  
    --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
    @@ -167,64 +167,67 @@ private void writeSortedFile(boolean isLastFile) {
         // record;
         final byte[] writeBuffer = new byte[diskWriteBufferSize];
     
    -    // Because this output will be read during shuffle, its compression codec must be controlled by
    -    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
    -    // createTempShuffleBlock here; see SPARK-3426 for more details.
    -    final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
    -      blockManager.diskBlockManager().createTempShuffleBlock();
    -    final File file = spilledFileInfo._2();
    -    final TempShuffleBlockId blockId = spilledFileInfo._1();
    -    final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);
    -
    -    // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
    -    // Our write path doesn't actually use this serializer (since we end up calling the `write()`
    -    // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
    -    // around this, we pass a dummy no-op serializer.
    -    final SerializerInstance ser = DummySerializerInstance.INSTANCE;
    -
    -    int currentPartition = -1;
    -    final FileSegment committedSegment;
    -    try (DiskBlockObjectWriter writer =
    -        blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse)) {
    -
    -      final int uaoSize = UnsafeAlignedOffset.getUaoSize();
    -      while (sortedRecords.hasNext()) {
    -        sortedRecords.loadNext();
    -        final int partition = sortedRecords.packedRecordPointer.getPartitionId();
    -        assert (partition >= currentPartition);
    -        if (partition != currentPartition) {
    -          // Switch to the new partition
    -          if (currentPartition != -1) {
    -            final FileSegment fileSegment = writer.commitAndGet();
    -            spillInfo.partitionLengths[currentPartition] = fileSegment.length();
    +    // If there are no sorted records, so we don't need to create an empty spill file.
    +    if (sortedRecords.hasNext()) {
    --- End diff --
    
    Could you do this check at the function entry and return early when there are no sorted records, instead of wrapping the whole body in an `if`? That would be similar to `BypassMergeSortShuffleWriter.write`.
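
    To illustrate the suggestion, here is a minimal sketch of the early-return shape. It is not the actual Spark code: `EarlyReturnSketch`, its `spills` list, and the simplified `writeSortedFile(Iterator<Integer>)` signature are hypothetical stand-ins, and the "file" is just a string so that the effect of the guard is easy to observe.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a sorter; this is NOT Spark's ShuffleExternalSorter. */
class EarlyReturnSketch {
    /** One entry per "spill file" created, so tests can see whether a file was made. */
    final List<String> spills = new ArrayList<>();

    void writeSortedFile(Iterator<Integer> sortedRecords) {
        // Guard at function entry: with no records there is nothing to write,
        // so return before any file or writer is created.
        if (!sortedRecords.hasNext()) {
            return;
        }

        // The rest of the body stays at its original indentation level
        // instead of being nested inside an `if (sortedRecords.hasNext())` block.
        StringBuilder file = new StringBuilder();
        while (sortedRecords.hasNext()) {
            file.append(sortedRecords.next()).append(';');
        }
        spills.add(file.toString());
    }
}
```

    With this shape the diff would likely be much smaller, since the existing body would not need to be re-indented under a new `if` block.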


---
