Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8005#discussion_r36483832
  
    --- Diff: core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java ---
    @@ -397,53 +399,84 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException {
       }
     
       /**
    -   * Write a record to the shuffle sorter.
    +   * Write a record to the shuffle inMemSorter.
        */
       public void insertRecord(
           Object recordBaseObject,
           long recordBaseOffset,
           int lengthInBytes,
           int partitionId) throws IOException {
    +
    +    growPointerArrayIfNecessary();
         // Need 4 bytes to store the record length.
         final int totalSpaceRequired = lengthInBytes + 4;
    -    if (!haveSpaceForRecord(totalSpaceRequired)) {
    -      allocateSpaceForRecord(totalSpaceRequired);
    +
    +    // --- Figure out where to insert the new record 
----------------------------------------------
    +
    +    final MemoryBlock dataPage;
    +    long dataPagePosition;
    +    boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
    +    if (useOverflowPage) {
    +      long overflowPageSize = 
ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
    +      // The record is larger than the page size, so allocate a special 
overflow page just to hold
    +      // that record.
    +      final long memoryGranted = 
shuffleMemoryManager.tryToAcquire(overflowPageSize);
    +      if (memoryGranted != overflowPageSize) {
    +        shuffleMemoryManager.release(memoryGranted);
    +        spill();
    +        final long memoryGrantedAfterSpill = 
shuffleMemoryManager.tryToAcquire(overflowPageSize);
    +        if (memoryGrantedAfterSpill != overflowPageSize) {
    +          shuffleMemoryManager.release(memoryGrantedAfterSpill);
    +          throw new IOException("Unable to acquire " + overflowPageSize + 
" bytes of memory");
    +        }
    +      }
    +      MemoryBlock overflowPage = 
taskMemoryManager.allocatePage(overflowPageSize);
    +      allocatedPages.add(overflowPage);
    +      dataPage = overflowPage;
    +      dataPagePosition = overflowPage.getBaseOffset();
    +    } else {
    +      // The record is small enough to fit in a regular data page, but the 
current page might not
    +      // have enough space to hold it (or no pages have been allocated 
yet).
    +      acquireNewPageIfNecessary(totalSpaceRequired);
    +      dataPage = currentPage;
    +      dataPagePosition = currentPagePosition;
    +      // Update bookkeeping information
    +      freeSpaceInCurrentPage -= totalSpaceRequired;
    +      currentPagePosition += totalSpaceRequired;
         }
    +    final Object dataPageBaseObject = dataPage.getBaseObject();
     
         final long recordAddress =
    -      memoryManager.encodePageNumberAndOffset(currentPage, 
currentPagePosition);
    -    final Object dataPageBaseObject = currentPage.getBaseObject();
    -    PlatformDependent.UNSAFE.putInt(dataPageBaseObject, 
currentPagePosition, lengthInBytes);
    -    currentPagePosition += 4;
    -    freeSpaceInCurrentPage -= 4;
    +      taskMemoryManager.encodePageNumberAndOffset(dataPage, 
dataPagePosition);
    +    PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, 
lengthInBytes);
    +    dataPagePosition += 4;
         PlatformDependent.copyMemory(
           recordBaseObject,
           recordBaseOffset,
           dataPageBaseObject,
    -      currentPagePosition,
    +      dataPagePosition,
           lengthInBytes);
    -    currentPagePosition += lengthInBytes;
    -    freeSpaceInCurrentPage -= lengthInBytes;
    -    sorter.insertRecord(recordAddress, partitionId);
    +    assert(inMemSorter != null);
    +    inMemSorter.insertRecord(recordAddress, partitionId);
       }
     
       /**
    -   * Close the sorter, causing any buffered data to be sorted and written 
out to disk.
    +   * Close the inMemorySorter, causing any buffered data to be sorted and 
written out to disk.
    --- End diff --
    
    This should still say "sorter" (the original wording) rather than "inMemorySorter".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to