Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11534#discussion_r55473498 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -789,117 +752,193 @@ private[spark] class BlockManager( // lockNewBlockForWriting returned a read lock on the existing block, so we must free it: releaseLock(blockId) } - return DoPutSucceeded + return true } } val startTimeMs = System.currentTimeMillis - // Size of the block in bytes - var size = 0L - - // The level we actually use to put the block - val putLevel = effectiveStorageLevel.getOrElse(level) - - // If we're storing bytes, then initiate the replication before storing them locally. + // Since we're storing bytes, initiate the replication before storing them locally. // This is faster as data is already serialized and ready to send. - val replicationFuture = data match { - case b: ByteBufferValues if putLevel.replication > 1 => - // Duplicate doesn't copy the bytes, but just creates a wrapper - val bufferView = b.buffer.duplicate() - Future { - // This is a blocking action and should run in futureExecutionContext which is a cached - // thread pool - replicate(blockId, bufferView, putLevel) - }(futureExecutionContext) - case _ => null + val replicationFuture = if (level.replication > 1) { + // Duplicate doesn't copy the bytes, but just creates a wrapper + val bufferView = bytes.duplicate() + Future { + // This is a blocking action and should run in futureExecutionContext which is a cached + // thread pool + replicate(blockId, bufferView, level) + }(futureExecutionContext) + } else { + null } var blockWasSuccessfullyStored = false - var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None - - putBlockInfo.synchronized { - logTrace("Put for block %s took %s to get into synchronized block" - .format(blockId, Utils.getUsedTimeMs(startTimeMs))) - try { - if (putLevel.useMemory) { - // Put it in memory first, even if it also has useDisk set to true; - 
// We will drop it to disk later if the memory store can't hold it. - data match { - case IteratorValues(iterator) => - memoryStore.putIterator(blockId, iterator(), putLevel) match { - case Right(s) => - size = s - case Left(iter) => - iteratorFromFailedMemoryStorePut = Some(iter) - } - case ByteBufferValues(bytes) => - bytes.rewind() - size = bytes.limit() - memoryStore.putBytes(blockId, bytes, putLevel) - } - } else if (putLevel.useDisk) { - data match { - case IteratorValues(iterator) => - diskStore.putIterator(blockId, iterator(), putLevel) match { - case Right(s) => - size = s - // putIterator() will never return Left (see its return type). - } - case ByteBufferValues(bytes) => - bytes.rewind() - size = bytes.limit() - diskStore.putBytes(blockId, bytes, putLevel) - } + bytes.rewind() + val size = bytes.limit() + + try { + if (level.useMemory) { + // Put it in memory first, even if it also has useDisk set to true; + // We will drop it to disk later if the memory store can't hold it. + val putSucceeded = if (level.deserialized) { + val values = dataDeserialize(blockId, bytes.duplicate()) + memoryStore.putIterator(blockId, values, level).isRight } else { - assert(putLevel == StorageLevel.NONE) - throw new BlockException( - blockId, s"Attempted to put block $blockId without specifying storage level!") + memoryStore.putBytes(blockId, size, () => bytes) + } + if (!putSucceeded && level.useDisk) { + logWarning(s"Persisting block $blockId to disk instead.") + diskStore.putBytes(blockId, bytes) } + } else if (level.useDisk) { + diskStore.putBytes(blockId, bytes) + } - val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) - blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid - if (blockWasSuccessfullyStored) { - // Now that the block is in either the memory, externalBlockStore, or disk store, - // let other threads read it, and tell the master about it. 
- putBlockInfo.size = size - if (tellMaster) { - reportBlockStatus(blockId, putBlockInfo, putBlockStatus) - } - Option(TaskContext.get()).foreach { c => - c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus))) - } + val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) + blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid + if (blockWasSuccessfullyStored) { + // Now that the block is in either the memory, externalBlockStore, or disk store, + // tell the master about it. + putBlockInfo.size = size + if (tellMaster) { + reportBlockStatus(blockId, putBlockInfo, putBlockStatus) } - } finally { - if (blockWasSuccessfullyStored) { - if (keepReadLock) { - blockInfoManager.downgradeLock(blockId) - } else { - blockInfoManager.unlock(blockId) - } + Option(TaskContext.get()).foreach { c => + c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus))) + } + } + } finally { + if (blockWasSuccessfullyStored) { + if (keepReadLock) { + blockInfoManager.downgradeLock(blockId) } else { - blockInfoManager.removeBlock(blockId) - logWarning(s"Putting block $blockId failed") + blockInfoManager.unlock(blockId) } + } else { + blockInfoManager.removeBlock(blockId) + logWarning(s"Putting block $blockId failed") } } logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs))) - if (replicationFuture != null) { + if (level.replication > 1) { // Wait for asynchronous replication to finish Await.ready(replicationFuture, Duration.Inf) - } else if (putLevel.replication > 1 && blockWasSuccessfullyStored) { - val remoteStartTime = System.currentTimeMillis - val bytesToReplicate: ByteBuffer = { - doGetLocal(blockId, putBlockInfo, asBlockResult = false) - .map(_.asInstanceOf[ByteBuffer]) - .getOrElse { - throw new SparkException(s"Block $blockId was not found even though it was just stored") - } + logDebug("Putting block %s with replication took %s" + .format(blockId, Utils.getUsedTimeMs(startTimeMs))) + } else { + 
logDebug("Putting block %s without replication took %s" + .format(blockId, Utils.getUsedTimeMs(startTimeMs))) + } + + blockWasSuccessfullyStored + } + + /** + * Put the given block according to the given level in one of the block stores, replicating + * the values if necessary. + * + * If the block already exists, this method will not overwrite it. + * + * @param keepReadLock if true, this method will hold the read lock when it returns (even if the + * block already exists). If false, this method will hold no locks when it + * returns. + * @return None if the block was already present or if the put succeeded, or Some(iterator) --- End diff -- Can you comment on any properties of the returned iterator in this case? Just so I understand, if this returns None, we've walked the input iterator to the end. If it returns an iterator, it has been advanced arbitrarily far along?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org