Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216962247 --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java --- @@ -181,42 +181,43 @@ private void writeSortedFile(boolean isLastFile) { // around this, we pass a dummy no-op serializer. final SerializerInstance ser = DummySerializerInstance.INSTANCE; - final DiskBlockObjectWriter writer = - blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse); - int currentPartition = -1; - final int uaoSize = UnsafeAlignedOffset.getUaoSize(); - while (sortedRecords.hasNext()) { - sortedRecords.loadNext(); - final int partition = sortedRecords.packedRecordPointer.getPartitionId(); - assert (partition >= currentPartition); - if (partition != currentPartition) { - // Switch to the new partition - if (currentPartition != -1) { - final FileSegment fileSegment = writer.commitAndGet(); - spillInfo.partitionLengths[currentPartition] = fileSegment.length(); + final FileSegment committedSegment; + try (final DiskBlockObjectWriter writer = + blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse)) { + + final int uaoSize = UnsafeAlignedOffset.getUaoSize(); + while (sortedRecords.hasNext()) { + sortedRecords.loadNext(); + final int partition = sortedRecords.packedRecordPointer.getPartitionId(); + assert (partition >= currentPartition); + if (partition != currentPartition) { + // Switch to the new partition + if (currentPartition != -1) { + final FileSegment fileSegment = writer.commitAndGet(); + spillInfo.partitionLengths[currentPartition] = fileSegment.length(); + } + currentPartition = partition; } - currentPartition = partition; - } - final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer(); - final Object recordPage = taskMemoryManager.getPage(recordPointer); - final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer); - int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage); - long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length - while (dataRemaining > 0) { - final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining); - Platform.copyMemory( - recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer); - writer.write(writeBuffer, 0, toTransfer); - recordReadPosition += toTransfer; - dataRemaining -= toTransfer; + final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer(); + final Object recordPage = taskMemoryManager.getPage(recordPointer); + final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer); + int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage); + long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length + while (dataRemaining > 0) { + final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining); + Platform.copyMemory( + recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer); --- End diff -- nit: fix indentation
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org