Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/11534#discussion_r55476210 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -789,117 +752,193 @@ private[spark] class BlockManager( // lockNewBlockForWriting returned a read lock on the existing block, so we must free it: releaseLock(blockId) } - return DoPutSucceeded + return true } } val startTimeMs = System.currentTimeMillis - // Size of the block in bytes - var size = 0L - - // The level we actually use to put the block - val putLevel = effectiveStorageLevel.getOrElse(level) - - // If we're storing bytes, then initiate the replication before storing them locally. + // Since we're storing bytes, initiate the replication before storing them locally. // This is faster as data is already serialized and ready to send. - val replicationFuture = data match { - case b: ByteBufferValues if putLevel.replication > 1 => - // Duplicate doesn't copy the bytes, but just creates a wrapper - val bufferView = b.buffer.duplicate() - Future { - // This is a blocking action and should run in futureExecutionContext which is a cached - // thread pool - replicate(blockId, bufferView, putLevel) - }(futureExecutionContext) - case _ => null + val replicationFuture = if (level.replication > 1) { + // Duplicate doesn't copy the bytes, but just creates a wrapper + val bufferView = bytes.duplicate() + Future { + // This is a blocking action and should run in futureExecutionContext which is a cached + // thread pool + replicate(blockId, bufferView, level) + }(futureExecutionContext) + } else { + null } var blockWasSuccessfullyStored = false - var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None - - putBlockInfo.synchronized { - logTrace("Put for block %s took %s to get into synchronized block" - .format(blockId, Utils.getUsedTimeMs(startTimeMs))) - try { - if (putLevel.useMemory) { - // Put it in memory first, even if it also has useDisk set to 
true; - // We will drop it to disk later if the memory store can't hold it. - data match { - case IteratorValues(iterator) => - memoryStore.putIterator(blockId, iterator(), putLevel) match { - case Right(s) => - size = s - case Left(iter) => - iteratorFromFailedMemoryStorePut = Some(iter) - } - case ByteBufferValues(bytes) => - bytes.rewind() - size = bytes.limit() - memoryStore.putBytes(blockId, bytes, putLevel) - } - } else if (putLevel.useDisk) { - data match { - case IteratorValues(iterator) => - diskStore.putIterator(blockId, iterator(), putLevel) match { - case Right(s) => - size = s - // putIterator() will never return Left (see its return type). - } - case ByteBufferValues(bytes) => - bytes.rewind() - size = bytes.limit() - diskStore.putBytes(blockId, bytes, putLevel) - } + bytes.rewind() + val size = bytes.limit() + + try { + if (level.useMemory) { + // Put it in memory first, even if it also has useDisk set to true; + // We will drop it to disk later if the memory store can't hold it. + val putSucceeded = if (level.deserialized) { + val values = dataDeserialize(blockId, bytes.duplicate()) + memoryStore.putIterator(blockId, values, level).isRight } else { - assert(putLevel == StorageLevel.NONE) - throw new BlockException( - blockId, s"Attempted to put block $blockId without specifying storage level!") + memoryStore.putBytes(blockId, size, () => bytes) + } + if (!putSucceeded && level.useDisk) { + logWarning(s"Persisting block $blockId to disk instead.") + diskStore.putBytes(blockId, bytes) } + } else if (level.useDisk) { + diskStore.putBytes(blockId, bytes) + } - val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) - blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid - if (blockWasSuccessfullyStored) { - // Now that the block is in either the memory, externalBlockStore, or disk store, - // let other threads read it, and tell the master about it. 
- putBlockInfo.size = size - if (tellMaster) { - reportBlockStatus(blockId, putBlockInfo, putBlockStatus) - } - Option(TaskContext.get()).foreach { c => - c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus))) - } + val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) + blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid + if (blockWasSuccessfullyStored) { + // Now that the block is in either the memory, externalBlockStore, or disk store, + // tell the master about it. + putBlockInfo.size = size + if (tellMaster) { + reportBlockStatus(blockId, putBlockInfo, putBlockStatus) } - } finally { - if (blockWasSuccessfullyStored) { - if (keepReadLock) { - blockInfoManager.downgradeLock(blockId) - } else { - blockInfoManager.unlock(blockId) - } + Option(TaskContext.get()).foreach { c => + c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus))) + } + } + } finally { + if (blockWasSuccessfullyStored) { + if (keepReadLock) { + blockInfoManager.downgradeLock(blockId) } else { - blockInfoManager.removeBlock(blockId) - logWarning(s"Putting block $blockId failed") + blockInfoManager.unlock(blockId) } + } else { + blockInfoManager.removeBlock(blockId) + logWarning(s"Putting block $blockId failed") } } logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs))) - if (replicationFuture != null) { + if (level.replication > 1) { // Wait for asynchronous replication to finish Await.ready(replicationFuture, Duration.Inf) - } else if (putLevel.replication > 1 && blockWasSuccessfullyStored) { - val remoteStartTime = System.currentTimeMillis - val bytesToReplicate: ByteBuffer = { - doGetLocal(blockId, putBlockInfo, asBlockResult = false) - .map(_.asInstanceOf[ByteBuffer]) - .getOrElse { - throw new SparkException(s"Block $blockId was not found even though it was just stored") - } + logDebug("Putting block %s with replication took %s" + .format(blockId, Utils.getUsedTimeMs(startTimeMs))) + } else { + 
logDebug("Putting block %s without replication took %s" + .format(blockId, Utils.getUsedTimeMs(startTimeMs))) + } + + blockWasSuccessfullyStored + } + + /** + * Put the given block according to the given level in one of the block stores, replicating + * the values if necessary. + * + * If the block already exists, this method will not overwrite it. + * + * @param keepReadLock if true, this method will hold the read lock when it returns (even if the + * block already exists). If false, this method will hold no locks when it + * returns. + * @return None if the block was already present or if the put succeeded, or Some(iterator) + * if the put failed. + */ + private def doPutIterator( + blockId: BlockId, + iterator: () => Iterator[Any], + level: StorageLevel, + tellMaster: Boolean = true, + keepReadLock: Boolean = false): Option[Iterator[Any]] = { + + require(blockId != null, "BlockId is null") + require(level != null && level.isValid, "StorageLevel is null or invalid") + + /* Remember the block's storage level so that we can correctly drop it to disk if it needs + * to be dropped right after it got put into memory. Note, however, that other threads will + * not be able to get() this block until we call markReady on its BlockInfo. 
*/ + val putBlockInfo = { + val newInfo = new BlockInfo(level, tellMaster) + if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) { + newInfo + } else { + logWarning(s"Block $blockId already exists on this machine; not re-adding it") + if (!keepReadLock) { + // lockNewBlockForWriting returned a read lock on the existing block, so we must free it: + releaseLock(blockId) + } + return None + } + } + + val startTimeMs = System.currentTimeMillis + + // Size of the block in bytes + var size = 0L + + var blockWasSuccessfullyStored = false + var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None + + try { + if (level.useMemory) { + // Put it in memory first, even if it also has useDisk set to true; + // We will drop it to disk later if the memory store can't hold it. + memoryStore.putIterator(blockId, iterator(), level) match { + case Right(s) => + size = s + case Left(iter) => + // Not enough space to unroll this block; drop to disk if applicable + if (level.useDisk) { + logWarning(s"Persisting block $blockId to disk instead.") + diskStore.put(blockId) { fileOutputStream => + dataSerializeStream(blockId, fileOutputStream, iter) --- End diff -- Nope, the entire block is now on disk because `iter` contains all of the elements of the original input `iterator()`.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to have it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org