Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11534#discussion_r55476210
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -789,117 +752,193 @@ private[spark] class BlockManager(
               // lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
               releaseLock(blockId)
             }
    -        return DoPutSucceeded
    +        return true
           }
         }
     
         val startTimeMs = System.currentTimeMillis
     
    -    // Size of the block in bytes
    -    var size = 0L
    -
    -    // The level we actually use to put the block
    -    val putLevel = effectiveStorageLevel.getOrElse(level)
    -
    -    // If we're storing bytes, then initiate the replication before storing them locally.
    +    // Since we're storing bytes, initiate the replication before storing them locally.
         // This is faster as data is already serialized and ready to send.
    -    val replicationFuture = data match {
    -      case b: ByteBufferValues if putLevel.replication > 1 =>
    -        // Duplicate doesn't copy the bytes, but just creates a wrapper
    -        val bufferView = b.buffer.duplicate()
    -        Future {
    -          // This is a blocking action and should run in futureExecutionContext which is a cached
    -          // thread pool
    -          replicate(blockId, bufferView, putLevel)
    -        }(futureExecutionContext)
    -      case _ => null
    +    val replicationFuture = if (level.replication > 1) {
    +      // Duplicate doesn't copy the bytes, but just creates a wrapper
    +      val bufferView = bytes.duplicate()
    +      Future {
    +        // This is a blocking action and should run in futureExecutionContext which is a cached
    +        // thread pool
    +        replicate(blockId, bufferView, level)
    +      }(futureExecutionContext)
    +    } else {
    +      null
         }
     
         var blockWasSuccessfullyStored = false
    -    var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
    -
    -    putBlockInfo.synchronized {
    -      logTrace("Put for block %s took %s to get into synchronized block"
    -        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     
    -      try {
    -        if (putLevel.useMemory) {
    -          // Put it in memory first, even if it also has useDisk set to true;
    -          // We will drop it to disk later if the memory store can't hold it.
    -          data match {
    -            case IteratorValues(iterator) =>
    -              memoryStore.putIterator(blockId, iterator(), putLevel) match {
    -                case Right(s) =>
    -                  size = s
    -                case Left(iter) =>
    -                  iteratorFromFailedMemoryStorePut = Some(iter)
    -              }
    -            case ByteBufferValues(bytes) =>
    -              bytes.rewind()
    -              size = bytes.limit()
    -              memoryStore.putBytes(blockId, bytes, putLevel)
    -          }
    -        } else if (putLevel.useDisk) {
    -          data match {
    -            case IteratorValues(iterator) =>
    -              diskStore.putIterator(blockId, iterator(), putLevel) match {
    -                case Right(s) =>
    -                  size = s
    -                // putIterator() will never return Left (see its return type).
    -              }
    -            case ByteBufferValues(bytes) =>
    -              bytes.rewind()
    -              size = bytes.limit()
    -              diskStore.putBytes(blockId, bytes, putLevel)
    -          }
    +    bytes.rewind()
    +    val size = bytes.limit()
    +
    +    try {
    +      if (level.useMemory) {
    +        // Put it in memory first, even if it also has useDisk set to true;
    +        // We will drop it to disk later if the memory store can't hold it.
    +        val putSucceeded = if (level.deserialized) {
    +          val values = dataDeserialize(blockId, bytes.duplicate())
    +          memoryStore.putIterator(blockId, values, level).isRight
             } else {
    -          assert(putLevel == StorageLevel.NONE)
    -          throw new BlockException(
    -            blockId, s"Attempted to put block $blockId without specifying storage level!")
    +          memoryStore.putBytes(blockId, size, () => bytes)
    +        }
    +        if (!putSucceeded && level.useDisk) {
    +          logWarning(s"Persisting block $blockId to disk instead.")
    +          diskStore.putBytes(blockId, bytes)
             }
    +      } else if (level.useDisk) {
    +        diskStore.putBytes(blockId, bytes)
    +      }
     
    -        val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
    -        blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
    -        if (blockWasSuccessfullyStored) {
    -          // Now that the block is in either the memory, externalBlockStore, or disk store,
    -          // let other threads read it, and tell the master about it.
    -          putBlockInfo.size = size
    -          if (tellMaster) {
    -            reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
    -          }
    -          Option(TaskContext.get()).foreach { c =>
    -            c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
    -          }
    +      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
    +      blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
    +      if (blockWasSuccessfullyStored) {
    +        // Now that the block is in either the memory, externalBlockStore, or disk store,
    +        // tell the master about it.
    +        putBlockInfo.size = size
    +        if (tellMaster) {
    +          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
             }
    -      } finally {
    -        if (blockWasSuccessfullyStored) {
    -          if (keepReadLock) {
    -            blockInfoManager.downgradeLock(blockId)
    -          } else {
    -            blockInfoManager.unlock(blockId)
    -          }
    +        Option(TaskContext.get()).foreach { c =>
    +          c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
    +        }
    +      }
    +    } finally {
    +      if (blockWasSuccessfullyStored) {
    +        if (keepReadLock) {
    +          blockInfoManager.downgradeLock(blockId)
             } else {
    -          blockInfoManager.removeBlock(blockId)
    -          logWarning(s"Putting block $blockId failed")
    +          blockInfoManager.unlock(blockId)
             }
    +      } else {
    +        blockInfoManager.removeBlock(blockId)
    +        logWarning(s"Putting block $blockId failed")
           }
         }
         logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     
    -    if (replicationFuture != null) {
    +    if (level.replication > 1) {
           // Wait for asynchronous replication to finish
           Await.ready(replicationFuture, Duration.Inf)
    -    } else if (putLevel.replication > 1 && blockWasSuccessfullyStored) {
    -      val remoteStartTime = System.currentTimeMillis
    -      val bytesToReplicate: ByteBuffer = {
    -        doGetLocal(blockId, putBlockInfo, asBlockResult = false)
    -          .map(_.asInstanceOf[ByteBuffer])
    -          .getOrElse {
    -            throw new SparkException(s"Block $blockId was not found even though it was just stored")
    -          }
    +      logDebug("Putting block %s with replication took %s"
    +        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    +    } else {
    +      logDebug("Putting block %s without replication took %s"
    +        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    +    }
    +
    +    blockWasSuccessfullyStored
    +  }
    +
    +  /**
    +   * Put the given block according to the given level in one of the block stores, replicating
    +   * the values if necessary.
    +   *
    +   * If the block already exists, this method will not overwrite it.
    +   *
    +   * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
    +   *                     block already exists). If false, this method will hold no locks when it
    +   *                     returns.
    +   * @return None if the block was already present or if the put succeeded, or Some(iterator)
    +   *         if the put failed.
    +   */
    +  private def doPutIterator(
    +      blockId: BlockId,
    +      iterator: () => Iterator[Any],
    +      level: StorageLevel,
    +      tellMaster: Boolean = true,
    +      keepReadLock: Boolean = false): Option[Iterator[Any]] = {
    +
    +    require(blockId != null, "BlockId is null")
    +    require(level != null && level.isValid, "StorageLevel is null or invalid")
    +
    +    /* Remember the block's storage level so that we can correctly drop it to disk if it needs
    +     * to be dropped right after it got put into memory. Note, however, that other threads will
    +     * not be able to get() this block until we call markReady on its BlockInfo. */
    +    val putBlockInfo = {
    +      val newInfo = new BlockInfo(level, tellMaster)
    +      if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
    +        newInfo
    +      } else {
    +        logWarning(s"Block $blockId already exists on this machine; not re-adding it")
    +        if (!keepReadLock) {
    +          // lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
    +          releaseLock(blockId)
    +        }
    +        return None
    +      }
    +    }
    +
    +    val startTimeMs = System.currentTimeMillis
    +
    +    // Size of the block in bytes
    +    var size = 0L
    +
    +    var blockWasSuccessfullyStored = false
    +    var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
    +
    +    try {
    +      if (level.useMemory) {
    +        // Put it in memory first, even if it also has useDisk set to true;
    +        // We will drop it to disk later if the memory store can't hold it.
    +        memoryStore.putIterator(blockId, iterator(), level) match {
    +          case Right(s) =>
    +            size = s
    +          case Left(iter) =>
    +            // Not enough space to unroll this block; drop to disk if applicable
    +            if (level.useDisk) {
    +              logWarning(s"Persisting block $blockId to disk instead.")
    +              diskStore.put(blockId) { fileOutputStream =>
    +                dataSerializeStream(blockId, fileOutputStream, iter)
    --- End diff --
    
    Nope, the entire block is now on disk because `iter` contains all of the elements of the original input `iterator()`.
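    
    For anyone following along, here is a minimal sketch of the contract in play (the names `putIteratorSketch` and `maxElements` are made up for illustration; this is not Spark's actual `MemoryStore` API). When unrolling fails, the `Left` iterator is the values buffered so far followed by the untouched remainder of the input, so streaming it into the disk store writes every element of the original `iterator()`:
    
    ```scala
    import scala.collection.mutable.ArrayBuffer
    
    object PutIteratorSketch {
      // Hypothetical stand-in for memoryStore.putIterator: buffer up to
      // `maxElements` values in memory; if they do not all fit, return Left with
      // an iterator that still yields *every* element -- the values buffered so
      // far concatenated with the untouched rest of the input.
      def putIteratorSketch[T](input: Iterator[T], maxElements: Int): Either[Iterator[T], Long] = {
        val unrolled = new ArrayBuffer[T]
        while (input.hasNext && unrolled.size < maxElements) {
          unrolled += input.next()
        }
        if (input.hasNext) Left(unrolled.iterator ++ input) // unroll failed, nothing lost
        else Right(unrolled.size.toLong)                    // everything fit "in memory"
      }
    
      def main(args: Array[String]): Unit = {
        // Draining the Left iterator recovers the complete block, which is why
        // writing it to disk persists all of the original elements.
        val spilled = putIteratorSketch((1 to 10).iterator, maxElements = 4) match {
          case Left(iter) => iter.toSeq
          case Right(_)   => Seq.empty[Int]
        }
        assert(spilled == (1 to 10))
      }
    }
    ```
    
    The real MemoryStore also does memory accounting while it unrolls, but the shape of the result is the same: a failed put still hands back an iterator over the whole block.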

