http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java deleted file mode 100644 index e73ba39..0000000 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java +++ /dev/null @@ -1,479 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.shuffle.unsafe; - -import javax.annotation.Nullable; -import java.io.File; -import java.io.IOException; -import java.util.LinkedList; - -import scala.Tuple2; - -import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.spark.SparkConf; -import org.apache.spark.TaskContext; -import org.apache.spark.executor.ShuffleWriteMetrics; -import org.apache.spark.serializer.DummySerializerInstance; -import org.apache.spark.serializer.SerializerInstance; -import org.apache.spark.shuffle.ShuffleMemoryManager; -import org.apache.spark.storage.BlockManager; -import org.apache.spark.storage.DiskBlockObjectWriter; -import org.apache.spark.storage.TempShuffleBlockId; -import org.apache.spark.unsafe.Platform; -import org.apache.spark.unsafe.array.ByteArrayMethods; -import org.apache.spark.unsafe.memory.MemoryBlock; -import org.apache.spark.unsafe.memory.TaskMemoryManager; -import org.apache.spark.util.Utils; - -/** - * An external sorter that is specialized for sort-based shuffle. - * <p> - * Incoming records are appended to data pages. When all records have been inserted (or when the - * current thread's shuffle memory limit is reached), the in-memory records are sorted according to - * their partition ids (using a {@link UnsafeShuffleInMemorySorter}). The sorted records are then - * written to a single output file (or multiple files, if we've spilled). The format of the output - * files is the same as the format of the final output file written by - * {@link org.apache.spark.shuffle.sort.SortShuffleWriter}: each output partition's records are - * written as a single serialized, compressed stream that can be read with a new decompression and - * deserialization stream. - * <p> - * Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this sorter does not merge its - * spill files. Instead, this merging is performed in {@link UnsafeShuffleWriter}, which uses a - * specialized merge procedure that avoids extra serialization/deserialization. 
- */ -final class UnsafeShuffleExternalSorter { - - private final Logger logger = LoggerFactory.getLogger(UnsafeShuffleExternalSorter.class); - - @VisibleForTesting - static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024; - - private final int initialSize; - private final int numPartitions; - private final int pageSizeBytes; - @VisibleForTesting - final int maxRecordSizeBytes; - private final TaskMemoryManager taskMemoryManager; - private final ShuffleMemoryManager shuffleMemoryManager; - private final BlockManager blockManager; - private final TaskContext taskContext; - private final ShuffleWriteMetrics writeMetrics; - - /** The buffer size to use when writing spills using DiskBlockObjectWriter */ - private final int fileBufferSizeBytes; - - /** - * Memory pages that hold the records being sorted. The pages in this list are freed when - * spilling, although in principle we could recycle these pages across spills (on the other hand, - * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager - * itself). - */ - private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<MemoryBlock>(); - - private final LinkedList<SpillInfo> spills = new LinkedList<SpillInfo>(); - - /** Peak memory used by this sorter so far, in bytes. **/ - private long peakMemoryUsedBytes; - - // These variables are reset after spilling: - @Nullable private UnsafeShuffleInMemorySorter inMemSorter; - @Nullable private MemoryBlock currentPage = null; - private long currentPagePosition = -1; - private long freeSpaceInCurrentPage = 0; - - public UnsafeShuffleExternalSorter( - TaskMemoryManager memoryManager, - ShuffleMemoryManager shuffleMemoryManager, - BlockManager blockManager, - TaskContext taskContext, - int initialSize, - int numPartitions, - SparkConf conf, - ShuffleWriteMetrics writeMetrics) throws IOException { - this.taskMemoryManager = memoryManager; - this.shuffleMemoryManager = shuffleMemoryManager; - this.blockManager = blockManager; - this.taskContext = taskContext; - this.initialSize = initialSize; - this.peakMemoryUsedBytes = initialSize; - this.numPartitions = numPartitions; - // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided - this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; - this.pageSizeBytes = (int) Math.min( - PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, shuffleMemoryManager.pageSizeBytes()); - this.maxRecordSizeBytes = pageSizeBytes - 4; - this.writeMetrics = writeMetrics; - initializeForWriting(); - - // preserve first page to ensure that we have at least one page to work with. Otherwise, - // other operators in the same task may starve this sorter (SPARK-9709). - acquireNewPageIfNecessary(pageSizeBytes); - } - - /** - * Allocates new sort data structures. Called when creating the sorter and after each spill. - */ - private void initializeForWriting() throws IOException { - // TODO: move this sizing calculation logic into a static method of sorter: - final long memoryRequested = initialSize * 8L; - final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested); - if (memoryAcquired != memoryRequested) { - shuffleMemoryManager.release(memoryAcquired); - throw new IOException("Could not acquire " + memoryRequested + " bytes of memory"); - } - - this.inMemSorter = new UnsafeShuffleInMemorySorter(initialSize); - } - - /** - * Sorts the in-memory records and writes the sorted records to an on-disk file. 
- * This method does not free the sort data structures. - * - * @param isLastFile if true, this indicates that we're writing the final output file and that the - * bytes written should be counted towards shuffle spill metrics rather than - * shuffle write metrics. - */ - private void writeSortedFile(boolean isLastFile) throws IOException { - - final ShuffleWriteMetrics writeMetricsToUse; - - if (isLastFile) { - // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes. - writeMetricsToUse = writeMetrics; - } else { - // We're spilling, so bytes written should be counted towards spill rather than write. - // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count - // them towards shuffle bytes written. - writeMetricsToUse = new ShuffleWriteMetrics(); - } - - // This call performs the actual sort. - final UnsafeShuffleInMemorySorter.UnsafeShuffleSorterIterator sortedRecords = - inMemSorter.getSortedIterator(); - - // Currently, we need to open a new DiskBlockObjectWriter for each partition; we can avoid this - // after SPARK-5581 is fixed. - DiskBlockObjectWriter writer; - - // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to - // be an API to directly transfer bytes from managed memory to the disk writer, we buffer - // data through a byte array. This array does not need to be large enough to hold a single - // record; - final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE]; - - // Because this output will be read during shuffle, its compression codec must be controlled by - // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use - // createTempShuffleBlock here; see SPARK-3426 for more details. - final Tuple2<TempShuffleBlockId, File> spilledFileInfo = - blockManager.diskBlockManager().createTempShuffleBlock(); - final File file = spilledFileInfo._2(); - final TempShuffleBlockId blockId = spilledFileInfo._1(); - final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId); - - // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter. - // Our write path doesn't actually use this serializer (since we end up calling the `write()` - // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work - // around this, we pass a dummy no-op serializer. 
- final SerializerInstance ser = DummySerializerInstance.INSTANCE; - - writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse); - - int currentPartition = -1; - while (sortedRecords.hasNext()) { - sortedRecords.loadNext(); - final int partition = sortedRecords.packedRecordPointer.getPartitionId(); - assert (partition >= currentPartition); - if (partition != currentPartition) { - // Switch to the new partition - if (currentPartition != -1) { - writer.commitAndClose(); - spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length(); - } - currentPartition = partition; - writer = - blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse); - } - - final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer(); - final Object recordPage = taskMemoryManager.getPage(recordPointer); - final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer); - int dataRemaining = Platform.getInt(recordPage, recordOffsetInPage); - long recordReadPosition = recordOffsetInPage + 4; // skip over record length - while (dataRemaining > 0) { - final int toTransfer = Math.min(DISK_WRITE_BUFFER_SIZE, dataRemaining); - Platform.copyMemory( - recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer); - writer.write(writeBuffer, 0, toTransfer); - recordReadPosition += toTransfer; - dataRemaining -= toTransfer; - } - writer.recordWritten(); - } - - if (writer != null) { - writer.commitAndClose(); - // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted, - // then the file might be empty. Note that it might be better to avoid calling - // writeSortedFile() in that case. - if (currentPartition != -1) { - spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length(); - spills.add(spillInfo); - } - } - - if (!isLastFile) { // i.e. this is a spill file - // The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records - // are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter - // relies on its `recordWritten()` method being called in order to trigger periodic updates to - // `shuffleBytesWritten`. If we were to remove the `recordWritten()` call and increment that - // counter at a higher-level, then the in-progress metrics for records written and bytes - // written would get out of sync. - // - // When writing the last file, we pass `writeMetrics` directly to the DiskBlockObjectWriter; - // in all other cases, we pass in a dummy write metrics to capture metrics, then copy those - // metrics to the true write metrics here. The reason for performing this copying is so that - // we can avoid reporting spilled bytes as shuffle write bytes. - // - // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`. - // Consistent with ExternalSorter, we do not count this IO towards shuffle write time. - // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this. - writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten()); - taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten()); - } - } - - /** - * Sort and spill the current records in response to memory pressure. 
- */ - @VisibleForTesting - void spill() throws IOException { - logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)", - Thread.currentThread().getId(), - Utils.bytesToString(getMemoryUsage()), - spills.size(), - spills.size() > 1 ? " times" : " time"); - - writeSortedFile(false); - final long inMemSorterMemoryUsage = inMemSorter.getMemoryUsage(); - inMemSorter = null; - shuffleMemoryManager.release(inMemSorterMemoryUsage); - final long spillSize = freeMemory(); - taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); - - initializeForWriting(); - } - - private long getMemoryUsage() { - long totalPageSize = 0; - for (MemoryBlock page : allocatedPages) { - totalPageSize += page.size(); - } - return ((inMemSorter == null) ? 0 : inMemSorter.getMemoryUsage()) + totalPageSize; - } - - private void updatePeakMemoryUsed() { - long mem = getMemoryUsage(); - if (mem > peakMemoryUsedBytes) { - peakMemoryUsedBytes = mem; - } - } - - /** - * Return the peak memory used so far, in bytes. - */ - long getPeakMemoryUsedBytes() { - updatePeakMemoryUsed(); - return peakMemoryUsedBytes; - } - - private long freeMemory() { - updatePeakMemoryUsed(); - long memoryFreed = 0; - for (MemoryBlock block : allocatedPages) { - taskMemoryManager.freePage(block); - shuffleMemoryManager.release(block.size()); - memoryFreed += block.size(); - } - allocatedPages.clear(); - currentPage = null; - currentPagePosition = -1; - freeSpaceInCurrentPage = 0; - return memoryFreed; - } - - /** - * Force all memory and spill files to be deleted; called by shuffle error-handling code. - */ - public void cleanupResources() { - freeMemory(); - for (SpillInfo spill : spills) { - if (spill.file.exists() && !spill.file.delete()) { - logger.error("Unable to delete spill file {}", spill.file.getPath()); - } - } - if (inMemSorter != null) { - shuffleMemoryManager.release(inMemSorter.getMemoryUsage()); - inMemSorter = null; - } - } - - /** - * Checks whether there is enough space to insert an additional record in to the sort pointer - * array and grows the array if additional space is required. If the required space cannot be - * obtained, then the in-memory data will be spilled to disk. - */ - private void growPointerArrayIfNecessary() throws IOException { - assert(inMemSorter != null); - if (!inMemSorter.hasSpaceForAnotherRecord()) { - logger.debug("Attempting to expand sort pointer array"); - final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage(); - final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2; - final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryToGrowPointerArray); - if (memoryAcquired < memoryToGrowPointerArray) { - shuffleMemoryManager.release(memoryAcquired); - spill(); - } else { - inMemSorter.expandPointerArray(); - shuffleMemoryManager.release(oldPointerArrayMemoryUsage); - } - } - } - - /** - * Allocates more memory in order to insert an additional record. This will request additional - * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be - * obtained. - * - * @param requiredSpace the required space in the data page, in bytes, including space for storing - * the record size. This must be less than or equal to the page size (records - * that exceed the page size are handled via a different code path which uses - * special overflow pages). 
- */ - private void acquireNewPageIfNecessary(int requiredSpace) throws IOException { - growPointerArrayIfNecessary(); - if (requiredSpace > freeSpaceInCurrentPage) { - logger.trace("Required space {} is less than free space in current page ({})", requiredSpace, - freeSpaceInCurrentPage); - // TODO: we should track metrics on the amount of space wasted when we roll over to a new page - // without using the free space at the end of the current page. We should also do this for - // BytesToBytesMap. - if (requiredSpace > pageSizeBytes) { - throw new IOException("Required space " + requiredSpace + " is greater than page size (" + - pageSizeBytes + ")"); - } else { - final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes); - if (memoryAcquired < pageSizeBytes) { - shuffleMemoryManager.release(memoryAcquired); - spill(); - final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes); - if (memoryAcquiredAfterSpilling != pageSizeBytes) { - shuffleMemoryManager.release(memoryAcquiredAfterSpilling); - throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory"); - } - } - currentPage = taskMemoryManager.allocatePage(pageSizeBytes); - currentPagePosition = currentPage.getBaseOffset(); - freeSpaceInCurrentPage = pageSizeBytes; - allocatedPages.add(currentPage); - } - } - } - - /** - * Write a record to the shuffle sorter. - */ - public void insertRecord( - Object recordBaseObject, - long recordBaseOffset, - int lengthInBytes, - int partitionId) throws IOException { - - growPointerArrayIfNecessary(); - // Need 4 bytes to store the record length. - final int totalSpaceRequired = lengthInBytes + 4; - - // --- Figure out where to insert the new record ---------------------------------------------- - - final MemoryBlock dataPage; - long dataPagePosition; - boolean useOverflowPage = totalSpaceRequired > pageSizeBytes; - if (useOverflowPage) { - long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired); - // The record is larger than the page size, so allocate a special overflow page just to hold - // that record. - final long memoryGranted = shuffleMemoryManager.tryToAcquire(overflowPageSize); - if (memoryGranted != overflowPageSize) { - shuffleMemoryManager.release(memoryGranted); - spill(); - final long memoryGrantedAfterSpill = shuffleMemoryManager.tryToAcquire(overflowPageSize); - if (memoryGrantedAfterSpill != overflowPageSize) { - shuffleMemoryManager.release(memoryGrantedAfterSpill); - throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory"); - } - } - MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize); - allocatedPages.add(overflowPage); - dataPage = overflowPage; - dataPagePosition = overflowPage.getBaseOffset(); - } else { - // The record is small enough to fit in a regular data page, but the current page might not - // have enough space to hold it (or no pages have been allocated yet). 
- acquireNewPageIfNecessary(totalSpaceRequired); - dataPage = currentPage; - dataPagePosition = currentPagePosition; - // Update bookkeeping information - freeSpaceInCurrentPage -= totalSpaceRequired; - currentPagePosition += totalSpaceRequired; - } - final Object dataPageBaseObject = dataPage.getBaseObject(); - - final long recordAddress = - taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition); - Platform.putInt(dataPageBaseObject, dataPagePosition, lengthInBytes); - dataPagePosition += 4; - Platform.copyMemory( - recordBaseObject, recordBaseOffset, dataPageBaseObject, dataPagePosition, lengthInBytes); - assert(inMemSorter != null); - inMemSorter.insertRecord(recordAddress, partitionId); - } - - /** - * Close the sorter, causing any buffered data to be sorted and written out to disk. - * - * @return metadata for the spill files written by this sorter. If no records were ever inserted - * into this sorter, then this will return an empty array. - * @throws IOException - */ - public SpillInfo[] closeAndGetSpills() throws IOException { - try { - if (inMemSorter != null) { - // Do not count the final file towards the spill count. - writeSortedFile(true); - freeMemory(); - } - return spills.toArray(new SpillInfo[spills.size()]); - } catch (IOException e) { - cleanupResources(); - throw e; - } - } - -}
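The sorter above keeps only an 8-byte entry per record in its sort array: the partition id and the record's page-number/offset address are packed into a single long (see PackedRecordPointer), and the sort reorders those longs while the serialized record bytes stay in place in their data pages. The standalone Java sketch below illustrates that idea; the 24/40 bit split, the class name, and the helper names are assumptions made for the example, not the actual PackedRecordPointer layout.

    import java.util.Arrays;
    import java.util.Comparator;

    // Illustrative sketch only: pack a partition id and a record address into one long,
    // then sort the longs by partition id, mimicking UnsafeShuffleInMemorySorter's
    // SortComparator. Bit widths are assumed for the example.
    final class PackedPointerSketch {

      // Assumed layout: upper 24 bits = partition id, lower 40 bits = record address.
      static long pack(long recordAddress, int partitionId) {
        assert partitionId >= 0 && partitionId < (1 << 24);
        return ((long) partitionId << 40) | (recordAddress & ((1L << 40) - 1));
      }

      static int partitionId(long packed) {
        return (int) (packed >>> 40);
      }

      public static void main(String[] args) {
        Long[] pointers = {
          pack(12345L, 7),   // (record address, partition id)
          pack(200L, 2),
          pack(999L, 7),
          pack(50L, 0)
        };
        // Sort by partition id only, as the shuffle sort does; record bytes are untouched.
        Arrays.sort(pointers, Comparator.comparingInt((Long p) -> partitionId(p)));
        for (long p : pointers) {
          System.out.println("partition " + partitionId(p) + ", address " + (p & ((1L << 40) - 1)));
        }
      }
    }

As in the real sorter, ordering is decided from the extracted partition id alone, which is why one word per record is enough and why the partition count is bounded by PackedRecordPointer.MAXIMUM_PARTITION_ID.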
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleInMemorySorter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleInMemorySorter.java deleted file mode 100644 index 5bab501..0000000 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleInMemorySorter.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.shuffle.unsafe; - -import java.util.Comparator; - -import org.apache.spark.util.collection.Sorter; - -final class UnsafeShuffleInMemorySorter { - - private final Sorter<PackedRecordPointer, long[]> sorter; - private static final class SortComparator implements Comparator<PackedRecordPointer> { - @Override - public int compare(PackedRecordPointer left, PackedRecordPointer right) { - return left.getPartitionId() - right.getPartitionId(); - } - } - private static final SortComparator SORT_COMPARATOR = new SortComparator(); - - /** - * An array of record pointers and partition ids that have been encoded by - * {@link PackedRecordPointer}. The sort operates on this array instead of directly manipulating - * records. - */ - private long[] pointerArray; - - /** - * The position in the pointer array where new records can be inserted. - */ - private int pointerArrayInsertPosition = 0; - - public UnsafeShuffleInMemorySorter(int initialSize) { - assert (initialSize > 0); - this.pointerArray = new long[initialSize]; - this.sorter = new Sorter<PackedRecordPointer, long[]>(UnsafeShuffleSortDataFormat.INSTANCE); - } - - public void expandPointerArray() { - final long[] oldArray = pointerArray; - // Guard against overflow: - final int newLength = oldArray.length * 2 > 0 ? (oldArray.length * 2) : Integer.MAX_VALUE; - pointerArray = new long[newLength]; - System.arraycopy(oldArray, 0, pointerArray, 0, oldArray.length); - } - - public boolean hasSpaceForAnotherRecord() { - return pointerArrayInsertPosition + 1 < pointerArray.length; - } - - public long getMemoryUsage() { - return pointerArray.length * 8L; - } - - /** - * Inserts a record to be sorted. - * - * @param recordPointer a pointer to the record, encoded by the task memory manager. Due to - * certain pointer compression techniques used by the sorter, the sort can - * only operate on pointers that point to locations in the first - * {@link PackedRecordPointer#MAXIMUM_PAGE_SIZE_BYTES} bytes of a data page. - * @param partitionId the partition id, which must be less than or equal to - * {@link PackedRecordPointer#MAXIMUM_PARTITION_ID}. 
- */ - public void insertRecord(long recordPointer, int partitionId) { - if (!hasSpaceForAnotherRecord()) { - if (pointerArray.length == Integer.MAX_VALUE) { - throw new IllegalStateException("Sort pointer array has reached maximum size"); - } else { - expandPointerArray(); - } - } - pointerArray[pointerArrayInsertPosition] = - PackedRecordPointer.packPointer(recordPointer, partitionId); - pointerArrayInsertPosition++; - } - - /** - * An iterator-like class that's used instead of Java's Iterator in order to facilitate inlining. - */ - public static final class UnsafeShuffleSorterIterator { - - private final long[] pointerArray; - private final int numRecords; - final PackedRecordPointer packedRecordPointer = new PackedRecordPointer(); - private int position = 0; - - public UnsafeShuffleSorterIterator(int numRecords, long[] pointerArray) { - this.numRecords = numRecords; - this.pointerArray = pointerArray; - } - - public boolean hasNext() { - return position < numRecords; - } - - public void loadNext() { - packedRecordPointer.set(pointerArray[position]); - position++; - } - } - - /** - * Return an iterator over record pointers in sorted order. - */ - public UnsafeShuffleSorterIterator getSortedIterator() { - sorter.sort(pointerArray, 0, pointerArrayInsertPosition, SORT_COMPARATOR); - return new UnsafeShuffleSorterIterator(pointerArrayInsertPosition, pointerArray); - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSortDataFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSortDataFormat.java deleted file mode 100644 index a66d74e..0000000 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSortDataFormat.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.shuffle.unsafe; - -import org.apache.spark.util.collection.SortDataFormat; - -final class UnsafeShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, long[]> { - - public static final UnsafeShuffleSortDataFormat INSTANCE = new UnsafeShuffleSortDataFormat(); - - private UnsafeShuffleSortDataFormat() { } - - @Override - public PackedRecordPointer getKey(long[] data, int pos) { - // Since we re-use keys, this method shouldn't be called. 
- throw new UnsupportedOperationException(); - } - - @Override - public PackedRecordPointer newKey() { - return new PackedRecordPointer(); - } - - @Override - public PackedRecordPointer getKey(long[] data, int pos, PackedRecordPointer reuse) { - reuse.set(data[pos]); - return reuse; - } - - @Override - public void swap(long[] data, int pos0, int pos1) { - final long temp = data[pos0]; - data[pos0] = data[pos1]; - data[pos1] = temp; - } - - @Override - public void copyElement(long[] src, int srcPos, long[] dst, int dstPos) { - dst[dstPos] = src[srcPos]; - } - - @Override - public void copyRange(long[] src, int srcPos, long[] dst, int dstPos, int length) { - System.arraycopy(src, srcPos, dst, dstPos, length); - } - - @Override - public long[] allocate(int length) { - return new long[length]; - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java deleted file mode 100644 index fdb309e..0000000 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java +++ /dev/null @@ -1,489 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.shuffle.unsafe; - -import javax.annotation.Nullable; -import java.io.*; -import java.nio.channels.FileChannel; -import java.util.Iterator; - -import scala.Option; -import scala.Product2; -import scala.collection.JavaConverters; -import scala.collection.immutable.Map; -import scala.reflect.ClassTag; -import scala.reflect.ClassTag$; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.io.ByteStreams; -import com.google.common.io.Closeables; -import com.google.common.io.Files; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.spark.*; -import org.apache.spark.annotation.Private; -import org.apache.spark.executor.ShuffleWriteMetrics; -import org.apache.spark.io.CompressionCodec; -import org.apache.spark.io.CompressionCodec$; -import org.apache.spark.io.LZFCompressionCodec; -import org.apache.spark.network.util.LimitedInputStream; -import org.apache.spark.scheduler.MapStatus; -import org.apache.spark.scheduler.MapStatus$; -import org.apache.spark.serializer.SerializationStream; -import org.apache.spark.serializer.Serializer; -import org.apache.spark.serializer.SerializerInstance; -import org.apache.spark.shuffle.IndexShuffleBlockResolver; -import org.apache.spark.shuffle.ShuffleMemoryManager; -import org.apache.spark.shuffle.ShuffleWriter; -import org.apache.spark.storage.BlockManager; -import org.apache.spark.storage.TimeTrackingOutputStream; -import org.apache.spark.unsafe.Platform; -import org.apache.spark.unsafe.memory.TaskMemoryManager; - -@Private -public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { - - private final Logger logger = LoggerFactory.getLogger(UnsafeShuffleWriter.class); - - private static final ClassTag<Object> OBJECT_CLASS_TAG = ClassTag$.MODULE$.Object(); - - @VisibleForTesting - static final int INITIAL_SORT_BUFFER_SIZE = 4096; - - private final BlockManager blockManager; - private final IndexShuffleBlockResolver shuffleBlockResolver; - private final TaskMemoryManager memoryManager; - private final ShuffleMemoryManager shuffleMemoryManager; - private final SerializerInstance serializer; - private final Partitioner partitioner; - private final ShuffleWriteMetrics writeMetrics; - private final int shuffleId; - private final int mapId; - private final TaskContext taskContext; - private final SparkConf sparkConf; - private final boolean transferToEnabled; - - @Nullable private MapStatus mapStatus; - @Nullable private UnsafeShuffleExternalSorter sorter; - private long peakMemoryUsedBytes = 0; - - /** Subclass of ByteArrayOutputStream that exposes `buf` directly. */ - private static final class MyByteArrayOutputStream extends ByteArrayOutputStream { - public MyByteArrayOutputStream(int size) { super(size); } - public byte[] getBuf() { return buf; } - } - - private MyByteArrayOutputStream serBuffer; - private SerializationStream serOutputStream; - - /** - * Are we in the process of stopping? Because map tasks can call stop() with success = true - * and then call stop() with success = false if they get an exception, we want to make sure - * we don't try deleting files, etc twice. 
- */ - private boolean stopping = false; - - public UnsafeShuffleWriter( - BlockManager blockManager, - IndexShuffleBlockResolver shuffleBlockResolver, - TaskMemoryManager memoryManager, - ShuffleMemoryManager shuffleMemoryManager, - UnsafeShuffleHandle<K, V> handle, - int mapId, - TaskContext taskContext, - SparkConf sparkConf) throws IOException { - final int numPartitions = handle.dependency().partitioner().numPartitions(); - if (numPartitions > UnsafeShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS()) { - throw new IllegalArgumentException( - "UnsafeShuffleWriter can only be used for shuffles with at most " + - UnsafeShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS() + " reduce partitions"); - } - this.blockManager = blockManager; - this.shuffleBlockResolver = shuffleBlockResolver; - this.memoryManager = memoryManager; - this.shuffleMemoryManager = shuffleMemoryManager; - this.mapId = mapId; - final ShuffleDependency<K, V, V> dep = handle.dependency(); - this.shuffleId = dep.shuffleId(); - this.serializer = Serializer.getSerializer(dep.serializer()).newInstance(); - this.partitioner = dep.partitioner(); - this.writeMetrics = new ShuffleWriteMetrics(); - taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics)); - this.taskContext = taskContext; - this.sparkConf = sparkConf; - this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true); - open(); - } - - @VisibleForTesting - public int maxRecordSizeBytes() { - assert(sorter != null); - return sorter.maxRecordSizeBytes; - } - - private void updatePeakMemoryUsed() { - // sorter can be null if this writer is closed - if (sorter != null) { - long mem = sorter.getPeakMemoryUsedBytes(); - if (mem > peakMemoryUsedBytes) { - peakMemoryUsedBytes = mem; - } - } - } - - /** - * Return the peak memory used so far, in bytes. - */ - public long getPeakMemoryUsedBytes() { - updatePeakMemoryUsed(); - return peakMemoryUsedBytes; - } - - /** - * This convenience method should only be called in test code. - */ - @VisibleForTesting - public void write(Iterator<Product2<K, V>> records) throws IOException { - write(JavaConverters.asScalaIteratorConverter(records).asScala()); - } - - @Override - public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException { - // Keep track of success so we know if we encountered an exception - // We do this rather than a standard try/catch/re-throw to handle - // generic throwables. - boolean success = false; - try { - while (records.hasNext()) { - insertRecordIntoSorter(records.next()); - } - closeAndWriteOutput(); - success = true; - } finally { - if (sorter != null) { - try { - sorter.cleanupResources(); - } catch (Exception e) { - // Only throw this error if we won't be masking another - // error. 
- if (success) { - throw e; - } else { - logger.error("In addition to a failure during writing, we failed during " + - "cleanup.", e); - } - } - } - } - } - - private void open() throws IOException { - assert (sorter == null); - sorter = new UnsafeShuffleExternalSorter( - memoryManager, - shuffleMemoryManager, - blockManager, - taskContext, - INITIAL_SORT_BUFFER_SIZE, - partitioner.numPartitions(), - sparkConf, - writeMetrics); - serBuffer = new MyByteArrayOutputStream(1024 * 1024); - serOutputStream = serializer.serializeStream(serBuffer); - } - - @VisibleForTesting - void closeAndWriteOutput() throws IOException { - assert(sorter != null); - updatePeakMemoryUsed(); - serBuffer = null; - serOutputStream = null; - final SpillInfo[] spills = sorter.closeAndGetSpills(); - sorter = null; - final long[] partitionLengths; - try { - partitionLengths = mergeSpills(spills); - } finally { - for (SpillInfo spill : spills) { - if (spill.file.exists() && ! spill.file.delete()) { - logger.error("Error while deleting spill file {}", spill.file.getPath()); - } - } - } - shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths); - mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); - } - - @VisibleForTesting - void insertRecordIntoSorter(Product2<K, V> record) throws IOException { - assert(sorter != null); - final K key = record._1(); - final int partitionId = partitioner.getPartition(key); - serBuffer.reset(); - serOutputStream.writeKey(key, OBJECT_CLASS_TAG); - serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG); - serOutputStream.flush(); - - final int serializedRecordSize = serBuffer.size(); - assert (serializedRecordSize > 0); - - sorter.insertRecord( - serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId); - } - - @VisibleForTesting - void forceSorterToSpill() throws IOException { - assert (sorter != null); - sorter.spill(); - } - - /** - * Merge zero or more spill files together, choosing the fastest merging strategy based on the - * number of spills and the IO compression codec. - * - * @return the partition lengths in the merged file. - */ - private long[] mergeSpills(SpillInfo[] spills) throws IOException { - final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId); - final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true); - final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf); - final boolean fastMergeEnabled = - sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true); - final boolean fastMergeIsSupported = - !compressionEnabled || compressionCodec instanceof LZFCompressionCodec; - try { - if (spills.length == 0) { - new FileOutputStream(outputFile).close(); // Create an empty file - return new long[partitioner.numPartitions()]; - } else if (spills.length == 1) { - // Here, we don't need to perform any metrics updates because the bytes written to this - // output file would have already been counted as shuffle bytes written. - Files.move(spills[0].file, outputFile); - return spills[0].partitionLengths; - } else { - final long[] partitionLengths; - // There are multiple spills to merge, so none of these spill files' lengths were counted - // towards our shuffle write count or shuffle write time. If we use the slow merge path, - // then the final output file's size won't necessarily be equal to the sum of the spill - // files' sizes. 
To guard against this case, we look at the output file's actual size when - // computing shuffle bytes written. - // - // We allow the individual merge methods to report their own IO times since different merge - // strategies use different IO techniques. We count IO during merge towards the shuffle - // shuffle write time, which appears to be consistent with the "not bypassing merge-sort" - // branch in ExternalSorter. - if (fastMergeEnabled && fastMergeIsSupported) { - // Compression is disabled or we are using an IO compression codec that supports - // decompression of concatenated compressed streams, so we can perform a fast spill merge - // that doesn't need to interpret the spilled bytes. - if (transferToEnabled) { - logger.debug("Using transferTo-based fast merge"); - partitionLengths = mergeSpillsWithTransferTo(spills, outputFile); - } else { - logger.debug("Using fileStream-based fast merge"); - partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null); - } - } else { - logger.debug("Using slow merge"); - partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec); - } - // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has - // in-memory records, we write out the in-memory records to a file but do not count that - // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs - // to be counted as shuffle write, but this will lead to double-counting of the final - // SpillInfo's bytes. - writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length()); - writeMetrics.incShuffleBytesWritten(outputFile.length()); - return partitionLengths; - } - } catch (IOException e) { - if (outputFile.exists() && !outputFile.delete()) { - logger.error("Unable to delete output file {}", outputFile.getPath()); - } - throw e; - } - } - - /** - * Merges spill files using Java FileStreams. This code path is slower than the NIO-based merge, - * {@link UnsafeShuffleWriter#mergeSpillsWithTransferTo(SpillInfo[], File)}, so it's only used in - * cases where the IO compression codec does not support concatenation of compressed data, or in - * cases where users have explicitly disabled use of {@code transferTo} in order to work around - * kernel bugs. - * - * @param spills the spills to merge. - * @param outputFile the file to write the merged data to. - * @param compressionCodec the IO compression codec, or null if shuffle compression is disabled. - * @return the partition lengths in the merged file. 
- */ - private long[] mergeSpillsWithFileStream( - SpillInfo[] spills, - File outputFile, - @Nullable CompressionCodec compressionCodec) throws IOException { - assert (spills.length >= 2); - final int numPartitions = partitioner.numPartitions(); - final long[] partitionLengths = new long[numPartitions]; - final InputStream[] spillInputStreams = new FileInputStream[spills.length]; - OutputStream mergedFileOutputStream = null; - - boolean threwException = true; - try { - for (int i = 0; i < spills.length; i++) { - spillInputStreams[i] = new FileInputStream(spills[i].file); - } - for (int partition = 0; partition < numPartitions; partition++) { - final long initialFileLength = outputFile.length(); - mergedFileOutputStream = - new TimeTrackingOutputStream(writeMetrics, new FileOutputStream(outputFile, true)); - if (compressionCodec != null) { - mergedFileOutputStream = compressionCodec.compressedOutputStream(mergedFileOutputStream); - } - - for (int i = 0; i < spills.length; i++) { - final long partitionLengthInSpill = spills[i].partitionLengths[partition]; - if (partitionLengthInSpill > 0) { - InputStream partitionInputStream = - new LimitedInputStream(spillInputStreams[i], partitionLengthInSpill); - if (compressionCodec != null) { - partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream); - } - ByteStreams.copy(partitionInputStream, mergedFileOutputStream); - } - } - mergedFileOutputStream.flush(); - mergedFileOutputStream.close(); - partitionLengths[partition] = (outputFile.length() - initialFileLength); - } - threwException = false; - } finally { - // To avoid masking exceptions that caused us to prematurely enter the finally block, only - // throw exceptions during cleanup if threwException == false. - for (InputStream stream : spillInputStreams) { - Closeables.close(stream, threwException); - } - Closeables.close(mergedFileOutputStream, threwException); - } - return partitionLengths; - } - - /** - * Merges spill files by using NIO's transferTo to concatenate spill partitions' bytes. - * This is only safe when the IO compression codec and serializer support concatenation of - * serialized streams. - * - * @return the partition lengths in the merged file. - */ - private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException { - assert (spills.length >= 2); - final int numPartitions = partitioner.numPartitions(); - final long[] partitionLengths = new long[numPartitions]; - final FileChannel[] spillInputChannels = new FileChannel[spills.length]; - final long[] spillInputChannelPositions = new long[spills.length]; - FileChannel mergedFileOutputChannel = null; - - boolean threwException = true; - try { - for (int i = 0; i < spills.length; i++) { - spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel(); - } - // This file needs to opened in append mode in order to work around a Linux kernel bug that - // affects transferTo; see SPARK-3948 for more details. 
- mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel(); - - long bytesWrittenToMergedFile = 0; - for (int partition = 0; partition < numPartitions; partition++) { - for (int i = 0; i < spills.length; i++) { - final long partitionLengthInSpill = spills[i].partitionLengths[partition]; - long bytesToTransfer = partitionLengthInSpill; - final FileChannel spillInputChannel = spillInputChannels[i]; - final long writeStartTime = System.nanoTime(); - while (bytesToTransfer > 0) { - final long actualBytesTransferred = spillInputChannel.transferTo( - spillInputChannelPositions[i], - bytesToTransfer, - mergedFileOutputChannel); - spillInputChannelPositions[i] += actualBytesTransferred; - bytesToTransfer -= actualBytesTransferred; - } - writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime); - bytesWrittenToMergedFile += partitionLengthInSpill; - partitionLengths[partition] += partitionLengthInSpill; - } - } - // Check the position after transferTo loop to see if it is in the right position and raise an - // exception if it is incorrect. The position will not be increased to the expected length - // after calling transferTo in kernel version 2.6.32. This issue is described at - // https://bugs.openjdk.java.net/browse/JDK-7052359 and SPARK-3948. - if (mergedFileOutputChannel.position() != bytesWrittenToMergedFile) { - throw new IOException( - "Current position " + mergedFileOutputChannel.position() + " does not equal expected " + - "position " + bytesWrittenToMergedFile + " after transferTo. Please check your kernel" + - " version to see if it is 2.6.32, as there is a kernel bug which will lead to " + - "unexpected behavior when using transferTo. You can set spark.file.transferTo=false " + - "to disable this NIO feature." - ); - } - threwException = false; - } finally { - // To avoid masking exceptions that caused us to prematurely enter the finally block, only - // throw exceptions during cleanup if threwException == false. - for (int i = 0; i < spills.length; i++) { - assert(spillInputChannelPositions[i] == spills[i].file.length()); - Closeables.close(spillInputChannels[i], threwException); - } - Closeables.close(mergedFileOutputChannel, threwException); - } - return partitionLengths; - } - - @Override - public Option<MapStatus> stop(boolean success) { - try { - // Update task metrics from accumulators (null in UnsafeShuffleWriterSuite) - Map<String, Accumulator<Object>> internalAccumulators = - taskContext.internalMetricsToAccumulators(); - if (internalAccumulators != null) { - internalAccumulators.apply(InternalAccumulator.PEAK_EXECUTION_MEMORY()) - .add(getPeakMemoryUsedBytes()); - } - - if (stopping) { - return Option.apply(null); - } else { - stopping = true; - if (success) { - if (mapStatus == null) { - throw new IllegalStateException("Cannot call stop(true) without having called write()"); - } - return Option.apply(mapStatus); - } else { - // The map task failed, so delete our output data. 
- shuffleBlockResolver.removeDataByMap(shuffleId, mapId); - return Option.apply(null); - } - } - } finally { - if (sorter != null) { - // If sorter is non-null, then this implies that we called stop() in response to an error, - // so we need to clean up memory and spill files created by the sorter - sorter.cleanupResources(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/scala/org/apache/spark/SparkEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index c329983..704158b 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -330,7 +330,7 @@ object SparkEnv extends Logging { val shortShuffleMgrNames = Map( "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager", - "tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager") + "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager") val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 9df4e55..1105167 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -19,9 +19,53 @@ package org.apache.spark.shuffle.sort import java.util.concurrent.ConcurrentHashMap -import org.apache.spark.{Logging, SparkConf, TaskContext, ShuffleDependency} +import org.apache.spark._ +import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle._ +/** + * In sort-based shuffle, incoming records are sorted according to their target partition ids, then + * written to a single map output file. Reducers fetch contiguous regions of this file in order to + * read their portion of the map output. In cases where the map output data is too large to fit in + * memory, sorted subsets of the output can are spilled to disk and those on-disk files are merged + * to produce the final output file. + * + * Sort-based shuffle has two different write paths for producing its map output files: + * + * - Serialized sorting: used when all three of the following conditions hold: + * 1. The shuffle dependency specifies no aggregation or output ordering. + * 2. The shuffle serializer supports relocation of serialized values (this is currently + * supported by KryoSerializer and Spark SQL's custom serializers). + * 3. The shuffle produces fewer than 16777216 output partitions. + * - Deserialized sorting: used to handle all other cases. + * + * ----------------------- + * Serialized sorting mode + * ----------------------- + * + * In the serialized sorting mode, incoming records are serialized as soon as they are passed to the + * shuffle writer and are buffered in a serialized form during sorting. 
This write path implements + * several optimizations: + * + * - Its sort operates on serialized binary data rather than Java objects, which reduces memory + * consumption and GC overheads. This optimization requires the record serializer to have certain + * properties to allow serialized records to be re-ordered without requiring deserialization. + * See SPARK-4550, where this optimization was first proposed and implemented, for more details. + * + * - It uses a specialized cache-efficient sorter ([[ShuffleExternalSorter]]) that sorts + * arrays of compressed record pointers and partition ids. By using only 8 bytes of space per + * record in the sorting array, this fits more of the array into cache. + * + * - The spill merging procedure operates on blocks of serialized records that belong to the same + * partition and does not need to deserialize records during the merge. + * + * - When the spill compression codec supports concatenation of compressed data, the spill merge + * simply concatenates the serialized and compressed spill partitions to produce the final output + * partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used + * and avoids the need to allocate decompression or copying buffers during the merge. + * + * For more details on these optimizations, see SPARK-7081. + */ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { if (!conf.getBoolean("spark.shuffle.spill", true)) { @@ -30,8 +74,12 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager " Shuffle will continue to spill to disk when necessary.") } - private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf) - private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]() + /** + * A mapping from shuffle ids to the number of mappers producing output for those shuffles. + */ + private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]() + + override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf) /** * Register a shuffle with the manager and obtain a handle for it to pass to tasks. @@ -40,7 +88,22 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager shuffleId: Int, numMaps: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { - new BaseShuffleHandle(shuffleId, numMaps, dependency) + if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) { + // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't + // need map-side aggregation, then write numPartitions files directly and just concatenate + // them at the end. This avoids doing serialization and deserialization twice to merge + // together the spilled files, which would happen with the normal code path. The downside is + // having multiple files open at a time and thus more memory allocated to buffers. 
+ new BypassMergeSortShuffleHandle[K, V]( + shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) + } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) { + // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient: + new SerializedShuffleHandle[K, V]( + shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) + } else { + // Otherwise, buffer map outputs in a deserialized form: + new BaseShuffleHandle(shuffleId, numMaps, dependency) + } } /** @@ -52,38 +115,114 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager startPartition: Int, endPartition: Int, context: TaskContext): ShuffleReader[K, C] = { - // We currently use the same block store shuffle fetcher as the hash-based shuffle. new BlockStoreShuffleReader( handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context) } /** Get a writer for a given partition. Called on executors by map tasks. */ - override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext) - : ShuffleWriter[K, V] = { - val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]] - shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffleId, baseShuffleHandle.numMaps) - new SortShuffleWriter( - shuffleBlockResolver, baseShuffleHandle, mapId, context) + override def getWriter[K, V]( + handle: ShuffleHandle, + mapId: Int, + context: TaskContext): ShuffleWriter[K, V] = { + numMapsForShuffle.putIfAbsent( + handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps) + val env = SparkEnv.get + handle match { + case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] => + new UnsafeShuffleWriter( + env.blockManager, + shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], + context.taskMemoryManager(), + env.shuffleMemoryManager, + unsafeShuffleHandle, + mapId, + context, + env.conf) + case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => + new BypassMergeSortShuffleWriter( + env.blockManager, + shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], + bypassMergeSortHandle, + mapId, + context, + env.conf) + case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] => + new SortShuffleWriter(shuffleBlockResolver, other, mapId, context) + } } /** Remove a shuffle's metadata from the ShuffleManager. */ override def unregisterShuffle(shuffleId: Int): Boolean = { - if (shuffleMapNumber.containsKey(shuffleId)) { - val numMaps = shuffleMapNumber.remove(shuffleId) - (0 until numMaps).map{ mapId => + Option(numMapsForShuffle.remove(shuffleId)).foreach { numMaps => + (0 until numMaps).foreach { mapId => shuffleBlockResolver.removeDataByMap(shuffleId, mapId) } } true } - override val shuffleBlockResolver: IndexShuffleBlockResolver = { - indexShuffleBlockResolver - } - /** Shut down this ShuffleManager. */ override def stop(): Unit = { shuffleBlockResolver.stop() } } + +private[spark] object SortShuffleManager extends Logging { + + /** + * The maximum number of shuffle output partitions that SortShuffleManager supports when + * buffering map outputs in a serialized form. This is an extreme defensive programming measure, + * since it's extremely unlikely that a single shuffle produces over 16 million output partitions. 
+ * */ + val MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE = + PackedRecordPointer.MAXIMUM_PARTITION_ID + 1 + + /** + * Helper method for determining whether a shuffle should use an optimized serialized shuffle + * path or whether it should fall back to the original path that operates on deserialized objects. + */ + def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = { + val shufId = dependency.shuffleId + val numPartitions = dependency.partitioner.numPartitions + val serializer = Serializer.getSerializer(dependency.serializer) + if (!serializer.supportsRelocationOfSerializedObjects) { + log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " + + s"${serializer.getClass.getName}, does not support object relocation") + false + } else if (dependency.aggregator.isDefined) { + log.debug( + s"Can't use serialized shuffle for shuffle $shufId because an aggregator is defined") + false + } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) { + log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " + + s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions") + false + } else { + log.debug(s"Can use serialized shuffle for shuffle $shufId") + true + } + } +} + +/** + * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the + * serialized shuffle. + */ +private[spark] class SerializedShuffleHandle[K, V]( + shuffleId: Int, + numMaps: Int, + dependency: ShuffleDependency[K, V, V]) + extends BaseShuffleHandle(shuffleId, numMaps, dependency) { +} + +/** + * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the + * bypass merge sort shuffle path. + */ +private[spark] class BypassMergeSortShuffleHandle[K, V]( + shuffleId: Int, + numMaps: Int, + dependency: ShuffleDependency[K, V, V]) + extends BaseShuffleHandle(shuffleId, numMaps, dependency) { +} http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 5865e76..bbd9c1a 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -20,7 +20,6 @@ package org.apache.spark.shuffle.sort import org.apache.spark._ import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.scheduler.MapStatus -import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle} import org.apache.spark.storage.ShuffleBlockId import org.apache.spark.util.collection.ExternalSorter @@ -36,7 +35,7 @@ private[spark] class SortShuffleWriter[K, V, C]( private val blockManager = SparkEnv.get.blockManager - private var sorter: SortShuffleFileWriter[K, V] = null + private var sorter: ExternalSorter[K, V, _] = null // Are we in the process of stopping? 
Because map tasks can call stop() with success = true // and then call stop() with success = false if they get an exception, we want to make sure @@ -54,15 +53,6 @@ private[spark] class SortShuffleWriter[K, V, C]( require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") new ExternalSorter[K, V, C]( dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) - } else if (SortShuffleWriter.shouldBypassMergeSort( - SparkEnv.get.conf, dep.partitioner.numPartitions, aggregator = None, keyOrdering = None)) { - // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't - // need local aggregation and sorting, write numPartitions files directly and just concatenate - // them at the end. This avoids doing serialization and deserialization twice to merge - // together the spilled files, which would happen with the normal code path. The downside is - // having multiple files open at a time and thus more memory allocated to buffers. - new BypassMergeSortShuffleWriter[K, V](SparkEnv.get.conf, blockManager, dep.partitioner, - writeMetrics, Serializer.getSerializer(dep.serializer)) } else { // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't // care whether the keys get sorted in each partition; that will be done on the reduce side @@ -111,12 +101,14 @@ private[spark] class SortShuffleWriter[K, V, C]( } private[spark] object SortShuffleWriter { - def shouldBypassMergeSort( - conf: SparkConf, - numPartitions: Int, - aggregator: Option[Aggregator[_, _, _]], - keyOrdering: Option[Ordering[_]]): Boolean = { - val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) - numPartitions <= bypassMergeThreshold && aggregator.isEmpty && keyOrdering.isEmpty + def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = { + // We cannot bypass sorting if we need to do map-side aggregation. + if (dep.mapSideCombine) { + require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") + false + } else { + val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) + dep.partitioner.numPartitions <= bypassMergeThreshold + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala deleted file mode 100644 index 75f22f6..0000000 --- a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.shuffle.unsafe - -import java.util.Collections -import java.util.concurrent.ConcurrentHashMap - -import org.apache.spark._ -import org.apache.spark.serializer.Serializer -import org.apache.spark.shuffle._ -import org.apache.spark.shuffle.sort.SortShuffleManager - -/** - * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the new shuffle. - */ -private[spark] class UnsafeShuffleHandle[K, V]( - shuffleId: Int, - numMaps: Int, - dependency: ShuffleDependency[K, V, V]) - extends BaseShuffleHandle(shuffleId, numMaps, dependency) { -} - -private[spark] object UnsafeShuffleManager extends Logging { - - /** - * The maximum number of shuffle output partitions that UnsafeShuffleManager supports. - */ - val MAX_SHUFFLE_OUTPUT_PARTITIONS = PackedRecordPointer.MAXIMUM_PARTITION_ID + 1 - - /** - * Helper method for determining whether a shuffle should use the optimized unsafe shuffle - * path or whether it should fall back to the original sort-based shuffle. - */ - def canUseUnsafeShuffle[K, V, C](dependency: ShuffleDependency[K, V, C]): Boolean = { - val shufId = dependency.shuffleId - val serializer = Serializer.getSerializer(dependency.serializer) - if (!serializer.supportsRelocationOfSerializedObjects) { - log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because the serializer, " + - s"${serializer.getClass.getName}, does not support object relocation") - false - } else if (dependency.aggregator.isDefined) { - log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because an aggregator is defined") - false - } else if (dependency.partitioner.numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS) { - log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has more than " + - s"$MAX_SHUFFLE_OUTPUT_PARTITIONS partitions") - false - } else { - log.debug(s"Can use UnsafeShuffle for shuffle $shufId") - true - } - } -} - -/** - * A shuffle implementation that uses directly-managed memory to implement several performance - * optimizations for certain types of shuffles. In cases where the new performance optimizations - * cannot be applied, this shuffle manager delegates to [[SortShuffleManager]] to handle those - * shuffles. - * - * UnsafeShuffleManager's optimizations will apply when _all_ of the following conditions hold: - * - * - The shuffle dependency specifies no aggregation or output ordering. - * - The shuffle serializer supports relocation of serialized values (this is currently supported - * by KryoSerializer and Spark SQL's custom serializers). - * - The shuffle produces fewer than 16777216 output partitions. - * - No individual record is larger than 128 MB when serialized. - * - * In addition, extra spill-merging optimizations are automatically applied when the shuffle - * compression codec supports concatenation of serialized streams. This is currently supported by - * Spark's LZF serializer. - * - * At a high-level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager. - * In sort-based shuffle, incoming records are sorted according to their target partition ids, then - * written to a single map output file. Reducers fetch contiguous regions of this file in order to - * read their portion of the map output. 
In cases where the map output data is too large to fit in - * memory, sorted subsets of the output can are spilled to disk and those on-disk files are merged - * to produce the final output file. - * - * UnsafeShuffleManager optimizes this process in several ways: - * - * - Its sort operates on serialized binary data rather than Java objects, which reduces memory - * consumption and GC overheads. This optimization requires the record serializer to have certain - * properties to allow serialized records to be re-ordered without requiring deserialization. - * See SPARK-4550, where this optimization was first proposed and implemented, for more details. - * - * - It uses a specialized cache-efficient sorter ([[UnsafeShuffleExternalSorter]]) that sorts - * arrays of compressed record pointers and partition ids. By using only 8 bytes of space per - * record in the sorting array, this fits more of the array into cache. - * - * - The spill merging procedure operates on blocks of serialized records that belong to the same - * partition and does not need to deserialize records during the merge. - * - * - When the spill compression codec supports concatenation of compressed data, the spill merge - * simply concatenates the serialized and compressed spill partitions to produce the final output - * partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used - * and avoids the need to allocate decompression or copying buffers during the merge. - * - * For more details on UnsafeShuffleManager's design, see SPARK-7081. - */ -private[spark] class UnsafeShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { - - if (!conf.getBoolean("spark.shuffle.spill", true)) { - logWarning( - "spark.shuffle.spill was set to false, but this is ignored by the tungsten-sort shuffle " + - "manager; its optimized shuffles will continue to spill to disk when necessary.") - } - - private[this] val sortShuffleManager: SortShuffleManager = new SortShuffleManager(conf) - private[this] val shufflesThatFellBackToSortShuffle = - Collections.newSetFromMap(new ConcurrentHashMap[Int, java.lang.Boolean]()) - private[this] val numMapsForShufflesThatUsedNewPath = new ConcurrentHashMap[Int, Int]() - - /** - * Register a shuffle with the manager and obtain a handle for it to pass to tasks. - */ - override def registerShuffle[K, V, C]( - shuffleId: Int, - numMaps: Int, - dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { - if (UnsafeShuffleManager.canUseUnsafeShuffle(dependency)) { - new UnsafeShuffleHandle[K, V]( - shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) - } else { - new BaseShuffleHandle(shuffleId, numMaps, dependency) - } - } - - /** - * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). - * Called on executors by reduce tasks. - */ - override def getReader[K, C]( - handle: ShuffleHandle, - startPartition: Int, - endPartition: Int, - context: TaskContext): ShuffleReader[K, C] = { - sortShuffleManager.getReader(handle, startPartition, endPartition, context) - } - - /** Get a writer for a given partition. Called on executors by map tasks. 
*/ - override def getWriter[K, V]( - handle: ShuffleHandle, - mapId: Int, - context: TaskContext): ShuffleWriter[K, V] = { - handle match { - case unsafeShuffleHandle: UnsafeShuffleHandle[K @unchecked, V @unchecked] => - numMapsForShufflesThatUsedNewPath.putIfAbsent(handle.shuffleId, unsafeShuffleHandle.numMaps) - val env = SparkEnv.get - new UnsafeShuffleWriter( - env.blockManager, - shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], - context.taskMemoryManager(), - env.shuffleMemoryManager, - unsafeShuffleHandle, - mapId, - context, - env.conf) - case other => - shufflesThatFellBackToSortShuffle.add(handle.shuffleId) - sortShuffleManager.getWriter(handle, mapId, context) - } - } - - /** Remove a shuffle's metadata from the ShuffleManager. */ - override def unregisterShuffle(shuffleId: Int): Boolean = { - if (shufflesThatFellBackToSortShuffle.remove(shuffleId)) { - sortShuffleManager.unregisterShuffle(shuffleId) - } else { - Option(numMapsForShufflesThatUsedNewPath.remove(shuffleId)).foreach { numMaps => - (0 until numMaps).foreach { mapId => - shuffleBlockResolver.removeDataByMap(shuffleId, mapId) - } - } - true - } - } - - override val shuffleBlockResolver: IndexShuffleBlockResolver = { - sortShuffleManager.shuffleBlockResolver - } - - /** Shut down this ShuffleManager. */ - override def stop(): Unit = { - sortShuffleManager.stop() - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/scala/org/apache/spark/util/collection/ChainedBuffer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ChainedBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/ChainedBuffer.scala deleted file mode 100644 index ae60f3b..0000000 --- a/core/src/main/scala/org/apache/spark/util/collection/ChainedBuffer.scala +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util.collection - -import java.io.OutputStream - -import scala.collection.mutable.ArrayBuffer - -/** - * A logical byte buffer that wraps a list of byte arrays. All the byte arrays have equal size. The - * advantage of this over a standard ArrayBuffer is that it can grow without claiming large amounts - * of memory and needing to copy the full contents. The disadvantage is that the contents don't - * occupy a contiguous segment of memory. 
- */ -private[spark] class ChainedBuffer(chunkSize: Int) { - - private val chunkSizeLog2: Int = java.lang.Long.numberOfTrailingZeros( - java.lang.Long.highestOneBit(chunkSize)) - assert((1 << chunkSizeLog2) == chunkSize, - s"ChainedBuffer chunk size $chunkSize must be a power of two") - private val chunks: ArrayBuffer[Array[Byte]] = new ArrayBuffer[Array[Byte]]() - private var _size: Long = 0 - - /** - * Feed bytes from this buffer into a DiskBlockObjectWriter. - * - * @param pos Offset in the buffer to read from. - * @param os OutputStream to read into. - * @param len Number of bytes to read. - */ - def read(pos: Long, os: OutputStream, len: Int): Unit = { - if (pos + len > _size) { - throw new IndexOutOfBoundsException( - s"Read of $len bytes at position $pos would go past size ${_size} of buffer") - } - var chunkIndex: Int = (pos >> chunkSizeLog2).toInt - var posInChunk: Int = (pos - (chunkIndex.toLong << chunkSizeLog2)).toInt - var written: Int = 0 - while (written < len) { - val toRead: Int = math.min(len - written, chunkSize - posInChunk) - os.write(chunks(chunkIndex), posInChunk, toRead) - written += toRead - chunkIndex += 1 - posInChunk = 0 - } - } - - /** - * Read bytes from this buffer into a byte array. - * - * @param pos Offset in the buffer to read from. - * @param bytes Byte array to read into. - * @param offs Offset in the byte array to read to. - * @param len Number of bytes to read. - */ - def read(pos: Long, bytes: Array[Byte], offs: Int, len: Int): Unit = { - if (pos + len > _size) { - throw new IndexOutOfBoundsException( - s"Read of $len bytes at position $pos would go past size of buffer") - } - var chunkIndex: Int = (pos >> chunkSizeLog2).toInt - var posInChunk: Int = (pos - (chunkIndex.toLong << chunkSizeLog2)).toInt - var written: Int = 0 - while (written < len) { - val toRead: Int = math.min(len - written, chunkSize - posInChunk) - System.arraycopy(chunks(chunkIndex), posInChunk, bytes, offs + written, toRead) - written += toRead - chunkIndex += 1 - posInChunk = 0 - } - } - - /** - * Write bytes from a byte array into this buffer. - * - * @param pos Offset in the buffer to write to. - * @param bytes Byte array to write from. - * @param offs Offset in the byte array to write from. - * @param len Number of bytes to write. - */ - def write(pos: Long, bytes: Array[Byte], offs: Int, len: Int): Unit = { - if (pos > _size) { - throw new IndexOutOfBoundsException( - s"Write at position $pos starts after end of buffer ${_size}") - } - // Grow if needed - val endChunkIndex: Int = ((pos + len - 1) >> chunkSizeLog2).toInt - while (endChunkIndex >= chunks.length) { - chunks += new Array[Byte](chunkSize) - } - - var chunkIndex: Int = (pos >> chunkSizeLog2).toInt - var posInChunk: Int = (pos - (chunkIndex.toLong << chunkSizeLog2)).toInt - var written: Int = 0 - while (written < len) { - val toWrite: Int = math.min(len - written, chunkSize - posInChunk) - System.arraycopy(bytes, offs + written, chunks(chunkIndex), posInChunk, toWrite) - written += toWrite - chunkIndex += 1 - posInChunk = 0 - } - - _size = math.max(_size, pos + len) - } - - /** - * Total size of buffer that can be written to without allocating additional memory. - */ - def capacity: Long = chunks.size.toLong * chunkSize - - /** - * Size of the logical buffer. - */ - def size: Long = _size -} - -/** - * Output stream that writes to a ChainedBuffer. 
- */ -private[spark] class ChainedBufferOutputStream(chainedBuffer: ChainedBuffer) extends OutputStream { - private var pos: Long = 0 - - override def write(b: Int): Unit = { - throw new UnsupportedOperationException() - } - - override def write(bytes: Array[Byte], offs: Int, len: Int): Unit = { - chainedBuffer.write(pos, bytes, offs, len) - pos += len - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 749be34..c48c453 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -29,7 +29,6 @@ import com.google.common.io.ByteStreams import org.apache.spark._ import org.apache.spark.serializer._ import org.apache.spark.executor.ShuffleWriteMetrics -import org.apache.spark.shuffle.sort.{SortShuffleFileWriter, SortShuffleWriter} import org.apache.spark.storage.{BlockId, DiskBlockObjectWriter} /** @@ -69,8 +68,8 @@ import org.apache.spark.storage.{BlockId, DiskBlockObjectWriter} * At a high level, this class works internally as follows: * * - We repeatedly fill up buffers of in-memory data, using either a PartitionedAppendOnlyMap if - * we want to combine by key, or a PartitionedSerializedPairBuffer or PartitionedPairBuffer if we - * don't. Inside these buffers, we sort elements by partition ID and then possibly also by key. + * we want to combine by key, or a PartitionedPairBuffer if we don't. + * Inside these buffers, we sort elements by partition ID and then possibly also by key. * To avoid calling the partitioner multiple times with each key, we store the partition ID * alongside each record. * @@ -93,8 +92,7 @@ private[spark] class ExternalSorter[K, V, C]( ordering: Option[Ordering[K]] = None, serializer: Option[Serializer] = None) extends Logging - with Spillable[WritablePartitionedPairCollection[K, C]] - with SortShuffleFileWriter[K, V] { + with Spillable[WritablePartitionedPairCollection[K, C]] { private val conf = SparkEnv.get.conf @@ -104,13 +102,6 @@ private[spark] class ExternalSorter[K, V, C]( if (shouldPartition) partitioner.get.getPartition(key) else 0 } - // Since SPARK-7855, bypassMergeSort optimization is no longer performed as part of this class. - // As a sanity check, make sure that we're not handling a shuffle which should use that path. - if (SortShuffleWriter.shouldBypassMergeSort(conf, numPartitions, aggregator, ordering)) { - throw new IllegalArgumentException("ExternalSorter should not be used to handle " - + " a sort that the BypassMergeSortShuffleWriter should handle") - } - private val blockManager = SparkEnv.get.blockManager private val diskBlockManager = blockManager.diskBlockManager private val ser = Serializer.getSerializer(serializer) @@ -128,23 +119,11 @@ private[spark] class ExternalSorter[K, V, C]( // grow internal data structures by growing + copying every time the number of objects doubles. 
private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000) - private val useSerializedPairBuffer = - ordering.isEmpty && - conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) && - ser.supportsRelocationOfSerializedObjects - private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB - private def newBuffer(): WritablePartitionedPairCollection[K, C] with SizeTracker = { - if (useSerializedPairBuffer) { - new PartitionedSerializedPairBuffer(metaInitialRecords = 256, kvChunkSize, serInstance) - } else { - new PartitionedPairBuffer[K, C] - } - } // Data structures to store in-memory objects before we spill. Depending on whether we have an // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we // store them in an array buffer. private var map = new PartitionedAppendOnlyMap[K, C] - private var buffer = newBuffer() + private var buffer = new PartitionedPairBuffer[K, C] // Total spilling statistics private var _diskBytesSpilled = 0L @@ -192,7 +171,7 @@ private[spark] class ExternalSorter[K, V, C]( */ private[spark] def numSpills: Int = spills.size - override def insertAll(records: Iterator[Product2[K, V]]): Unit = { + def insertAll(records: Iterator[Product2[K, V]]): Unit = { // TODO: stop combining if we find that the reduction factor isn't high val shouldCombine = aggregator.isDefined @@ -236,7 +215,7 @@ private[spark] class ExternalSorter[K, V, C]( } else { estimatedSize = buffer.estimateSize() if (maybeSpill(buffer, estimatedSize)) { - buffer = newBuffer() + buffer = new PartitionedPairBuffer[K, C] } } @@ -659,7 +638,7 @@ private[spark] class ExternalSorter[K, V, C]( * @param context a TaskContext for a running Spark task, for us to update shuffle metrics. * @return array of lengths, in bytes, of each partition of the file (used by map output tracker) */ - override def writePartitionedFile( + def writePartitionedFile( blockId: BlockId, context: TaskContext, outputFile: File): Array[Long] = {
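[Editorial addendum for context, not part of the commit.] A hedged sketch of a job that should satisfy the canUseSerializedShuffle checks introduced above: Kryo (which supports relocation of serialized objects), a shuffle dependency with no aggregator or map-side combine, and well under the 16777216-partition ceiling. The classes and config keys are standard Spark 1.x API; whether a given shuffle really took the serialized path can be confirmed from the "Can use serialized shuffle for shuffle ..." message added in this patch (logged at DEBUG level).

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    object SerializedShuffleExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("serialized-shuffle-sketch")
          .setMaster("local[2]")
          // KryoSerializer supports relocation of serialized objects, one of the preconditions.
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sc = new SparkContext(conf)
        try {
          // partitionBy creates a ShuffleDependency with no aggregator and no map-side combine;
          // 400 partitions is above the bypass-merge threshold (default 200), so the serialized
          // (tungsten-sort) write path should be chosen for this shuffle.
          val pairs = sc.parallelize(1 to 1000000).map(i => (i, i.toString))
          val shuffled = pairs.partitionBy(new HashPartitioner(400))
          println(shuffled.count())
        } finally {
          sc.stop()
        }
      }
    }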