Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22399#discussion_r216962247
  
    --- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
    @@ -181,42 +181,43 @@ private void writeSortedFile(boolean isLastFile) {
         // around this, we pass a dummy no-op serializer.
         final SerializerInstance ser = DummySerializerInstance.INSTANCE;
     
    -    final DiskBlockObjectWriter writer =
    -      blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, 
writeMetricsToUse);
    -
         int currentPartition = -1;
    -    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
    -    while (sortedRecords.hasNext()) {
    -      sortedRecords.loadNext();
    -      final int partition = 
sortedRecords.packedRecordPointer.getPartitionId();
    -      assert (partition >= currentPartition);
    -      if (partition != currentPartition) {
    -        // Switch to the new partition
    -        if (currentPartition != -1) {
    -          final FileSegment fileSegment = writer.commitAndGet();
    -          spillInfo.partitionLengths[currentPartition] = 
fileSegment.length();
    +    final FileSegment committedSegment;
    +    try (final DiskBlockObjectWriter writer =
    +        blockManager.getDiskWriter(blockId, file, ser, 
fileBufferSizeBytes, writeMetricsToUse)) {
    +
    +      final int uaoSize = UnsafeAlignedOffset.getUaoSize();
    +      while (sortedRecords.hasNext()) {
    +        sortedRecords.loadNext();
    +        final int partition = 
sortedRecords.packedRecordPointer.getPartitionId();
    +        assert (partition >= currentPartition);
    +        if (partition != currentPartition) {
    +          // Switch to the new partition
    +          if (currentPartition != -1) {
    +            final FileSegment fileSegment = writer.commitAndGet();
    +            spillInfo.partitionLengths[currentPartition] = 
fileSegment.length();
    +          }
    +          currentPartition = partition;
             }
    -        currentPartition = partition;
    -      }
     
    -      final long recordPointer = 
sortedRecords.packedRecordPointer.getRecordPointer();
    -      final Object recordPage = taskMemoryManager.getPage(recordPointer);
    -      final long recordOffsetInPage = 
taskMemoryManager.getOffsetInPage(recordPointer);
    -      int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, 
recordOffsetInPage);
    -      long recordReadPosition = recordOffsetInPage + uaoSize; // skip over 
record length
    -      while (dataRemaining > 0) {
    -        final int toTransfer = Math.min(diskWriteBufferSize, 
dataRemaining);
    -        Platform.copyMemory(
    -          recordPage, recordReadPosition, writeBuffer, 
Platform.BYTE_ARRAY_OFFSET, toTransfer);
    -        writer.write(writeBuffer, 0, toTransfer);
    -        recordReadPosition += toTransfer;
    -        dataRemaining -= toTransfer;
    +        final long recordPointer = 
sortedRecords.packedRecordPointer.getRecordPointer();
    +        final Object recordPage = taskMemoryManager.getPage(recordPointer);
    +        final long recordOffsetInPage = 
taskMemoryManager.getOffsetInPage(recordPointer);
    +        int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, 
recordOffsetInPage);
    +        long recordReadPosition = recordOffsetInPage + uaoSize; // skip 
over record length
    +        while (dataRemaining > 0) {
    +          final int toTransfer = Math.min(diskWriteBufferSize, 
dataRemaining);
    +          Platform.copyMemory(
    +                  recordPage, recordReadPosition, writeBuffer, 
Platform.BYTE_ARRAY_OFFSET, toTransfer);
    --- End diff --
    
    nit: fix indentation — the continuation lines of the `Platform.copyMemory(...)` arguments are over-indented; align them with the surrounding 2-space continuation style used elsewhere in this method.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to