http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
 
b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
deleted file mode 100644
index e73ba39..0000000
--- 
a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
+++ /dev/null
@@ -1,479 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.unsafe;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.util.LinkedList;
-
-import scala.Tuple2;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.TaskContext;
-import org.apache.spark.executor.ShuffleWriteMetrics;
-import org.apache.spark.serializer.DummySerializerInstance;
-import org.apache.spark.serializer.SerializerInstance;
-import org.apache.spark.shuffle.ShuffleMemoryManager;
-import org.apache.spark.storage.BlockManager;
-import org.apache.spark.storage.DiskBlockObjectWriter;
-import org.apache.spark.storage.TempShuffleBlockId;
-import org.apache.spark.unsafe.Platform;
-import org.apache.spark.unsafe.array.ByteArrayMethods;
-import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.unsafe.memory.TaskMemoryManager;
-import org.apache.spark.util.Utils;
-
-/**
- * An external sorter that is specialized for sort-based shuffle.
- * <p>
- * Incoming records are appended to data pages. When all records have been inserted (or when the
- * current thread's shuffle memory limit is reached), the in-memory records are sorted according to
- * their partition ids (using a {@link UnsafeShuffleInMemorySorter}). The sorted records are then
- * written to a single output file (or multiple files, if we've spilled). The format of the output
- * files is the same as the format of the final output file written by
- * {@link org.apache.spark.shuffle.sort.SortShuffleWriter}: each output partition's records are
- * written as a single serialized, compressed stream that can be read with a new decompression and
- * deserialization stream.
- * <p>
- * Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this sorter does not merge its
- * spill files. Instead, this merging is performed in {@link UnsafeShuffleWriter}, which uses a
- * specialized merge procedure that avoids extra serialization/deserialization.
- */
-final class UnsafeShuffleExternalSorter {
-
-  private final Logger logger = 
LoggerFactory.getLogger(UnsafeShuffleExternalSorter.class);
-
-  @VisibleForTesting
-  static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
-
-  private final int initialSize;
-  private final int numPartitions;
-  private final int pageSizeBytes;
-  @VisibleForTesting
-  final int maxRecordSizeBytes;
-  private final TaskMemoryManager taskMemoryManager;
-  private final ShuffleMemoryManager shuffleMemoryManager;
-  private final BlockManager blockManager;
-  private final TaskContext taskContext;
-  private final ShuffleWriteMetrics writeMetrics;
-
-  /** The buffer size to use when writing spills using DiskBlockObjectWriter */
-  private final int fileBufferSizeBytes;
-
-  /**
-   * Memory pages that hold the records being sorted. The pages in this list 
are freed when
-   * spilling, although in principle we could recycle these pages across 
spills (on the other hand,
-   * this might not be necessary if we maintained a pool of re-usable pages in 
the TaskMemoryManager
-   * itself).
-   */
-  private final LinkedList<MemoryBlock> allocatedPages = new 
LinkedList<MemoryBlock>();
-
-  private final LinkedList<SpillInfo> spills = new LinkedList<SpillInfo>();
-
-  /** Peak memory used by this sorter so far, in bytes. **/
-  private long peakMemoryUsedBytes;
-
-  // These variables are reset after spilling:
-  @Nullable private UnsafeShuffleInMemorySorter inMemSorter;
-  @Nullable private MemoryBlock currentPage = null;
-  private long currentPagePosition = -1;
-  private long freeSpaceInCurrentPage = 0;
-
-  public UnsafeShuffleExternalSorter(
-      TaskMemoryManager memoryManager,
-      ShuffleMemoryManager shuffleMemoryManager,
-      BlockManager blockManager,
-      TaskContext taskContext,
-      int initialSize,
-      int numPartitions,
-      SparkConf conf,
-      ShuffleWriteMetrics writeMetrics) throws IOException {
-    this.taskMemoryManager = memoryManager;
-    this.shuffleMemoryManager = shuffleMemoryManager;
-    this.blockManager = blockManager;
-    this.taskContext = taskContext;
-    this.initialSize = initialSize;
-    this.peakMemoryUsedBytes = initialSize;
-    this.numPartitions = numPartitions;
-    // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no 
units are provided
-    this.fileBufferSizeBytes = (int) 
conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
-    this.pageSizeBytes = (int) Math.min(
-      PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, 
shuffleMemoryManager.pageSizeBytes());
-    this.maxRecordSizeBytes = pageSizeBytes - 4;
-    this.writeMetrics = writeMetrics;
-    initializeForWriting();
-
-    // preserve first page to ensure that we have at least one page to work 
with. Otherwise,
-    // other operators in the same task may starve this sorter (SPARK-9709).
-    acquireNewPageIfNecessary(pageSizeBytes);
-  }
-
-  /**
-   * Allocates new sort data structures. Called when creating the sorter and 
after each spill.
-   */
-  private void initializeForWriting() throws IOException {
-    // TODO: move this sizing calculation logic into a static method of sorter:
-    final long memoryRequested = initialSize * 8L;
-    final long memoryAcquired = 
shuffleMemoryManager.tryToAcquire(memoryRequested);
-    if (memoryAcquired != memoryRequested) {
-      shuffleMemoryManager.release(memoryAcquired);
-      throw new IOException("Could not acquire " + memoryRequested + " bytes 
of memory");
-    }
-
-    this.inMemSorter = new UnsafeShuffleInMemorySorter(initialSize);
-  }
-
-  /**
-   * Sorts the in-memory records and writes the sorted records to an on-disk 
file.
-   * This method does not free the sort data structures.
-   *
-   * @param isLastFile if true, this indicates that we're writing the final 
output file and that the
-   *                   bytes written should be counted towards shuffle spill 
metrics rather than
-   *                   shuffle write metrics.
-   */
-  private void writeSortedFile(boolean isLastFile) throws IOException {
-
-    final ShuffleWriteMetrics writeMetricsToUse;
-
-    if (isLastFile) {
-      // We're writing the final non-spill file, so we _do_ want to count this 
as shuffle bytes.
-      writeMetricsToUse = writeMetrics;
-    } else {
-      // We're spilling, so bytes written should be counted towards spill 
rather than write.
-      // Create a dummy WriteMetrics object to absorb these metrics, since we 
don't want to count
-      // them towards shuffle bytes written.
-      writeMetricsToUse = new ShuffleWriteMetrics();
-    }
-
-    // This call performs the actual sort.
-    final UnsafeShuffleInMemorySorter.UnsafeShuffleSorterIterator 
sortedRecords =
-      inMemSorter.getSortedIterator();
-
-    // Currently, we need to open a new DiskBlockObjectWriter for each 
partition; we can avoid this
-    // after SPARK-5581 is fixed.
-    DiskBlockObjectWriter writer;
-
-    // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since 
there doesn't seem to
-    // be an API to directly transfer bytes from managed memory to the disk 
writer, we buffer
-    // data through a byte array. This array does not need to be large enough 
to hold a single
-    // record;
-    final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];
-
-    // Because this output will be read during shuffle, its compression codec 
must be controlled by
-    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we 
need to use
-    // createTempShuffleBlock here; see SPARK-3426 for more details.
-    final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
-      blockManager.diskBlockManager().createTempShuffleBlock();
-    final File file = spilledFileInfo._2();
-    final TempShuffleBlockId blockId = spilledFileInfo._1();
-    final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);
-
-    // Unfortunately, we need a serializer instance in order to construct a 
DiskBlockObjectWriter.
-    // Our write path doesn't actually use this serializer (since we end up 
calling the `write()`
-    // OutputStream methods), but DiskBlockObjectWriter still calls some 
methods on it. To work
-    // around this, we pass a dummy no-op serializer.
-    final SerializerInstance ser = DummySerializerInstance.INSTANCE;
-
-    writer = blockManager.getDiskWriter(blockId, file, ser, 
fileBufferSizeBytes, writeMetricsToUse);
-
-    int currentPartition = -1;
-    while (sortedRecords.hasNext()) {
-      sortedRecords.loadNext();
-      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
-      assert (partition >= currentPartition);
-      if (partition != currentPartition) {
-        // Switch to the new partition
-        if (currentPartition != -1) {
-          writer.commitAndClose();
-          spillInfo.partitionLengths[currentPartition] = 
writer.fileSegment().length();
-        }
-        currentPartition = partition;
-        writer =
-          blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, 
writeMetricsToUse);
-      }
-
-      final long recordPointer = 
sortedRecords.packedRecordPointer.getRecordPointer();
-      final Object recordPage = taskMemoryManager.getPage(recordPointer);
-      final long recordOffsetInPage = 
taskMemoryManager.getOffsetInPage(recordPointer);
-      int dataRemaining = Platform.getInt(recordPage, recordOffsetInPage);
-      long recordReadPosition = recordOffsetInPage + 4; // skip over record 
length
-      while (dataRemaining > 0) {
-        final int toTransfer = Math.min(DISK_WRITE_BUFFER_SIZE, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, 
Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
-        recordReadPosition += toTransfer;
-        dataRemaining -= toTransfer;
-      }
-      writer.recordWritten();
-    }
-
-    if (writer != null) {
-      writer.commitAndClose();
-      // If `writeSortedFile()` was called from `closeAndGetSpills()` and no 
records were inserted,
-      // then the file might be empty. Note that it might be better to avoid 
calling
-      // writeSortedFile() in that case.
-      if (currentPartition != -1) {
-        spillInfo.partitionLengths[currentPartition] = 
writer.fileSegment().length();
-        spills.add(spillInfo);
-      }
-    }
-
-    if (!isLastFile) {  // i.e. this is a spill file
-      // The current semantics of `shuffleRecordsWritten` seem to be that it's 
updated when records
-      // are written to disk, not when they enter the shuffle sorting code. 
DiskBlockObjectWriter
-      // relies on its `recordWritten()` method being called in order to 
trigger periodic updates to
-      // `shuffleBytesWritten`. If we were to remove the `recordWritten()` 
call and increment that
-      // counter at a higher-level, then the in-progress metrics for records 
written and bytes
-      // written would get out of sync.
-      //
-      // When writing the last file, we pass `writeMetrics` directly to the 
DiskBlockObjectWriter;
-      // in all other cases, we pass in a dummy write metrics to capture 
metrics, then copy those
-      // metrics to the true write metrics here. The reason for performing 
this copying is so that
-      // we can avoid reporting spilled bytes as shuffle write bytes.
-      //
-      // Note that we intentionally ignore the value of 
`writeMetricsToUse.shuffleWriteTime()`.
-      // Consistent with ExternalSorter, we do not count this IO towards 
shuffle write time.
-      // This means that this IO time is not accounted for anywhere; 
SPARK-3577 will fix this.
-      
writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
-      
taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
-    }
-  }
-
-  /**
-   * Sort and spill the current records in response to memory pressure.
-   */
-  @VisibleForTesting
-  void spill() throws IOException {
-    logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
-      Thread.currentThread().getId(),
-      Utils.bytesToString(getMemoryUsage()),
-      spills.size(),
-      spills.size() > 1 ? " times" : " time");
-
-    writeSortedFile(false);
-    final long inMemSorterMemoryUsage = inMemSorter.getMemoryUsage();
-    inMemSorter = null;
-    shuffleMemoryManager.release(inMemSorterMemoryUsage);
-    final long spillSize = freeMemory();
-    taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
-
-    initializeForWriting();
-  }
-
-  private long getMemoryUsage() {
-    long totalPageSize = 0;
-    for (MemoryBlock page : allocatedPages) {
-      totalPageSize += page.size();
-    }
-    return ((inMemSorter == null) ? 0 : inMemSorter.getMemoryUsage()) + 
totalPageSize;
-  }
-
-  private void updatePeakMemoryUsed() {
-    long mem = getMemoryUsage();
-    if (mem > peakMemoryUsedBytes) {
-      peakMemoryUsedBytes = mem;
-    }
-  }
-
-  /**
-   * Return the peak memory used so far, in bytes.
-   */
-  long getPeakMemoryUsedBytes() {
-    updatePeakMemoryUsed();
-    return peakMemoryUsedBytes;
-  }
-
-  private long freeMemory() {
-    updatePeakMemoryUsed();
-    long memoryFreed = 0;
-    for (MemoryBlock block : allocatedPages) {
-      taskMemoryManager.freePage(block);
-      shuffleMemoryManager.release(block.size());
-      memoryFreed += block.size();
-    }
-    allocatedPages.clear();
-    currentPage = null;
-    currentPagePosition = -1;
-    freeSpaceInCurrentPage = 0;
-    return memoryFreed;
-  }
-
-  /**
-   * Force all memory and spill files to be deleted; called by shuffle 
error-handling code.
-   */
-  public void cleanupResources() {
-    freeMemory();
-    for (SpillInfo spill : spills) {
-      if (spill.file.exists() && !spill.file.delete()) {
-        logger.error("Unable to delete spill file {}", spill.file.getPath());
-      }
-    }
-    if (inMemSorter != null) {
-      shuffleMemoryManager.release(inMemSorter.getMemoryUsage());
-      inMemSorter = null;
-    }
-  }
-
-  /**
-   * Checks whether there is enough space to insert an additional record in to 
the sort pointer
-   * array and grows the array if additional space is required. If the 
required space cannot be
-   * obtained, then the in-memory data will be spilled to disk.
-   */
-  private void growPointerArrayIfNecessary() throws IOException {
-    assert(inMemSorter != null);
-    if (!inMemSorter.hasSpaceForAnotherRecord()) {
-      logger.debug("Attempting to expand sort pointer array");
-      final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage();
-      final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2;
-      final long memoryAcquired = 
shuffleMemoryManager.tryToAcquire(memoryToGrowPointerArray);
-      if (memoryAcquired < memoryToGrowPointerArray) {
-        shuffleMemoryManager.release(memoryAcquired);
-        spill();
-      } else {
-        inMemSorter.expandPointerArray();
-        shuffleMemoryManager.release(oldPointerArrayMemoryUsage);
-      }
-    }
-  }
-  
-  /**
-   * Allocates more memory in order to insert an additional record. This will 
request additional
-   * memory from the {@link ShuffleMemoryManager} and spill if the requested 
memory can not be
-   * obtained.
-   *
-   * @param requiredSpace the required space in the data page, in bytes, 
including space for storing
-   *                      the record size. This must be less than or equal to 
the page size (records
-   *                      that exceed the page size are handled via a 
different code path which uses
-   *                      special overflow pages).
-   */
-  private void acquireNewPageIfNecessary(int requiredSpace) throws IOException 
{
-    growPointerArrayIfNecessary();
-    if (requiredSpace > freeSpaceInCurrentPage) {
-      logger.trace("Required space {} is less than free space in current page 
({})", requiredSpace,
-        freeSpaceInCurrentPage);
-      // TODO: we should track metrics on the amount of space wasted when we 
roll over to a new page
-      // without using the free space at the end of the current page. We 
should also do this for
-      // BytesToBytesMap.
-      if (requiredSpace > pageSizeBytes) {
-        throw new IOException("Required space " + requiredSpace + " is greater 
than page size (" +
-          pageSizeBytes + ")");
-      } else {
-        final long memoryAcquired = 
shuffleMemoryManager.tryToAcquire(pageSizeBytes);
-        if (memoryAcquired < pageSizeBytes) {
-          shuffleMemoryManager.release(memoryAcquired);
-          spill();
-          final long memoryAcquiredAfterSpilling = 
shuffleMemoryManager.tryToAcquire(pageSizeBytes);
-          if (memoryAcquiredAfterSpilling != pageSizeBytes) {
-            shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
-            throw new IOException("Unable to acquire " + pageSizeBytes + " 
bytes of memory");
-          }
-        }
-        currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
-        currentPagePosition = currentPage.getBaseOffset();
-        freeSpaceInCurrentPage = pageSizeBytes;
-        allocatedPages.add(currentPage);
-      }
-    }
-  }
-
-  /**
-   * Write a record to the shuffle sorter.
-   */
-  public void insertRecord(
-      Object recordBaseObject,
-      long recordBaseOffset,
-      int lengthInBytes,
-      int partitionId) throws IOException {
-
-    growPointerArrayIfNecessary();
-    // Need 4 bytes to store the record length.
-    final int totalSpaceRequired = lengthInBytes + 4;
-
-    // --- Figure out where to insert the new record 
----------------------------------------------
-
-    final MemoryBlock dataPage;
-    long dataPagePosition;
-    boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
-    if (useOverflowPage) {
-      long overflowPageSize = 
ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
-      // The record is larger than the page size, so allocate a special 
overflow page just to hold
-      // that record.
-      final long memoryGranted = 
shuffleMemoryManager.tryToAcquire(overflowPageSize);
-      if (memoryGranted != overflowPageSize) {
-        shuffleMemoryManager.release(memoryGranted);
-        spill();
-        final long memoryGrantedAfterSpill = 
shuffleMemoryManager.tryToAcquire(overflowPageSize);
-        if (memoryGrantedAfterSpill != overflowPageSize) {
-          shuffleMemoryManager.release(memoryGrantedAfterSpill);
-          throw new IOException("Unable to acquire " + overflowPageSize + " 
bytes of memory");
-        }
-      }
-      MemoryBlock overflowPage = 
taskMemoryManager.allocatePage(overflowPageSize);
-      allocatedPages.add(overflowPage);
-      dataPage = overflowPage;
-      dataPagePosition = overflowPage.getBaseOffset();
-    } else {
-      // The record is small enough to fit in a regular data page, but the 
current page might not
-      // have enough space to hold it (or no pages have been allocated yet).
-      acquireNewPageIfNecessary(totalSpaceRequired);
-      dataPage = currentPage;
-      dataPagePosition = currentPagePosition;
-      // Update bookkeeping information
-      freeSpaceInCurrentPage -= totalSpaceRequired;
-      currentPagePosition += totalSpaceRequired;
-    }
-    final Object dataPageBaseObject = dataPage.getBaseObject();
-
-    final long recordAddress =
-      taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
-    Platform.putInt(dataPageBaseObject, dataPagePosition, lengthInBytes);
-    dataPagePosition += 4;
-    Platform.copyMemory(
-      recordBaseObject, recordBaseOffset, dataPageBaseObject, 
dataPagePosition, lengthInBytes);
-    assert(inMemSorter != null);
-    inMemSorter.insertRecord(recordAddress, partitionId);
-  }
-
-  /**
-   * Close the sorter, causing any buffered data to be sorted and written out 
to disk.
-   *
-   * @return metadata for the spill files written by this sorter. If no 
records were ever inserted
-   *         into this sorter, then this will return an empty array.
-   * @throws IOException
-   */
-  public SpillInfo[] closeAndGetSpills() throws IOException {
-    try {
-      if (inMemSorter != null) {
-        // Do not count the final file towards the spill count.
-        writeSortedFile(true);
-        freeMemory();
-      }
-      return spills.toArray(new SpillInfo[spills.size()]);
-    } catch (IOException e) {
-      cleanupResources();
-      throw e;
-    }
-  }
-
-}
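For readers following the class comment at the top of this now-deleted file, here is a minimal, self-contained sketch (not part of this patch, written for illustration only) of the append/sort/write cycle it describes. It collapses the real design to a single byte[] "page" and a plain long[] pointer array, and assumes every record fits in that one page:

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical, simplified model of the append/sort/write cycle described in the class
// comment above; not the real UnsafeShuffleExternalSorter.
public final class TinyPartitionSorterSketch {
  private final byte[] page = new byte[1 << 20];  // a single fixed-size "data page"
  private int pagePosition = 0;
  private long[] pointers = new long[16];         // packed (partitionId, offset) entries
  private int numRecords = 0;

  /** Append one record: a 4-byte length prefix followed by the record bytes. */
  public void insertRecord(byte[] record, int partitionId) {
    ByteBuffer buf = ByteBuffer.wrap(page, pagePosition, 4 + record.length);
    buf.putInt(record.length);
    buf.put(record);
    if (numRecords == pointers.length) {
      pointers = Arrays.copyOf(pointers, pointers.length * 2);
    }
    // Partition id in the upper 32 bits, page offset in the lower 32 bits, so that a
    // numeric sort of the pointer array groups records by partition id.
    pointers[numRecords++] = ((long) partitionId << 32) | (pagePosition & 0xFFFFFFFFL);
    pagePosition += 4 + record.length;
  }

  /** Sort by partition id and stream every record's bytes out in partition order. */
  public void writeSorted(OutputStream out) throws IOException {
    Arrays.sort(pointers, 0, numRecords);
    for (int i = 0; i < numRecords; i++) {
      int offset = (int) pointers[i];
      int length = ByteBuffer.wrap(page, offset, 4).getInt();
      out.write(page, offset + 4, length);
    }
  }
}

Packing the partition id into the high bits of each pointer entry is what lets a plain numeric sort of the long[] group records by partition; this mirrors the role that PackedRecordPointer and UnsafeShuffleInMemorySorter play in the real code above.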

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleInMemorySorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleInMemorySorter.java
 
b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleInMemorySorter.java
deleted file mode 100644
index 5bab501..0000000
--- 
a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleInMemorySorter.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.unsafe;
-
-import java.util.Comparator;
-
-import org.apache.spark.util.collection.Sorter;
-
-final class UnsafeShuffleInMemorySorter {
-
-  private final Sorter<PackedRecordPointer, long[]> sorter;
-  private static final class SortComparator implements 
Comparator<PackedRecordPointer> {
-    @Override
-    public int compare(PackedRecordPointer left, PackedRecordPointer right) {
-      return left.getPartitionId() - right.getPartitionId();
-    }
-  }
-  private static final SortComparator SORT_COMPARATOR = new SortComparator();
-
-  /**
-   * An array of record pointers and partition ids that have been encoded by
-   * {@link PackedRecordPointer}. The sort operates on this array instead of directly manipulating
-   * records.
-   */
-  private long[] pointerArray;
-
-  /**
-   * The position in the pointer array where new records can be inserted.
-   */
-  private int pointerArrayInsertPosition = 0;
-
-  public UnsafeShuffleInMemorySorter(int initialSize) {
-    assert (initialSize > 0);
-    this.pointerArray = new long[initialSize];
-    this.sorter = new Sorter<PackedRecordPointer, 
long[]>(UnsafeShuffleSortDataFormat.INSTANCE);
-  }
-
-  public void expandPointerArray() {
-    final long[] oldArray = pointerArray;
-    // Guard against overflow:
-    final int newLength = oldArray.length * 2 > 0 ? (oldArray.length * 2) : 
Integer.MAX_VALUE;
-    pointerArray = new long[newLength];
-    System.arraycopy(oldArray, 0, pointerArray, 0, oldArray.length);
-  }
-
-  public boolean hasSpaceForAnotherRecord() {
-    return pointerArrayInsertPosition + 1 < pointerArray.length;
-  }
-
-  public long getMemoryUsage() {
-    return pointerArray.length * 8L;
-  }
-
-  /**
-   * Inserts a record to be sorted.
-   *
-   * @param recordPointer a pointer to the record, encoded by the task memory manager. Due to
-   *                      certain pointer compression techniques used by the sorter, the sort can
-   *                      only operate on pointers that point to locations in the first
-   *                      {@link PackedRecordPointer#MAXIMUM_PAGE_SIZE_BYTES} bytes of a data page.
-   * @param partitionId the partition id, which must be less than or equal to
-   *                    {@link PackedRecordPointer#MAXIMUM_PARTITION_ID}.
-   */
-  public void insertRecord(long recordPointer, int partitionId) {
-    if (!hasSpaceForAnotherRecord()) {
-      if (pointerArray.length == Integer.MAX_VALUE) {
-        throw new IllegalStateException("Sort pointer array has reached 
maximum size");
-      } else {
-        expandPointerArray();
-      }
-    }
-    pointerArray[pointerArrayInsertPosition] =
-        PackedRecordPointer.packPointer(recordPointer, partitionId);
-    pointerArrayInsertPosition++;
-  }
-
-  /**
-   * An iterator-like class that's used instead of Java's Iterator in order to 
facilitate inlining.
-   */
-  public static final class UnsafeShuffleSorterIterator {
-
-    private final long[] pointerArray;
-    private final int numRecords;
-    final PackedRecordPointer packedRecordPointer = new PackedRecordPointer();
-    private int position = 0;
-
-    public UnsafeShuffleSorterIterator(int numRecords, long[] pointerArray) {
-      this.numRecords = numRecords;
-      this.pointerArray = pointerArray;
-    }
-
-    public boolean hasNext() {
-      return position < numRecords;
-    }
-
-    public void loadNext() {
-      packedRecordPointer.set(pointerArray[position]);
-      position++;
-    }
-  }
-
-  /**
-   * Return an iterator over record pointers in sorted order.
-   */
-  public UnsafeShuffleSorterIterator getSortedIterator() {
-    sorter.sort(pointerArray, 0, pointerArrayInsertPosition, SORT_COMPARATOR);
-    return new UnsafeShuffleSorterIterator(pointerArrayInsertPosition, 
pointerArray);
-  }
-}
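To make the pointer encoding this in-memory sorter relies on concrete, here is a hedged sketch (not part of this patch). The 24-bit partition id matches the 16777216-partition limit cited elsewhere in this commit; the 13-bit page number / 27-bit in-page offset split is an assumption chosen for illustration, not a statement about PackedRecordPointer's actual layout:

// Illustrative packing of (partitionId, pageNumber, offsetInPage) into one long; the
// 13/27-bit split below is an assumed layout, chosen only so the fields sum to 64 bits.
final class PackedPointerSketch {
  static long pack(int partitionId, int pageNumber, long offsetInPage) {
    assert partitionId >= 0 && partitionId < (1 << 24);    // 24-bit partition id
    assert pageNumber >= 0 && pageNumber < (1 << 13);      // assumed 13-bit page number
    assert offsetInPage >= 0 && offsetInPage < (1L << 27); // assumed 27-bit page offset
    return ((long) partitionId << 40) | ((long) pageNumber << 27) | offsetInPage;
  }

  static int partitionId(long packed) { return (int) (packed >>> 40); }

  static int pageNumber(long packed) { return (int) ((packed >>> 27) & ((1 << 13) - 1)); }

  static long offsetInPage(long packed) { return packed & ((1L << 27) - 1); }

  public static void main(String[] args) {
    long p = pack(42, 3, 12345L);
    // Because the partition id occupies the most significant bits, comparing packed
    // values by partition id (as SortComparator does above) never requires decoding
    // the page number or offset.
    System.out.println(partitionId(p) + " " + pageNumber(p) + " " + offsetInPage(p));
  }
}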

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSortDataFormat.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSortDataFormat.java
 
b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSortDataFormat.java
deleted file mode 100644
index a66d74e..0000000
--- 
a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSortDataFormat.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.unsafe;
-
-import org.apache.spark.util.collection.SortDataFormat;
-
-final class UnsafeShuffleSortDataFormat extends 
SortDataFormat<PackedRecordPointer, long[]> {
-
-  public static final UnsafeShuffleSortDataFormat INSTANCE = new 
UnsafeShuffleSortDataFormat();
-
-  private UnsafeShuffleSortDataFormat() { }
-
-  @Override
-  public PackedRecordPointer getKey(long[] data, int pos) {
-    // Since we re-use keys, this method shouldn't be called.
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public PackedRecordPointer newKey() {
-    return new PackedRecordPointer();
-  }
-
-  @Override
-  public PackedRecordPointer getKey(long[] data, int pos, PackedRecordPointer 
reuse) {
-    reuse.set(data[pos]);
-    return reuse;
-  }
-
-  @Override
-  public void swap(long[] data, int pos0, int pos1) {
-    final long temp = data[pos0];
-    data[pos0] = data[pos1];
-    data[pos1] = temp;
-  }
-
-  @Override
-  public void copyElement(long[] src, int srcPos, long[] dst, int dstPos) {
-    dst[dstPos] = src[srcPos];
-  }
-
-  @Override
-  public void copyRange(long[] src, int srcPos, long[] dst, int dstPos, int 
length) {
-    System.arraycopy(src, srcPos, dst, dstPos, length);
-  }
-
-  @Override
-  public long[] allocate(int length) {
-    return new long[length];
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java 
b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
deleted file mode 100644
index fdb309e..0000000
--- 
a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
+++ /dev/null
@@ -1,489 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.unsafe;
-
-import javax.annotation.Nullable;
-import java.io.*;
-import java.nio.channels.FileChannel;
-import java.util.Iterator;
-
-import scala.Option;
-import scala.Product2;
-import scala.collection.JavaConverters;
-import scala.collection.immutable.Map;
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Closeables;
-import com.google.common.io.Files;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.*;
-import org.apache.spark.annotation.Private;
-import org.apache.spark.executor.ShuffleWriteMetrics;
-import org.apache.spark.io.CompressionCodec;
-import org.apache.spark.io.CompressionCodec$;
-import org.apache.spark.io.LZFCompressionCodec;
-import org.apache.spark.network.util.LimitedInputStream;
-import org.apache.spark.scheduler.MapStatus;
-import org.apache.spark.scheduler.MapStatus$;
-import org.apache.spark.serializer.SerializationStream;
-import org.apache.spark.serializer.Serializer;
-import org.apache.spark.serializer.SerializerInstance;
-import org.apache.spark.shuffle.IndexShuffleBlockResolver;
-import org.apache.spark.shuffle.ShuffleMemoryManager;
-import org.apache.spark.shuffle.ShuffleWriter;
-import org.apache.spark.storage.BlockManager;
-import org.apache.spark.storage.TimeTrackingOutputStream;
-import org.apache.spark.unsafe.Platform;
-import org.apache.spark.unsafe.memory.TaskMemoryManager;
-
-@Private
-public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
-
-  private final Logger logger = 
LoggerFactory.getLogger(UnsafeShuffleWriter.class);
-
-  private static final ClassTag<Object> OBJECT_CLASS_TAG = 
ClassTag$.MODULE$.Object();
-
-  @VisibleForTesting
-  static final int INITIAL_SORT_BUFFER_SIZE = 4096;
-
-  private final BlockManager blockManager;
-  private final IndexShuffleBlockResolver shuffleBlockResolver;
-  private final TaskMemoryManager memoryManager;
-  private final ShuffleMemoryManager shuffleMemoryManager;
-  private final SerializerInstance serializer;
-  private final Partitioner partitioner;
-  private final ShuffleWriteMetrics writeMetrics;
-  private final int shuffleId;
-  private final int mapId;
-  private final TaskContext taskContext;
-  private final SparkConf sparkConf;
-  private final boolean transferToEnabled;
-
-  @Nullable private MapStatus mapStatus;
-  @Nullable private UnsafeShuffleExternalSorter sorter;
-  private long peakMemoryUsedBytes = 0;
-
-  /** Subclass of ByteArrayOutputStream that exposes `buf` directly. */
-  private static final class MyByteArrayOutputStream extends 
ByteArrayOutputStream {
-    public MyByteArrayOutputStream(int size) { super(size); }
-    public byte[] getBuf() { return buf; }
-  }
-
-  private MyByteArrayOutputStream serBuffer;
-  private SerializationStream serOutputStream;
-
-  /**
-   * Are we in the process of stopping? Because map tasks can call stop() with 
success = true
-   * and then call stop() with success = false if they get an exception, we 
want to make sure
-   * we don't try deleting files, etc twice.
-   */
-  private boolean stopping = false;
-
-  public UnsafeShuffleWriter(
-      BlockManager blockManager,
-      IndexShuffleBlockResolver shuffleBlockResolver,
-      TaskMemoryManager memoryManager,
-      ShuffleMemoryManager shuffleMemoryManager,
-      UnsafeShuffleHandle<K, V> handle,
-      int mapId,
-      TaskContext taskContext,
-      SparkConf sparkConf) throws IOException {
-    final int numPartitions = 
handle.dependency().partitioner().numPartitions();
-    if (numPartitions > UnsafeShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS()) {
-      throw new IllegalArgumentException(
-        "UnsafeShuffleWriter can only be used for shuffles with at most " +
-          UnsafeShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS() + " reduce 
partitions");
-    }
-    this.blockManager = blockManager;
-    this.shuffleBlockResolver = shuffleBlockResolver;
-    this.memoryManager = memoryManager;
-    this.shuffleMemoryManager = shuffleMemoryManager;
-    this.mapId = mapId;
-    final ShuffleDependency<K, V, V> dep = handle.dependency();
-    this.shuffleId = dep.shuffleId();
-    this.serializer = Serializer.getSerializer(dep.serializer()).newInstance();
-    this.partitioner = dep.partitioner();
-    this.writeMetrics = new ShuffleWriteMetrics();
-    
taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
-    this.taskContext = taskContext;
-    this.sparkConf = sparkConf;
-    this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", 
true);
-    open();
-  }
-
-  @VisibleForTesting
-  public int maxRecordSizeBytes() {
-    assert(sorter != null);
-    return sorter.maxRecordSizeBytes;
-  }
-
-  private void updatePeakMemoryUsed() {
-    // sorter can be null if this writer is closed
-    if (sorter != null) {
-      long mem = sorter.getPeakMemoryUsedBytes();
-      if (mem > peakMemoryUsedBytes) {
-        peakMemoryUsedBytes = mem;
-      }
-    }
-  }
-
-  /**
-   * Return the peak memory used so far, in bytes.
-   */
-  public long getPeakMemoryUsedBytes() {
-    updatePeakMemoryUsed();
-    return peakMemoryUsedBytes;
-  }
-
-  /**
-   * This convenience method should only be called in test code.
-   */
-  @VisibleForTesting
-  public void write(Iterator<Product2<K, V>> records) throws IOException {
-    write(JavaConverters.asScalaIteratorConverter(records).asScala());
-  }
-
-  @Override
-  public void write(scala.collection.Iterator<Product2<K, V>> records) throws 
IOException {
-    // Keep track of success so we know if we encountered an exception
-    // We do this rather than a standard try/catch/re-throw to handle
-    // generic throwables.
-    boolean success = false;
-    try {
-      while (records.hasNext()) {
-        insertRecordIntoSorter(records.next());
-      }
-      closeAndWriteOutput();
-      success = true;
-    } finally {
-      if (sorter != null) {
-        try {
-          sorter.cleanupResources();
-        } catch (Exception e) {
-          // Only throw this error if we won't be masking another
-          // error.
-          if (success) {
-            throw e;
-          } else {
-            logger.error("In addition to a failure during writing, we failed 
during " +
-                         "cleanup.", e);
-          }
-        }
-      }
-    }
-  }
-
-  private void open() throws IOException {
-    assert (sorter == null);
-    sorter = new UnsafeShuffleExternalSorter(
-      memoryManager,
-      shuffleMemoryManager,
-      blockManager,
-      taskContext,
-      INITIAL_SORT_BUFFER_SIZE,
-      partitioner.numPartitions(),
-      sparkConf,
-      writeMetrics);
-    serBuffer = new MyByteArrayOutputStream(1024 * 1024);
-    serOutputStream = serializer.serializeStream(serBuffer);
-  }
-
-  @VisibleForTesting
-  void closeAndWriteOutput() throws IOException {
-    assert(sorter != null);
-    updatePeakMemoryUsed();
-    serBuffer = null;
-    serOutputStream = null;
-    final SpillInfo[] spills = sorter.closeAndGetSpills();
-    sorter = null;
-    final long[] partitionLengths;
-    try {
-      partitionLengths = mergeSpills(spills);
-    } finally {
-      for (SpillInfo spill : spills) {
-        if (spill.file.exists() && ! spill.file.delete()) {
-          logger.error("Error while deleting spill file {}", 
spill.file.getPath());
-        }
-      }
-    }
-    shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
-    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), 
partitionLengths);
-  }
-
-  @VisibleForTesting
-  void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
-    assert(sorter != null);
-    final K key = record._1();
-    final int partitionId = partitioner.getPartition(key);
-    serBuffer.reset();
-    serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
-    serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
-    serOutputStream.flush();
-
-    final int serializedRecordSize = serBuffer.size();
-    assert (serializedRecordSize > 0);
-
-    sorter.insertRecord(
-      serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, 
partitionId);
-  }
-
-  @VisibleForTesting
-  void forceSorterToSpill() throws IOException {
-    assert (sorter != null);
-    sorter.spill();
-  }
-
-  /**
-   * Merge zero or more spill files together, choosing the fastest merging strategy based on the
-   * number of spills and the IO compression codec.
-   *
-   * @return the partition lengths in the merged file.
-   */
-  private long[] mergeSpills(SpillInfo[] spills) throws IOException {
-    final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
-    final boolean compressionEnabled = 
sparkConf.getBoolean("spark.shuffle.compress", true);
-    final CompressionCodec compressionCodec = 
CompressionCodec$.MODULE$.createCodec(sparkConf);
-    final boolean fastMergeEnabled =
-      sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
-    final boolean fastMergeIsSupported =
-      !compressionEnabled || compressionCodec instanceof LZFCompressionCodec;
-    try {
-      if (spills.length == 0) {
-        new FileOutputStream(outputFile).close(); // Create an empty file
-        return new long[partitioner.numPartitions()];
-      } else if (spills.length == 1) {
-        // Here, we don't need to perform any metrics updates because the 
bytes written to this
-        // output file would have already been counted as shuffle bytes 
written.
-        Files.move(spills[0].file, outputFile);
-        return spills[0].partitionLengths;
-      } else {
-        final long[] partitionLengths;
-        // There are multiple spills to merge, so none of these spill files' 
lengths were counted
-        // towards our shuffle write count or shuffle write time. If we use 
the slow merge path,
-        // then the final output file's size won't necessarily be equal to the 
sum of the spill
-        // files' sizes. To guard against this case, we look at the output 
file's actual size when
-        // computing shuffle bytes written.
-        //
-        // We allow the individual merge methods to report their own IO times 
since different merge
-        // strategies use different IO techniques.  We count IO during merge 
towards the shuffle
-        // shuffle write time, which appears to be consistent with the "not 
bypassing merge-sort"
-        // branch in ExternalSorter.
-        if (fastMergeEnabled && fastMergeIsSupported) {
-          // Compression is disabled or we are using an IO compression codec 
that supports
-          // decompression of concatenated compressed streams, so we can 
perform a fast spill merge
-          // that doesn't need to interpret the spilled bytes.
-          if (transferToEnabled) {
-            logger.debug("Using transferTo-based fast merge");
-            partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);
-          } else {
-            logger.debug("Using fileStream-based fast merge");
-            partitionLengths = mergeSpillsWithFileStream(spills, outputFile, 
null);
-          }
-        } else {
-          logger.debug("Using slow merge");
-          partitionLengths = mergeSpillsWithFileStream(spills, outputFile, 
compressionCodec);
-        }
-        // When closing an UnsafeShuffleExternalSorter that has already 
spilled once but also has
-        // in-memory records, we write out the in-memory records to a file but 
do not count that
-        // final write as bytes spilled (instead, it's accounted as shuffle 
write). The merge needs
-        // to be counted as shuffle write, but this will lead to 
double-counting of the final
-        // SpillInfo's bytes.
-        writeMetrics.decShuffleBytesWritten(spills[spills.length - 
1].file.length());
-        writeMetrics.incShuffleBytesWritten(outputFile.length());
-        return partitionLengths;
-      }
-    } catch (IOException e) {
-      if (outputFile.exists() && !outputFile.delete()) {
-        logger.error("Unable to delete output file {}", outputFile.getPath());
-      }
-      throw e;
-    }
-  }
-
-  /**
-   * Merges spill files using Java FileStreams. This code path is slower than the NIO-based merge,
-   * {@link UnsafeShuffleWriter#mergeSpillsWithTransferTo(SpillInfo[], File)}, so it's only used in
-   * cases where the IO compression codec does not support concatenation of compressed data, or in
-   * cases where users have explicitly disabled use of {@code transferTo} in order to work around
-   * kernel bugs.
-   *
-   * @param spills the spills to merge.
-   * @param outputFile the file to write the merged data to.
-   * @param compressionCodec the IO compression codec, or null if shuffle compression is disabled.
-   * @return the partition lengths in the merged file.
-   */
-  private long[] mergeSpillsWithFileStream(
-      SpillInfo[] spills,
-      File outputFile,
-      @Nullable CompressionCodec compressionCodec) throws IOException {
-    assert (spills.length >= 2);
-    final int numPartitions = partitioner.numPartitions();
-    final long[] partitionLengths = new long[numPartitions];
-    final InputStream[] spillInputStreams = new FileInputStream[spills.length];
-    OutputStream mergedFileOutputStream = null;
-
-    boolean threwException = true;
-    try {
-      for (int i = 0; i < spills.length; i++) {
-        spillInputStreams[i] = new FileInputStream(spills[i].file);
-      }
-      for (int partition = 0; partition < numPartitions; partition++) {
-        final long initialFileLength = outputFile.length();
-        mergedFileOutputStream =
-          new TimeTrackingOutputStream(writeMetrics, new 
FileOutputStream(outputFile, true));
-        if (compressionCodec != null) {
-          mergedFileOutputStream = 
compressionCodec.compressedOutputStream(mergedFileOutputStream);
-        }
-
-        for (int i = 0; i < spills.length; i++) {
-          final long partitionLengthInSpill = 
spills[i].partitionLengths[partition];
-          if (partitionLengthInSpill > 0) {
-            InputStream partitionInputStream =
-              new LimitedInputStream(spillInputStreams[i], 
partitionLengthInSpill);
-            if (compressionCodec != null) {
-              partitionInputStream = 
compressionCodec.compressedInputStream(partitionInputStream);
-            }
-            ByteStreams.copy(partitionInputStream, mergedFileOutputStream);
-          }
-        }
-        mergedFileOutputStream.flush();
-        mergedFileOutputStream.close();
-        partitionLengths[partition] = (outputFile.length() - 
initialFileLength);
-      }
-      threwException = false;
-    } finally {
-      // To avoid masking exceptions that caused us to prematurely enter the 
finally block, only
-      // throw exceptions during cleanup if threwException == false.
-      for (InputStream stream : spillInputStreams) {
-        Closeables.close(stream, threwException);
-      }
-      Closeables.close(mergedFileOutputStream, threwException);
-    }
-    return partitionLengths;
-  }
-
-  /**
-   * Merges spill files by using NIO's transferTo to concatenate spill partitions' bytes.
-   * This is only safe when the IO compression codec and serializer support concatenation of
-   * serialized streams.
-   *
-   * @return the partition lengths in the merged file.
-   */
-  private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File 
outputFile) throws IOException {
-    assert (spills.length >= 2);
-    final int numPartitions = partitioner.numPartitions();
-    final long[] partitionLengths = new long[numPartitions];
-    final FileChannel[] spillInputChannels = new FileChannel[spills.length];
-    final long[] spillInputChannelPositions = new long[spills.length];
-    FileChannel mergedFileOutputChannel = null;
-
-    boolean threwException = true;
-    try {
-      for (int i = 0; i < spills.length; i++) {
-        spillInputChannels[i] = new 
FileInputStream(spills[i].file).getChannel();
-      }
-      // This file needs to opened in append mode in order to work around a 
Linux kernel bug that
-      // affects transferTo; see SPARK-3948 for more details.
-      mergedFileOutputChannel = new FileOutputStream(outputFile, 
true).getChannel();
-
-      long bytesWrittenToMergedFile = 0;
-      for (int partition = 0; partition < numPartitions; partition++) {
-        for (int i = 0; i < spills.length; i++) {
-          final long partitionLengthInSpill = 
spills[i].partitionLengths[partition];
-          long bytesToTransfer = partitionLengthInSpill;
-          final FileChannel spillInputChannel = spillInputChannels[i];
-          final long writeStartTime = System.nanoTime();
-          while (bytesToTransfer > 0) {
-            final long actualBytesTransferred = spillInputChannel.transferTo(
-              spillInputChannelPositions[i],
-              bytesToTransfer,
-              mergedFileOutputChannel);
-            spillInputChannelPositions[i] += actualBytesTransferred;
-            bytesToTransfer -= actualBytesTransferred;
-          }
-          writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
-          bytesWrittenToMergedFile += partitionLengthInSpill;
-          partitionLengths[partition] += partitionLengthInSpill;
-        }
-      }
-      // Check the position after transferTo loop to see if it is in the right 
position and raise an
-      // exception if it is incorrect. The position will not be increased to 
the expected length
-      // after calling transferTo in kernel version 2.6.32. This issue is 
described at
-      // https://bugs.openjdk.java.net/browse/JDK-7052359 and SPARK-3948.
-      if (mergedFileOutputChannel.position() != bytesWrittenToMergedFile) {
-        throw new IOException(
-          "Current position " + mergedFileOutputChannel.position() + " does 
not equal expected " +
-            "position " + bytesWrittenToMergedFile + " after transferTo. 
Please check your kernel" +
-            " version to see if it is 2.6.32, as there is a kernel bug which 
will lead to " +
-            "unexpected behavior when using transferTo. You can set 
spark.file.transferTo=false " +
-            "to disable this NIO feature."
-        );
-      }
-      threwException = false;
-    } finally {
-      // To avoid masking exceptions that caused us to prematurely enter the 
finally block, only
-      // throw exceptions during cleanup if threwException == false.
-      for (int i = 0; i < spills.length; i++) {
-        assert(spillInputChannelPositions[i] == spills[i].file.length());
-        Closeables.close(spillInputChannels[i], threwException);
-      }
-      Closeables.close(mergedFileOutputChannel, threwException);
-    }
-    return partitionLengths;
-  }
-
-  @Override
-  public Option<MapStatus> stop(boolean success) {
-    try {
-      // Update task metrics from accumulators (null in 
UnsafeShuffleWriterSuite)
-      Map<String, Accumulator<Object>> internalAccumulators =
-        taskContext.internalMetricsToAccumulators();
-      if (internalAccumulators != null) {
-        internalAccumulators.apply(InternalAccumulator.PEAK_EXECUTION_MEMORY())
-          .add(getPeakMemoryUsedBytes());
-      }
-
-      if (stopping) {
-        return Option.apply(null);
-      } else {
-        stopping = true;
-        if (success) {
-          if (mapStatus == null) {
-            throw new IllegalStateException("Cannot call stop(true) without 
having called write()");
-          }
-          return Option.apply(mapStatus);
-        } else {
-          // The map task failed, so delete our output data.
-          shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
-          return Option.apply(null);
-        }
-      }
-    } finally {
-      if (sorter != null) {
-        // If sorter is non-null, then this implies that we called stop() in 
response to an error,
-        // so we need to clean up memory and spill files created by the sorter
-        sorter.cleanupResources();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index c329983..704158b 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -330,7 +330,7 @@ object SparkEnv extends Logging {
     val shortShuffleMgrNames = Map(
       "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
       "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
-      "tungsten-sort" -> 
"org.apache.spark.shuffle.unsafe.UnsafeShuffleManager")
+      "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
     val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
     val shuffleMgrClass = 
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 9df4e55..1105167 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -19,9 +19,53 @@ package org.apache.spark.shuffle.sort
 
 import java.util.concurrent.ConcurrentHashMap
 
-import org.apache.spark.{Logging, SparkConf, TaskContext, ShuffleDependency}
+import org.apache.spark._
+import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle._
 
+/**
+ * In sort-based shuffle, incoming records are sorted according to their target partition ids, then
+ * written to a single map output file. Reducers fetch contiguous regions of this file in order to
+ * read their portion of the map output. In cases where the map output data is too large to fit in
+ * memory, sorted subsets of the output can be spilled to disk and those on-disk files are merged
+ * to produce the final output file.
+ *
+ * Sort-based shuffle has two different write paths for producing its map output files:
+ *
+ *  - Serialized sorting: used when all three of the following conditions hold:
+ *    1. The shuffle dependency specifies no aggregation or output ordering.
+ *    2. The shuffle serializer supports relocation of serialized values (this is currently
+ *       supported by KryoSerializer and Spark SQL's custom serializers).
+ *    3. The shuffle produces fewer than 16777216 output partitions.
+ *  - Deserialized sorting: used to handle all other cases.
+ *
+ * -----------------------
+ * Serialized sorting mode
+ * -----------------------
+ *
+ * In the serialized sorting mode, incoming records are serialized as soon as they are passed to the
+ * shuffle writer and are buffered in a serialized form during sorting. This write path implements
+ * several optimizations:
+ *
+ *  - Its sort operates on serialized binary data rather than Java objects, which reduces memory
+ *    consumption and GC overheads. This optimization requires the record serializer to have certain
+ *    properties to allow serialized records to be re-ordered without requiring deserialization.
+ *    See SPARK-4550, where this optimization was first proposed and implemented, for more details.
+ *
+ *  - It uses a specialized cache-efficient sorter ([[ShuffleExternalSorter]]) that sorts
+ *    arrays of compressed record pointers and partition ids. By using only 8 bytes of space per
+ *    record in the sorting array, this fits more of the array into cache.
+ *
+ *  - The spill merging procedure operates on blocks of serialized records that belong to the same
+ *    partition and does not need to deserialize records during the merge.
+ *
+ *  - When the spill compression codec supports concatenation of compressed data, the spill merge
+ *    simply concatenates the serialized and compressed spill partitions to produce the final output
+ *    partition.  This allows efficient data copying methods, like NIO's `transferTo`, to be used
+ *    and avoids the need to allocate decompression or copying buffers during the merge.
+ *
+ * For more details on these optimizations, see SPARK-7081.
+ */
 private[spark] class SortShuffleManager(conf: SparkConf) extends 
ShuffleManager with Logging {
 
   if (!conf.getBoolean("spark.shuffle.spill", true)) {
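As a rough illustration of the three-way eligibility test described in the scaladoc above, here is a hedged Java sketch; the ShuffleSpec interface and its accessors are hypothetical stand-ins for Spark's ShuffleDependency, not the actual SortShuffleManager.canUseSerializedShuffle implementation:

// Hypothetical stand-in for the shuffle dependency; the real check lives in
// SortShuffleManager.canUseSerializedShuffle and inspects a ShuffleDependency.
interface ShuffleSpec {
  boolean serializerSupportsRelocation(); // e.g. Kryo-style serializers
  boolean hasAggregator();                // map-side aggregation requested?
  boolean hasKeyOrdering();               // output ordering requested?
  int numPartitions();
}

final class SerializedShuffleEligibilitySketch {
  static boolean canUseSerializedSorting(ShuffleSpec dep) {
    return dep.serializerSupportsRelocation() // condition 2 above
        && !dep.hasAggregator()               // condition 1: no aggregation...
        && !dep.hasKeyOrdering()              //              ...and no output ordering
        && dep.numPartitions() < 16777216;    // condition 3: fewer than 2^24 partitions
  }
}

Shuffles that fail this test either take the bypass-merge-sort path (few partitions, no map-side aggregation) or fall back to deserialized sorting, as the registerShuffle changes below show.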
@@ -30,8 +74,12 @@ private[spark] class SortShuffleManager(conf: SparkConf) 
extends ShuffleManager
         " Shuffle will continue to spill to disk when necessary.")
   }
 
-  private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf)
-  private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()
+  /**
+   * A mapping from shuffle ids to the number of mappers producing output for those shuffles.
+   */
+  private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()
+
+  override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
 
   /**
    * Register a shuffle with the manager and obtain a handle for it to pass to 
tasks.
@@ -40,7 +88,22 @@ private[spark] class SortShuffleManager(conf: SparkConf) 
extends ShuffleManager
       shuffleId: Int,
       numMaps: Int,
       dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
-    new BaseShuffleHandle(shuffleId, numMaps, dependency)
+    if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, 
dependency)) {
+      // If there are fewer than spark.shuffle.sort.bypassMergeThreshold 
partitions and we don't
+      // need map-side aggregation, then write numPartitions files directly 
and just concatenate
+      // them at the end. This avoids doing serialization and deserialization 
twice to merge
+      // together the spilled files, which would happen with the normal code 
path. The downside is
+      // having multiple files open at a time and thus more memory allocated 
to buffers.
+      new BypassMergeSortShuffleHandle[K, V](
+        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, 
V]])
+    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
+      // Otherwise, try to buffer map outputs in a serialized form, since this 
is more efficient:
+      new SerializedShuffleHandle[K, V](
+        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, 
V]])
+    } else {
+      // Otherwise, buffer map outputs in a deserialized form:
+      new BaseShuffleHandle(shuffleId, numMaps, dependency)
+    }
   }
 
   /**
@@ -52,38 +115,114 @@ private[spark] class SortShuffleManager(conf: SparkConf) 
extends ShuffleManager
       startPartition: Int,
       endPartition: Int,
       context: TaskContext): ShuffleReader[K, C] = {
-    // We currently use the same block store shuffle fetcher as the hash-based 
shuffle.
     new BlockStoreShuffleReader(
       handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, 
endPartition, context)
   }
 
   /** Get a writer for a given partition. Called on executors by map tasks. */
-  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: 
TaskContext)
-      : ShuffleWriter[K, V] = {
-    val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]]
-    shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffleId, 
baseShuffleHandle.numMaps)
-    new SortShuffleWriter(
-      shuffleBlockResolver, baseShuffleHandle, mapId, context)
+  override def getWriter[K, V](
+      handle: ShuffleHandle,
+      mapId: Int,
+      context: TaskContext): ShuffleWriter[K, V] = {
+    numMapsForShuffle.putIfAbsent(
+      handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, 
_]].numMaps)
+    val env = SparkEnv.get
+    handle match {
+      case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V 
@unchecked] =>
+        new UnsafeShuffleWriter(
+          env.blockManager,
+          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
+          context.taskMemoryManager(),
+          env.shuffleMemoryManager,
+          unsafeShuffleHandle,
+          mapId,
+          context,
+          env.conf)
+      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V 
@unchecked] =>
+        new BypassMergeSortShuffleWriter(
+          env.blockManager,
+          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
+          bypassMergeSortHandle,
+          mapId,
+          context,
+          env.conf)
+      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
+        new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
+    }
   }
 
   /** Remove a shuffle's metadata from the ShuffleManager. */
   override def unregisterShuffle(shuffleId: Int): Boolean = {
-    if (shuffleMapNumber.containsKey(shuffleId)) {
-      val numMaps = shuffleMapNumber.remove(shuffleId)
-      (0 until numMaps).map{ mapId =>
+    Option(numMapsForShuffle.remove(shuffleId)).foreach { numMaps =>
+      (0 until numMaps).foreach { mapId =>
         shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
       }
     }
     true
   }
 
-  override val shuffleBlockResolver: IndexShuffleBlockResolver = {
-    indexShuffleBlockResolver
-  }
-
   /** Shut down this ShuffleManager. */
   override def stop(): Unit = {
     shuffleBlockResolver.stop()
   }
 }
 
+
+private[spark] object SortShuffleManager extends Logging {
+
+  /**
+   * The maximum number of shuffle output partitions that SortShuffleManager 
supports when
+   * buffering map outputs in a serialized form. This is an extreme defensive 
programming measure,
+   * since it's extremely unlikely that a single shuffle produces over 16 
million output partitions.
+   */
+  val MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE =
+    PackedRecordPointer.MAXIMUM_PARTITION_ID + 1
+
+  /**
+   * Helper method for determining whether a shuffle should use an optimized 
serialized shuffle
+   * path or whether it should fall back to the original path that operates on 
deserialized objects.
+   */
+  def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean 
= {
+    val shufId = dependency.shuffleId
+    val numPartitions = dependency.partitioner.numPartitions
+    val serializer = Serializer.getSerializer(dependency.serializer)
+    if (!serializer.supportsRelocationOfSerializedObjects) {
+      log.debug(s"Can't use serialized shuffle for shuffle $shufId because the 
serializer, " +
+        s"${serializer.getClass.getName}, does not support object relocation")
+      false
+    } else if (dependency.aggregator.isDefined) {
+      log.debug(
+        s"Can't use serialized shuffle for shuffle $shufId because an 
aggregator is defined")
+      false
+    } else if (numPartitions > 
MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
+      log.debug(s"Can't use serialized shuffle for shuffle $shufId because it 
has more than " +
+        s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
+      false
+    } else {
+      log.debug(s"Can use serialized shuffle for shuffle $shufId")
+      true
+    }
+  }
+}
+
+/**
+ * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to 
use the
+ * serialized shuffle.
+ */
+private[spark] class SerializedShuffleHandle[K, V](
+  shuffleId: Int,
+  numMaps: Int,
+  dependency: ShuffleDependency[K, V, V])
+  extends BaseShuffleHandle(shuffleId, numMaps, dependency) {
+}
+
+/**
+ * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to 
use the
+ * bypass merge sort shuffle path.
+ */
+private[spark] class BypassMergeSortShuffleHandle[K, V](
+  shuffleId: Int,
+  numMaps: Int,
+  dependency: ShuffleDependency[K, V, V])
+  extends BaseShuffleHandle(shuffleId, numMaps, dependency) {
+}
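
As a rough, user-level illustration of the handle selection in registerShuffle above, the sketch below (a hypothetical example, assuming Spark's default spark.shuffle.sort.bypassMergeThreshold of 200 and Kryo as the shuffle serializer) shows which handle a few common shuffles would receive:

  import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("shuffle-handle-sketch")
    .setMaster("local[2]")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  val sc = new SparkContext(conf)
  val pairs = sc.parallelize(1 to 100000).map(i => (i % 1000, i))

  // At most 200 output partitions and no map-side combine: BypassMergeSortShuffleHandle.
  val bypassed = pairs.partitionBy(new HashPartitioner(100))

  // More than 200 partitions, no aggregator, relocation-friendly serializer (Kryo):
  // SerializedShuffleHandle, i.e. the UnsafeShuffleWriter path.
  val serialized = pairs.partitionBy(new HashPartitioner(1000))

  // Map-side aggregation rules out both optimized paths: BaseShuffleHandle,
  // handled by SortShuffleWriter and ExternalSorter.
  val aggregated = pairs.reduceByKey(_ + _, 1000)

  sc.stop()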

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala 
b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 5865e76..bbd9c1a 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -20,7 +20,6 @@ package org.apache.spark.shuffle.sort
 import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, 
BaseShuffleHandle}
 import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.collection.ExternalSorter
@@ -36,7 +35,7 @@ private[spark] class SortShuffleWriter[K, V, C](
 
   private val blockManager = SparkEnv.get.blockManager
 
-  private var sorter: SortShuffleFileWriter[K, V] = null
+  private var sorter: ExternalSorter[K, V, _] = null
 
   // Are we in the process of stopping? Because map tasks can call stop() with 
success = true
   // and then call stop() with success = false if they get an exception, we 
want to make sure
@@ -54,15 +53,6 @@ private[spark] class SortShuffleWriter[K, V, C](
       require(dep.aggregator.isDefined, "Map-side combine without Aggregator 
specified!")
       new ExternalSorter[K, V, C](
         dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
-    } else if (SortShuffleWriter.shouldBypassMergeSort(
-        SparkEnv.get.conf, dep.partitioner.numPartitions, aggregator = None, 
keyOrdering = None)) {
-      // If there are fewer than spark.shuffle.sort.bypassMergeThreshold 
partitions and we don't
-      // need local aggregation and sorting, write numPartitions files 
directly and just concatenate
-      // them at the end. This avoids doing serialization and deserialization 
twice to merge
-      // together the spilled files, which would happen with the normal code 
path. The downside is
-      // having multiple files open at a time and thus more memory allocated 
to buffers.
-      new BypassMergeSortShuffleWriter[K, V](SparkEnv.get.conf, blockManager, 
dep.partitioner,
-        writeMetrics, Serializer.getSerializer(dep.serializer))
     } else {
       // In this case we pass neither an aggregator nor an ordering to the 
sorter, because we don't
       // care whether the keys get sorted in each partition; that will be done 
on the reduce side
@@ -111,12 +101,14 @@ private[spark] class SortShuffleWriter[K, V, C](
 }
 
 private[spark] object SortShuffleWriter {
-  def shouldBypassMergeSort(
-      conf: SparkConf,
-      numPartitions: Int,
-      aggregator: Option[Aggregator[_, _, _]],
-      keyOrdering: Option[Ordering[_]]): Boolean = {
-    val bypassMergeThreshold: Int = 
conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
-    numPartitions <= bypassMergeThreshold && aggregator.isEmpty && 
keyOrdering.isEmpty
+  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): 
Boolean = {
+    // We cannot bypass sorting if we need to do map-side aggregation.
+    if (dep.mapSideCombine) {
+      require(dep.aggregator.isDefined, "Map-side combine without Aggregator 
specified!")
+      false
+    } else {
+      val bypassMergeThreshold: Int = 
conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+      dep.partitioner.numPartitions <= bypassMergeThreshold
+    }
   }
 }
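
The refactored shouldBypassMergeSort above keys the decision off only two things: whether the dependency needs map-side combine and how many output partitions it has. The partition cutoff is configurable; a minimal sketch (the value 400 is purely illustrative, not a recommendation):

  import org.apache.spark.SparkConf

  // Let shuffles with up to 400 output partitions (and no map-side combine)
  // keep taking the bypass-merge-sort write path.
  val conf = new SparkConf()
    .set("spark.shuffle.sort.bypassMergeThreshold", "400")

A dependency with mapSideCombine set never bypasses, regardless of this threshold.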

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
 
b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
deleted file mode 100644
index 75f22f6..0000000
--- 
a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.unsafe
-
-import java.util.Collections
-import java.util.concurrent.ConcurrentHashMap
-
-import org.apache.spark._
-import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle._
-import org.apache.spark.shuffle.sort.SortShuffleManager
-
-/**
- * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to 
use the new shuffle.
- */
-private[spark] class UnsafeShuffleHandle[K, V](
-    shuffleId: Int,
-    numMaps: Int,
-    dependency: ShuffleDependency[K, V, V])
-  extends BaseShuffleHandle(shuffleId, numMaps, dependency) {
-}
-
-private[spark] object UnsafeShuffleManager extends Logging {
-
-  /**
-   * The maximum number of shuffle output partitions that UnsafeShuffleManager 
supports.
-   */
-  val MAX_SHUFFLE_OUTPUT_PARTITIONS = PackedRecordPointer.MAXIMUM_PARTITION_ID 
+ 1
-
-  /**
-   * Helper method for determining whether a shuffle should use the optimized 
unsafe shuffle
-   * path or whether it should fall back to the original sort-based shuffle.
-   */
-  def canUseUnsafeShuffle[K, V, C](dependency: ShuffleDependency[K, V, C]): 
Boolean = {
-    val shufId = dependency.shuffleId
-    val serializer = Serializer.getSerializer(dependency.serializer)
-    if (!serializer.supportsRelocationOfSerializedObjects) {
-      log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because the 
serializer, " +
-        s"${serializer.getClass.getName}, does not support object relocation")
-      false
-    } else if (dependency.aggregator.isDefined) {
-      log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because an 
aggregator is defined")
-      false
-    } else if (dependency.partitioner.numPartitions > 
MAX_SHUFFLE_OUTPUT_PARTITIONS) {
-      log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has 
more than " +
-        s"$MAX_SHUFFLE_OUTPUT_PARTITIONS partitions")
-      false
-    } else {
-      log.debug(s"Can use UnsafeShuffle for shuffle $shufId")
-      true
-    }
-  }
-}
-
-/**
- * A shuffle implementation that uses directly-managed memory to implement 
several performance
- * optimizations for certain types of shuffles. In cases where the new 
performance optimizations
- * cannot be applied, this shuffle manager delegates to [[SortShuffleManager]] 
to handle those
- * shuffles.
- *
- * UnsafeShuffleManager's optimizations will apply when _all_ of the following 
conditions hold:
- *
- *  - The shuffle dependency specifies no aggregation or output ordering.
- *  - The shuffle serializer supports relocation of serialized values (this is 
currently supported
- *    by KryoSerializer and Spark SQL's custom serializers).
- *  - The shuffle produces fewer than 16777216 output partitions.
- *  - No individual record is larger than 128 MB when serialized.
- *
- * In addition, extra spill-merging optimizations are automatically applied 
when the shuffle
- * compression codec supports concatenation of serialized streams. This is 
currently supported by
- * Spark's LZF serializer.
- *
- * At a high-level, UnsafeShuffleManager's design is similar to Spark's 
existing SortShuffleManager.
- * In sort-based shuffle, incoming records are sorted according to their 
target partition ids, then
- * written to a single map output file. Reducers fetch contiguous regions of 
this file in order to
- * read their portion of the map output. In cases where the map output data is 
too large to fit in
- * memory, sorted subsets of the output can are spilled to disk and those 
on-disk files are merged
- * to produce the final output file.
- *
- * UnsafeShuffleManager optimizes this process in several ways:
- *
- *  - Its sort operates on serialized binary data rather than Java objects, 
which reduces memory
- *    consumption and GC overheads. This optimization requires the record 
serializer to have certain
- *    properties to allow serialized records to be re-ordered without 
requiring deserialization.
- *    See SPARK-4550, where this optimization was first proposed and 
implemented, for more details.
- *
- *  - It uses a specialized cache-efficient sorter 
([[UnsafeShuffleExternalSorter]]) that sorts
- *    arrays of compressed record pointers and partition ids. By using only 8 
bytes of space per
- *    record in the sorting array, this fits more of the array into cache.
- *
- *  - The spill merging procedure operates on blocks of serialized records 
that belong to the same
- *    partition and does not need to deserialize records during the merge.
- *
- *  - When the spill compression codec supports concatenation of compressed 
data, the spill merge
- *    simply concatenates the serialized and compressed spill partitions to 
produce the final output
- *    partition.  This allows efficient data copying methods, like NIO's 
`transferTo`, to be used
- *    and avoids the need to allocate decompression or copying buffers during 
the merge.
- *
- * For more details on UnsafeShuffleManager's design, see SPARK-7081.
- */
-private[spark] class UnsafeShuffleManager(conf: SparkConf) extends 
ShuffleManager with Logging {
-
-  if (!conf.getBoolean("spark.shuffle.spill", true)) {
-    logWarning(
-      "spark.shuffle.spill was set to false, but this is ignored by the 
tungsten-sort shuffle " +
-      "manager; its optimized shuffles will continue to spill to disk when 
necessary.")
-  }
-
-  private[this] val sortShuffleManager: SortShuffleManager = new 
SortShuffleManager(conf)
-  private[this] val shufflesThatFellBackToSortShuffle =
-    Collections.newSetFromMap(new ConcurrentHashMap[Int, java.lang.Boolean]())
-  private[this] val numMapsForShufflesThatUsedNewPath = new 
ConcurrentHashMap[Int, Int]()
-
-  /**
-   * Register a shuffle with the manager and obtain a handle for it to pass to 
tasks.
-   */
-  override def registerShuffle[K, V, C](
-      shuffleId: Int,
-      numMaps: Int,
-      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
-    if (UnsafeShuffleManager.canUseUnsafeShuffle(dependency)) {
-      new UnsafeShuffleHandle[K, V](
-        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, 
V]])
-    } else {
-      new BaseShuffleHandle(shuffleId, numMaps, dependency)
-    }
-  }
-
-  /**
-   * Get a reader for a range of reduce partitions (startPartition to 
endPartition-1, inclusive).
-   * Called on executors by reduce tasks.
-   */
-  override def getReader[K, C](
-      handle: ShuffleHandle,
-      startPartition: Int,
-      endPartition: Int,
-      context: TaskContext): ShuffleReader[K, C] = {
-    sortShuffleManager.getReader(handle, startPartition, endPartition, context)
-  }
-
-  /** Get a writer for a given partition. Called on executors by map tasks. */
-  override def getWriter[K, V](
-      handle: ShuffleHandle,
-      mapId: Int,
-      context: TaskContext): ShuffleWriter[K, V] = {
-    handle match {
-      case unsafeShuffleHandle: UnsafeShuffleHandle[K @unchecked, V 
@unchecked] =>
-        numMapsForShufflesThatUsedNewPath.putIfAbsent(handle.shuffleId, 
unsafeShuffleHandle.numMaps)
-        val env = SparkEnv.get
-        new UnsafeShuffleWriter(
-          env.blockManager,
-          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
-          context.taskMemoryManager(),
-          env.shuffleMemoryManager,
-          unsafeShuffleHandle,
-          mapId,
-          context,
-          env.conf)
-      case other =>
-        shufflesThatFellBackToSortShuffle.add(handle.shuffleId)
-        sortShuffleManager.getWriter(handle, mapId, context)
-    }
-  }
-
-  /** Remove a shuffle's metadata from the ShuffleManager. */
-  override def unregisterShuffle(shuffleId: Int): Boolean = {
-    if (shufflesThatFellBackToSortShuffle.remove(shuffleId)) {
-      sortShuffleManager.unregisterShuffle(shuffleId)
-    } else {
-      Option(numMapsForShufflesThatUsedNewPath.remove(shuffleId)).foreach { 
numMaps =>
-        (0 until numMaps).foreach { mapId =>
-          shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
-        }
-      }
-      true
-    }
-  }
-
-  override val shuffleBlockResolver: IndexShuffleBlockResolver = {
-    sortShuffleManager.shuffleBlockResolver
-  }
-
-  /** Shut down this ShuffleManager. */
-  override def stop(): Unit = {
-    sortShuffleManager.stop()
-  }
-}
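
The 16777216-partition ceiling that appears in both the deleted UnsafeShuffleManager scaladoc and the new MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE constant comes from the packed record pointer; assuming the 24-bit partition-id field that PackedRecordPointer.MAXIMUM_PARTITION_ID implies, the arithmetic is simply:

  // 24 bits for the partition id => largest id is 2^24 - 1, so 2^24 distinct partitions.
  val maximumPartitionId = (1 << 24) - 1                       // 16777215
  val maxSerializedShufflePartitions = maximumPartitionId + 1  // 16777216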

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/scala/org/apache/spark/util/collection/ChainedBuffer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ChainedBuffer.scala 
b/core/src/main/scala/org/apache/spark/util/collection/ChainedBuffer.scala
deleted file mode 100644
index ae60f3b..0000000
--- a/core/src/main/scala/org/apache/spark/util/collection/ChainedBuffer.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util.collection
-
-import java.io.OutputStream
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * A logical byte buffer that wraps a list of byte arrays. All the byte arrays 
have equal size. The
- * advantage of this over a standard ArrayBuffer is that it can grow without 
claiming large amounts
- * of memory and needing to copy the full contents. The disadvantage is that 
the contents don't
- * occupy a contiguous segment of memory.
- */
-private[spark] class ChainedBuffer(chunkSize: Int) {
-
-  private val chunkSizeLog2: Int = java.lang.Long.numberOfTrailingZeros(
-    java.lang.Long.highestOneBit(chunkSize))
-  assert((1 << chunkSizeLog2) == chunkSize,
-    s"ChainedBuffer chunk size $chunkSize must be a power of two")
-  private val chunks: ArrayBuffer[Array[Byte]] = new ArrayBuffer[Array[Byte]]()
-  private var _size: Long = 0
-
-  /**
-   * Feed bytes from this buffer into a DiskBlockObjectWriter.
-   *
-   * @param pos Offset in the buffer to read from.
-   * @param os OutputStream to read into.
-   * @param len Number of bytes to read.
-   */
-  def read(pos: Long, os: OutputStream, len: Int): Unit = {
-    if (pos + len > _size) {
-      throw new IndexOutOfBoundsException(
-        s"Read of $len bytes at position $pos would go past size ${_size} of 
buffer")
-    }
-    var chunkIndex: Int = (pos >> chunkSizeLog2).toInt
-    var posInChunk: Int = (pos - (chunkIndex.toLong << chunkSizeLog2)).toInt
-    var written: Int = 0
-    while (written < len) {
-      val toRead: Int = math.min(len - written, chunkSize - posInChunk)
-      os.write(chunks(chunkIndex), posInChunk, toRead)
-      written += toRead
-      chunkIndex += 1
-      posInChunk = 0
-    }
-  }
-
-  /**
-   * Read bytes from this buffer into a byte array.
-   *
-   * @param pos Offset in the buffer to read from.
-   * @param bytes Byte array to read into.
-   * @param offs Offset in the byte array to read to.
-   * @param len Number of bytes to read.
-   */
-  def read(pos: Long, bytes: Array[Byte], offs: Int, len: Int): Unit = {
-    if (pos + len > _size) {
-      throw new IndexOutOfBoundsException(
-        s"Read of $len bytes at position $pos would go past size of buffer")
-    }
-    var chunkIndex: Int = (pos >> chunkSizeLog2).toInt
-    var posInChunk: Int = (pos - (chunkIndex.toLong << chunkSizeLog2)).toInt
-    var written: Int = 0
-    while (written < len) {
-      val toRead: Int = math.min(len - written, chunkSize - posInChunk)
-      System.arraycopy(chunks(chunkIndex), posInChunk, bytes, offs + written, 
toRead)
-      written += toRead
-      chunkIndex += 1
-      posInChunk = 0
-    }
-  }
-
-  /**
-   * Write bytes from a byte array into this buffer.
-   *
-   * @param pos Offset in the buffer to write to.
-   * @param bytes Byte array to write from.
-   * @param offs Offset in the byte array to write from.
-   * @param len Number of bytes to write.
-   */
-  def write(pos: Long, bytes: Array[Byte], offs: Int, len: Int): Unit = {
-    if (pos > _size) {
-      throw new IndexOutOfBoundsException(
-        s"Write at position $pos starts after end of buffer ${_size}")
-    }
-    // Grow if needed
-    val endChunkIndex: Int = ((pos + len - 1) >> chunkSizeLog2).toInt
-    while (endChunkIndex >= chunks.length) {
-      chunks += new Array[Byte](chunkSize)
-    }
-
-    var chunkIndex: Int = (pos >> chunkSizeLog2).toInt
-    var posInChunk: Int = (pos - (chunkIndex.toLong << chunkSizeLog2)).toInt
-    var written: Int = 0
-    while (written < len) {
-      val toWrite: Int = math.min(len - written, chunkSize - posInChunk)
-      System.arraycopy(bytes, offs + written, chunks(chunkIndex), posInChunk, 
toWrite)
-      written += toWrite
-      chunkIndex += 1
-      posInChunk = 0
-    }
-
-    _size = math.max(_size, pos + len)
-  }
-
-  /**
-   * Total size of buffer that can be written to without allocating additional 
memory.
-   */
-  def capacity: Long = chunks.size.toLong * chunkSize
-
-  /**
-   * Size of the logical buffer.
-   */
-  def size: Long = _size
-}
-
-/**
- * Output stream that writes to a ChainedBuffer.
- */
-private[spark] class ChainedBufferOutputStream(chainedBuffer: ChainedBuffer) 
extends OutputStream {
-  private var pos: Long = 0
-
-  override def write(b: Int): Unit = {
-    throw new UnsupportedOperationException()
-  }
-
-  override def write(bytes: Array[Byte], offs: Int, len: Int): Unit = {
-    chainedBuffer.write(pos, bytes, offs, len)
-    pos += len
-  }
-}
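
For reference, the deleted ChainedBuffer's indexing relies on the chunk size being a power of two: a logical position is split into a chunk index and an in-chunk offset with a shift and a subtraction. A small standalone sketch of that mapping, using the 4 MB chunk size that the old spark.shuffle.sort.kvChunkSize default implied:

  val chunkSize = 1 << 22                                        // 4 MB; must be a power of two
  val chunkSizeLog2 = Integer.numberOfTrailingZeros(chunkSize)   // 22
  val pos = 10L * 1024 * 1024                                    // logical offset of 10 MB
  val chunkIndex = (pos >> chunkSizeLog2).toInt                  // 2 (third chunk)
  val posInChunk = (pos - (chunkIndex.toLong << chunkSizeLog2)).toInt  // 2 MB into that chunk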

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 749be34..c48c453 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -29,7 +29,6 @@ import com.google.common.io.ByteStreams
 import org.apache.spark._
 import org.apache.spark.serializer._
 import org.apache.spark.executor.ShuffleWriteMetrics
-import org.apache.spark.shuffle.sort.{SortShuffleFileWriter, SortShuffleWriter}
 import org.apache.spark.storage.{BlockId, DiskBlockObjectWriter}
 
 /**
@@ -69,8 +68,8 @@ import org.apache.spark.storage.{BlockId, 
DiskBlockObjectWriter}
  * At a high level, this class works internally as follows:
  *
  * - We repeatedly fill up buffers of in-memory data, using either a 
PartitionedAppendOnlyMap if
- *   we want to combine by key, or a PartitionedSerializedPairBuffer or 
PartitionedPairBuffer if we
- *   don't. Inside these buffers, we sort elements by partition ID and then 
possibly also by key.
+ *   we want to combine by key, or a PartitionedPairBuffer if we don't.
+ *   Inside these buffers, we sort elements by partition ID and then possibly 
also by key.
  *   To avoid calling the partitioner multiple times with each key, we store 
the partition ID
  *   alongside each record.
  *
@@ -93,8 +92,7 @@ private[spark] class ExternalSorter[K, V, C](
     ordering: Option[Ordering[K]] = None,
     serializer: Option[Serializer] = None)
   extends Logging
-  with Spillable[WritablePartitionedPairCollection[K, C]]
-  with SortShuffleFileWriter[K, V] {
+  with Spillable[WritablePartitionedPairCollection[K, C]] {
 
   private val conf = SparkEnv.get.conf
 
@@ -104,13 +102,6 @@ private[spark] class ExternalSorter[K, V, C](
     if (shouldPartition) partitioner.get.getPartition(key) else 0
   }
 
-  // Since SPARK-7855, bypassMergeSort optimization is no longer performed as 
part of this class.
-  // As a sanity check, make sure that we're not handling a shuffle which 
should use that path.
-  if (SortShuffleWriter.shouldBypassMergeSort(conf, numPartitions, aggregator, 
ordering)) {
-    throw new IllegalArgumentException("ExternalSorter should not be used to 
handle "
-      + " a sort that the BypassMergeSortShuffleWriter should handle")
-  }
-
   private val blockManager = SparkEnv.get.blockManager
   private val diskBlockManager = blockManager.diskBlockManager
   private val ser = Serializer.getSerializer(serializer)
@@ -128,23 +119,11 @@ private[spark] class ExternalSorter[K, V, C](
   // grow internal data structures by growing + copying every time the number 
of objects doubles.
   private val serializerBatchSize = 
conf.getLong("spark.shuffle.spill.batchSize", 10000)
 
-  private val useSerializedPairBuffer =
-    ordering.isEmpty &&
-      conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
-      ser.supportsRelocationOfSerializedObjects
-  private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 
22) // 4 MB
-  private def newBuffer(): WritablePartitionedPairCollection[K, C] with 
SizeTracker = {
-    if (useSerializedPairBuffer) {
-      new PartitionedSerializedPairBuffer(metaInitialRecords = 256, 
kvChunkSize, serInstance)
-    } else {
-      new PartitionedPairBuffer[K, C]
-    }
-  }
   // Data structures to store in-memory objects before we spill. Depending on 
whether we have an
   // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
   // store them in an array buffer.
   private var map = new PartitionedAppendOnlyMap[K, C]
-  private var buffer = newBuffer()
+  private var buffer = new PartitionedPairBuffer[K, C]
 
   // Total spilling statistics
   private var _diskBytesSpilled = 0L
@@ -192,7 +171,7 @@ private[spark] class ExternalSorter[K, V, C](
    */
   private[spark] def numSpills: Int = spills.size
 
-  override def insertAll(records: Iterator[Product2[K, V]]): Unit = {
+  def insertAll(records: Iterator[Product2[K, V]]): Unit = {
     // TODO: stop combining if we find that the reduction factor isn't high
     val shouldCombine = aggregator.isDefined
 
@@ -236,7 +215,7 @@ private[spark] class ExternalSorter[K, V, C](
     } else {
       estimatedSize = buffer.estimateSize()
       if (maybeSpill(buffer, estimatedSize)) {
-        buffer = newBuffer()
+        buffer = new PartitionedPairBuffer[K, C]
       }
     }
 
@@ -659,7 +638,7 @@ private[spark] class ExternalSorter[K, V, C](
    * @param context a TaskContext for a running Spark task, for us to update 
shuffle metrics.
    * @return array of lengths, in bytes, of each partition of the file (used 
by map output tracker)
    */
-  override def writePartitionedFile(
+  def writePartitionedFile(
       blockId: BlockId,
       context: TaskContext,
       outputFile: File): Array[Long] = {
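
With the serialized pair buffer removed, ExternalSorter's map-side buffering depends only on whether the shuffle combines by key. A rough user-level sketch (hypothetical job, assuming default configuration and more than 200 output partitions so the bypass writer is not selected):

  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(
    new SparkConf().setAppName("external-sorter-sketch").setMaster("local[2]"))
  val pairs = sc.parallelize(1 to 100000).map(i => (i % 500, i))

  // Map-side combine: records are merged by key in a PartitionedAppendOnlyMap
  // before any spill or final write.
  val counts = pairs.reduceByKey(_ + _, 1000)

  // No map-side combine: records are appended to a PartitionedPairBuffer and
  // sorted only by partition id on the map side.
  val grouped = pairs.groupByKey(1000)

  sc.stop()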

