[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19285
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163819767

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +634,93 @@ private[spark] class MemoryStore(
   }
 }

+private trait ValuesBuilder[T] {
+  def preciseSize: Long
--- End diff --

good idea
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163807859

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +634,93 @@ private[spark] class MemoryStore(
   }
 }

+private trait ValuesBuilder[T] {
+  def preciseSize: Long
--- End diff --

Hey guys, why not name the trait `MemoryEntryBuilder`? As I can see from the code, it is used to build the `MemoryEntry`.
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163768689

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -232,78 +236,93 @@ private[spark] class MemoryStore(
       elementsUnrolled += 1
     }

+    val valuesBuilder = if (keepUnrolling) {
+      Some(valuesHolder.getBuilder())
+    } else {
+      None
+    }
+
+    // Make sure that we have enough memory to store the block. By this point, it is possible that
+    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
+    // perform one final call to attempt to allocate additional memory if necessary.
     if (keepUnrolling) {
-      // We successfully unrolled the entirety of this block
-      val arrayValues = vector.toArray
-      vector = null
-      val entry =
-        new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
-      val size = entry.size
-      def transferUnrollToStorage(amount: Long): Unit = {
-        // Synchronize so that transfer is atomic
-        memoryManager.synchronized {
-          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
-          val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
-          assert(success, "transferring unroll memory to storage memory failed")
+      val size = valuesBuilder.get.preciseSize
+      if (size > unrollMemoryUsedByThisBlock) {
+        val amountToRequest = size - unrollMemoryUsedByThisBlock
+        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
+        if (keepUnrolling) {
+          unrollMemoryUsedByThisBlock += amountToRequest
         }
       }
-      // Acquire storage memory if necessary to store this block in memory.
-      val enoughStorageMemory = {
-        if (unrollMemoryUsedByThisBlock <= size) {
-          val acquiredExtra =
-            memoryManager.acquireStorageMemory(
-              blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
-          if (acquiredExtra) {
-            transferUnrollToStorage(unrollMemoryUsedByThisBlock)
-          }
-          acquiredExtra
-        } else { // unrollMemoryUsedByThisBlock > size
-          // If this task attempt already owns more unroll memory than is necessary to store the
-          // block, then release the extra memory that will not be used.
-          val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
-          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
-          transferUnrollToStorage(size)
-          true
-        }
+    }
+
+    if (keepUnrolling) {
--- End diff --

updated
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163750079

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +641,87 @@ private[spark] class MemoryStore(
   }
 }

+private trait ValuesBuilder[T] {
+  def preciseSize: Long
+  def build(): MemoryEntry[T]
+}
+
+private trait ValuesHolder[T] {
+  def storeValue(value: T): Unit
+  def estimatedSize(): Long
+  def getBuilder(): ValuesBuilder[T]
--- End diff --

add a comment to say that, after `getBuilder` is called, this `ValuesHolder` becomes invalid.
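(For reference, a minimal sketch of how the requested contract note might read; the wording is illustrative, not the exact comment that was merged.)

```
private trait ValuesBuilder[T] {
  def preciseSize: Long
  def build(): MemoryEntry[T]
}

private trait ValuesHolder[T] {
  def storeValue(value: T): Unit
  def estimatedSize(): Long

  /**
   * Note: after this method is called, this ValuesHolder is invalid: no more values may be
   * stored and no further size estimates may be taken. The returned ValuesBuilder is the
   * only remaining handle for finalizing the MemoryEntry.
   */
  def getBuilder(): ValuesBuilder[T]
}
```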
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163749987

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +641,87 @@ private[spark] class MemoryStore(
   }
 }

+private trait ValuesBuilder[T] {
+  def preciseSize: Long
+  def build(): MemoryEntry[T]
+}
+
+private trait ValuesHolder[T] {
+  def storeValue(value: T): Unit
+  def estimatedSize(): Long
+  def getBuilder(): ValuesBuilder[T]
+}
+
+/**
+ * A holder for storing the deserialized values.
+ */
+private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] {
+  // Underlying vector for unrolling the block
+  var vector = new SizeTrackingVector[T]()(classTag)
+  var arrayValues: Array[T] = null
+
+  override def storeValue(value: T): Unit = {
+    vector += value
+  }
+
+  override def estimatedSize(): Long = {
+    vector.estimateSize()
+  }
+
+  override def getBuilder(): ValuesBuilder[T] = new ValuesBuilder[T] {
+    // We successfully unrolled the entirety of this block
+    arrayValues = vector.toArray
+    vector = null
+
+    override val preciseSize: Long = SizeEstimator.estimate(arrayValues)
+
+    override def build(): MemoryEntry[T] =
+      DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
+  }
+}
+
+/**
+ * A holder for storing the serialized values.
+ */
+private class SerializedValuesHolder[T](
+    blockId: BlockId,
+    chunkSize: Int,
+    classTag: ClassTag[T],
+    memoryMode: MemoryMode,
+    serializerManager: SerializerManager) extends ValuesHolder[T] {
+  val allocator = memoryMode match {
+    case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+    case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+  }
+
+  val redirectableStream = new RedirectableOutputStream
+  val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
+  redirectableStream.setOutputStream(bbos)
+  val serializationStream: SerializationStream = {
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
+    ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
+  }
+
+  override def storeValue(value: T): Unit = {
+    serializationStream.writeObject(value)(classTag)
+  }
+
+  override def estimatedSize(): Long = {
+    bbos.size
+  }
+
+  override def getBuilder(): ValuesBuilder[T] = new ValuesBuilder[T] {
+    // We successfully unrolled the entirety of this block
+    serializationStream.close()
+
+    override val preciseSize: Long = bbos.size
--- End diff --

this can be a `def`?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163749065

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -232,78 +236,93 @@ private[spark] class MemoryStore(
[...]
+    }
+
+    if (keepUnrolling) {
--- End diff --

a little improvement
```
if (keepUnrolling) {
  val builder = valuesHolder.getBuilder()
  ...
  if (keepUnrolling) {
    val entry = builder.build()
    ...
    Right(entry.size)
  } else {
    ...
    logUnrollFailureMessage(blockId, builder.preciseSize)
    Left(unrollMemoryUsedByThisBlock)
  }
} else {
  ...
  logUnrollFailureMessage(blockId, valuesHolder.estimatedSize)
  Left(unrollMemoryUsedByThisBlock)
}
```
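(Filled out with the reservation and bookkeeping pieces quoted elsewhere in this thread, the suggested tail of `putIterator` would look roughly like the sketch below. It illustrates the proposed structure, not the exact code that was merged.)

```
if (keepUnrolling) {
  val entryBuilder = valuesHolder.getBuilder()
  // The precise size may exceed what was reserved during unrolling, so make
  // one final reservation attempt before committing the block.
  val size = entryBuilder.preciseSize
  if (size > unrollMemoryUsedByThisBlock) {
    val amountToRequest = size - unrollMemoryUsedByThisBlock
    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
    if (keepUnrolling) {
      unrollMemoryUsedByThisBlock += amountToRequest
    }
  }

  if (keepUnrolling) {
    val entry = entryBuilder.build()
    // Synchronize so that the unroll-to-storage memory transfer is atomic.
    memoryManager.synchronized {
      releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
      val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
      assert(success, "transferring unroll memory to storage memory failed")
    }
    entries.synchronized {
      entries.put(blockId, entry)
    }
    Right(entry.size)
  } else {
    // Ran out of memory while reserving the precise size.
    logUnrollFailureMessage(blockId, entryBuilder.preciseSize)
    Left(unrollMemoryUsedByThisBlock)
  }
} else {
  // Ran out of memory while unrolling the values.
  logUnrollFailureMessage(blockId, valuesHolder.estimatedSize())
  Left(unrollMemoryUsedByThisBlock)
}
```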
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163743072

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -346,85 +350,24 @@ private[spark] class MemoryStore(
     } else {
       initialMemoryThreshold.toInt
     }
-    val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
-    redirectableStream.setOutputStream(bbos)
-    val serializationStream: SerializationStream = {
-      val autoPick = !blockId.isInstanceOf[StreamBlockId]
-      val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
-      ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
-    }
-
-    // Request enough memory to begin unrolling
-    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
-
-    if (!keepUnrolling) {
-      logWarning(s"Failed to reserve initial memory threshold of " +
-        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
-    } else {
-      unrollMemoryUsedByThisBlock += initialMemoryThreshold
-    }
-
-    def reserveAdditionalMemoryIfNecessary(): Unit = {
-      if (bbos.size > unrollMemoryUsedByThisBlock) {
-        val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong
-        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
-        if (keepUnrolling) {
-          unrollMemoryUsedByThisBlock += amountToRequest
-        }
-      }
-    }
-
-    // Unroll this block safely, checking whether we have exceeded our threshold
-    while (values.hasNext && keepUnrolling) {
-      serializationStream.writeObject(values.next())(classTag)
-      elementsUnrolled += 1
-      if (elementsUnrolled % memoryCheckPeriod == 0) {
-        reserveAdditionalMemoryIfNecessary()
-      }
-    }
-
-    // Make sure that we have enough memory to store the block. By this point, it is possible that
-    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
-    // perform one final call to attempt to allocate additional memory if necessary.
-    if (keepUnrolling) {
-      serializationStream.close()
-      if (bbos.size > unrollMemoryUsedByThisBlock) {
-        val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
-        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
-        if (keepUnrolling) {
-          unrollMemoryUsedByThisBlock += amountToRequest
-        }
-      }
-    }
+    val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,
+      memoryMode, serializerManager)

-    if (keepUnrolling) {
--- End diff --

Thanks for the detailed explanation. I have updated it; the code looks much clearer now.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163556649

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -346,85 +350,24 @@ private[spark] class MemoryStore(
[...]
+    val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,
+      memoryMode, serializerManager)

-    if (keepUnrolling) {
--- End diff --

`putIteratorAsValues` and `putIteratorAsBytes` have different code structures for the last step. In the new `putIterator` method you followed the code structure of `putIteratorAsValues`; is it better to follow the one from `putIteratorAsBytes`?
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163551992

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -346,85 +350,24 @@ private[spark] class MemoryStore(
[...]
+    val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,
+      memoryMode, serializerManager)

-    if (keepUnrolling) {
--- End diff --

I do not understand what you mean, could you explain it more?
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163551817

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +645,83 @@ private[spark] class MemoryStore(
   }
 }

+private trait ValuesHolder[T] {
+  def storeValue(value: T): Unit
+  def estimatedSize(roughly: Boolean): Long
--- End diff --

Many thanks, I'll update it tomorrow.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163526188

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +645,83 @@ private[spark] class MemoryStore(
   }
 }

+private trait ValuesHolder[T] {
+  def storeValue(value: T): Unit
+  def estimatedSize(roughly: Boolean): Long
--- End diff --

an example
```
class DeserializedValuesHolder extends ValuesHolder {
  ...
  def getBuilder = new ValuesBuilder {
    val valuesArray = vector.toArray
    def preciseSize = SizeEstimator.estimate(valuesArray)
    def build = ...
  }
}

class SerializedValuesHolder extends ValuesHolder {
  ...
  def getBuilder = new ValuesBuilder {
    serializationStream.close()
    def preciseSize = bbos.size
    def build = ...
  }
}
```
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163525215

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +645,83 @@ private[spark] class MemoryStore(
   }
 }

+private trait ValuesHolder[T] {
+  def storeValue(value: T): Unit
+  def estimatedSize(roughly: Boolean): Long
--- End diff --

this is not a good API design, we can do
```
trait ValuesHolder {
  def putValue(value: T)
  def estimatedSize: Long
  def getBuilder(): ValuesBuilder
}

trait ValuesBuilder {
  def preciseSize: Long
  def build(): MemoryEntry
}
```
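(To make the proposed two-phase contract concrete, here is a sketch of how a caller would drive this API; `reserveAdditionalMemoryIfNecessary` is a hypothetical helper standing in for the reservation logic quoted elsewhere in this thread.)

```
// Phase 1: unroll through the holder, checking memory periodically.
while (values.hasNext && keepUnrolling) {
  valuesHolder.storeValue(values.next())
  elementsUnrolled += 1
  if (elementsUnrolled % memoryCheckPeriod == 0) {
    // Cheap, possibly rough estimate while unrolling is in progress.
    reserveAdditionalMemoryIfNecessary(valuesHolder.estimatedSize)
  }
}

// Phase 2: seal the holder; only the builder can produce the entry now.
if (keepUnrolling) {
  val builder = valuesHolder.getBuilder()
  reserveAdditionalMemoryIfNecessary(builder.preciseSize)
  if (keepUnrolling) {
    entries.synchronized {
      entries.put(blockId, builder.build())
    }
  }
}
```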
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163524751

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -346,85 +348,24 @@ private[spark] class MemoryStore(
[...]
+    val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,
+      memoryMode, serializerManager)

-    if (keepUnrolling) {
--- End diff --

is it better to use this code structure?
```
if (keepUnrolling) {
  // get precise size and reserve extra memory if needed
}

if (keepUnrolling) {
  // create the entry
}
```
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163462053

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
     }

     if (keepUnrolling) {
-      // We successfully unrolled the entirety of this block
-      val arrayValues = vector.toArray
-      vector = null
-      val entry =
-        new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
-      val size = entry.size
+      // We need more precise value
+      val size = valuesHolder.esitimatedSize(false)
--- End diff --

I changed the code back to the original. For `DeserializedValuesHolder`, we could call `buildEntry` and get the `size` from the `MemoryEntry`. But for `SerializedValuesHolder`, this approach does not work correctly, because we need to call `bbos.toChunkedByteBuffer` to get the `MemoryEntry` object, and if the reserved memory is not enough to transfer the unroll memory to storage memory, the unroll fails and we need to call `bbos.toChunkedByteBuffer` again ([L802](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala#L802); this appears intentional and is related to #15043). So the problem is that we would call `bbos.toChunkedByteBuffer` twice, but it can only be called once.
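(One hypothetical way to visualize the constraint: `toChunkedByteBuffer` hands over ownership of the written chunks and may only be invoked once, so a design that needs the buffer on both the success and failure paths would have to cache the result. This sketch is illustrative only; it is not the approach the PR took, and the class name is invented.)

```
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}

// Hypothetical: memoize the buffer so that the success path (building the
// SerializedMemoryEntry) and the failure path (building a
// PartiallySerializedBlock) observe the same ChunkedByteBuffer, instead of
// calling bbos.toChunkedByteBuffer twice, which is not allowed.
private class CachedSerializedValues(bbos: ChunkedByteBufferOutputStream) {
  // A lazy val is evaluated at most once, so both paths share one buffer.
  lazy val chunkedByteBuffer: ChunkedByteBuffer = bbos.toChunkedByteBuffer
}
```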
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163455326

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +645,76 @@ private[spark] class MemoryStore(
   }
 }

+private trait ValuesHolder[T] {
+  def storeValue(value: T): Unit
+  def estimatedSize(): Long
+  def buildEntry(): MemoryEntry[T]
+}
+
+/**
+ * A holder for storing the deserialized values.
+ */
+private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] {
+  // Underlying vector for unrolling the block
+  var vector = new SizeTrackingVector[T]()(classTag)
+  var arrayValues: Array[T] = null
+  var preciseSize: Long = -1
--- End diff --

it can be a local variable.
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163131741

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
   }

   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
    *
    * It's possible that the iterator is too large to materialize and store in memory. To avoid
    * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    * whether there is enough free memory. If the block is successfully materialized, then the
    * temporary unroll memory used during the materialization is "transferred" to storage memory,
    * so we won't acquire more memory than is actually needed to store the block.
    *
-   * @return in case of success, the estimated size of the stored data. In case of failure, return
-   *         an iterator containing the values of the block. The returned iterator will be backed
-   *         by the combination of the partially-unrolled block and the remaining elements of the
-   *         original input iterator. The caller must either fully consume this iterator or call
-   *         `close()` on it in order to free the storage memory consumed by the partially-unrolled
-   *         block.
+   * @param blockId The block id.
+   * @param values The values which need be stored.
+   * @param classTag the [[ClassTag]] for the block.
+   * @param memoryMode The values saved mode.
--- End diff --

Here `values` refers to `@param values The values which need be stored.`; it isn't the type of storage. I'll re-describe it later.
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163131519

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
[...]
+   * @param valuesHolder A holder that supports storing record of values into memory store as
+   *                     values or bytes.
+   * @return if the block is stored successfully, return the stored data size. Else return the
+   *         memory has used for unroll the block.
--- End diff --

I'll update it to be clearer.
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163131383

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
[...]
+      // We need more precise value
+      val size = valuesHolder.esitimatedSize(false)
--- End diff --

> It seems we can just build the entry and call `entry.size`.

It's more reasonable, I'll update it later.
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162962203

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
[...]
+      // We need more precise value
+      val size = valuesHolder.esitimatedSize(false)
--- End diff --

`roughly = false` tells us more than an estimate of the vector's size: it signals that unrolling has finished, so `storeValue` will not be called anymore. And for the 'heavy work' (under my understanding, mostly due to `SizeEstimator.estimate(arrayValues)`), yeah, I agree with you. That heavy work would be done whether or not we create an entry, and whether or not the entry finally gets put into the memory store.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162948453

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
[...]
+      // We need more precise value
+      val size = valuesHolder.esitimatedSize(false)
--- End diff --

Looking at the implementations, I don't think this makes a lot of difference. `esitimatedSize(false)` already does some heavy work and makes this `ValuesHolder` unusable, i.e. you can't call `storeValue` anymore, which doesn't seem like something a size estimation should do.
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162931279

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
[...]
+      // We need more precise value
+      val size = valuesHolder.esitimatedSize(false)
--- End diff --

It can avoid creating a useless entry if the condition `enoughStorageMemory == true` is not satisfied, I guess.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162927703

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
[...]
+      // We need more precise value
+      val size = valuesHolder.esitimatedSize(false)
--- End diff --

why do we need `esitimatedSize(false)`? It seems we can just build the entry and call `entry.size`.
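(Context for "build the entry and call `entry.size`": the deserialized entry already carries its size, so in principle the precise size can be read off the built entry. A simplified sketch of the relevant shape, close to the entry definitions in MemoryStore.scala:)

```
private case class DeserializedMemoryEntry[T](
    value: Array[T],
    size: Long,
    classTag: ClassTag[T]) extends MemoryEntry[T] {
  val memoryMode: MemoryMode = MemoryMode.ON_HEAP
}
```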
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162848405

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
[...]
+   * @return if the block is stored successfully, return the stored data size. Else return the
+   *         memory has used for unroll the block.
--- End diff --

First, I think you won't disagree that a partially-unrolled case exists in the failure situation. Second,

> The block can be unrolled fully, but the memory actually used exceeds what was reserved, and the extra memory can't be requested.

Yeah, I know. But what I want to say is that a fully-unrolled block doesn't mean we have reserved unroll memory for all of its values, because of `memoryCheckPeriod` (full accounting only happens when the element count is an exact multiple of `memoryCheckPeriod`). And here, we are talking about 'the memory used to unroll the block'. So it is not accurate to say 'the block was unrolled fully, so the used memory covers all the values'. Mostly, it would be partially-unrolled. WDYT?
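(The periodicity Ngone51 refers to comes from the unroll loop quoted earlier in this thread; a condensed sketch follows. Because the size check runs only every `memoryCheckPeriod` elements, the reserved unroll memory can lag the holder's true footprint until the final precise-size reservation catches up.)

```
while (values.hasNext && keepUnrolling) {
  valuesHolder.storeValue(values.next())
  elementsUnrolled += 1
  // Everything written since the last check is unaccounted for until the
  // next periodic check (or the final reservation) runs.
  if (elementsUnrolled % memoryCheckPeriod == 0) {
    reserveAdditionalMemoryIfNecessary()
  }
}
```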
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162841437

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
[...]
+   * @param memoryMode The values saved mode.
--- End diff --

Ah, I mean that putting `values` together with 'saved mode' makes me think about the values' storage type rather than the memory mode.
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162840896

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
[...]
+   * @param valuesHolder A holder that supports storing record of values into memory store as
+   *                     values of bytes.
+   * @return if the block is stored successfully, return the stored data size. Else return the
+   *         memory has used for unroll the block.
--- End diff --

The block can be unrolled fully, but the memory actually used exceeds what was reserved, and the extra memory can't be requested.
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162840776

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
[...]
+   * @param memoryMode The values saved mode.
--- End diff --

`MemoryMode` only has two modes: ON_HEAP and OFF_HEAP.
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162837169

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -346,85 +348,24 @@ private[spark] class MemoryStore(
[...]
+    val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,
+      memoryMode, serializerManager)

-    if (keepUnrolling) {
-      val entry = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
-      // Synchronize so that transfer is atomic
-      memoryManager.synchronized {
-        releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
-        val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
-        assert(success, "transferring unroll memory to storage memory failed")
-      }
-      entries.synchronized {
-        entries.put(blockId, entry)
-      }
-      logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
-        blockId, Utils.bytesToString(entry.size),
-        Utils.bytesToString(maxMemory - blocksMemoryUsed)))
-      Right(entry.size)
-    } else {
-      // We ran out of space while unrolling the values for this block
-      logUnrollFailureMessage(blockId, bbos.size)
-      Left(
-        new PartiallySerializedBlock(
+    putIterator(blockId, values, classTag, memoryMode, valuesHolder) match {
+      case Right(storedSize) => Right(storedSize)
+      case Left(unrollMemoryUsedByThisBlock) =>
+        // We ran out of space while unrolling the values for this block
+        logUnrollFailureMessage(blockId, unrollMemoryUsedByThisBlock)
--- End diff --

The computed size so far (according to `logUnrollFailureMessage`'s warning) is `bbos.size` rather than `unrollMemoryUsedByThisBlock`, which may be less than `bbos.size`.
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162835021

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
[...]
+   * @param valuesHolder A holder that supports storing record of values into memory store as
+   *                     values of bytes.
+   * @return if the block is stored successfully, return the stored data size. Else return the
+   *         memory has used for unroll the block.
--- End diff --

Maybe, *partially-unrolled*.
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162834864

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
[...]
+   * @param memoryMode The values saved mode.
--- End diff --

If we say *save mode*, I will wonder whether it means bytes or values. I think ON_HEAP or OFF_HEAP will be OK.
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162834903

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
[...]
+   * @param valuesHolder A holder that supports storing record of values into memory store as
+   *                     values of bytes.
--- End diff --

typo: of -> or
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162802949

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -261,37 +263,93 @@ private[spark] class MemoryStore(
           // If this task attempt already owns more unroll memory than is necessary to store the
           // block, then release the extra memory that will not be used.
           val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
-          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
+          releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory)
           transferUnrollToStorage(size)
           true
         }
       }
+
       if (enoughStorageMemory) {
         entries.synchronized {
-          entries.put(blockId, entry)
+          entries.put(blockId, createMemoryEntry())
         }
         logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
           blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
         Right(size)
       } else {
         assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
           "released too much unroll memory")
+        Left(unrollMemoryUsedByThisBlock)
+      }
+    } else {
+      Left(unrollMemoryUsedByThisBlock)
+    }
+  }
+
+  /**
+   * Attempt to put the given block in memory store as values.
+   *
+   * It's possible that the iterator is too large to materialize and store in memory. To avoid
+   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
+   * whether there is enough free memory. If the block is successfully materialized, then the
+   * temporary unroll memory used during the materialization is "transferred" to storage memory,
+   * so we won't acquire more memory than is actually needed to store the block.
+   *
+   * @return in case of success, the estimated size of the stored data. In case of failure, return
+   *         an iterator containing the values of the block. The returned iterator will be backed
+   *         by the combination of the partially-unrolled block and the remaining elements of the
+   *         original input iterator. The caller must either fully consume this iterator or call
+   *         `close()` on it in order to free the storage memory consumed by the partially-unrolled
+   *         block.
+   */
+  private[storage] def putIteratorAsValues[T](
+      blockId: BlockId,
+      values: Iterator[T],
+      classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
+
+    // Underlying vector for unrolling the block
+    var vector = new SizeTrackingVector[T]()(classTag)
+    var arrayValues: Array[T] = null
+    var preciseSize: Long = -1
+
+    def storeValue(value: T): Unit = {
+      vector += value
+    }
+
+    def estimateSize(precise: Boolean): Long = {
+      if (precise) {
+        // We only need the precise size after all values are unrolled.
+        arrayValues = vector.toArray
+        preciseSize = SizeEstimator.estimate(arrayValues)
+        preciseSize
+      } else {
+        vector.estimateSize()
+      }
+    }
+
+    def createMemoryEntry(): MemoryEntry[T] = {
+      // We successfully unrolled the entirety of this block
+      DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
+    }
+
+    putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, storeValue,
+      estimateSize, createMemoryEntry) match {
+      case Right(storedSize) => Right(storedSize)
+      case Left(unrollMemoryUsedByThisBlock) =>
+        // We ran out of space while unrolling the values for this block
+        val (unrolledIterator, size) = if (vector != null) {
--- End diff --

updated, thanks.
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162549759 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -261,37 +263,93 @@ private[spark] class MemoryStore( // If this task attempt already owns more unroll memory than is necessary to store the // block, then release the extra memory that will not be used. val excessUnrollMemory = unrollMemoryUsedByThisBlock - size - releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory) + releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory) transferUnrollToStorage(size) true } } + if (enoughStorageMemory) { entries.synchronized { - entries.put(blockId, entry) + entries.put(blockId, createMemoryEntry()) } logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) Right(size) } else { assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock, "released too much unroll memory") +Left(unrollMemoryUsedByThisBlock) + } +} else { + Left(unrollMemoryUsedByThisBlock) +} + } + + /** + * Attempt to put the given block in memory store as values. + * + * It's possible that the iterator is too large to materialize and store in memory. To avoid + * OOM exceptions, this method will gradually unroll the iterator while periodically checking + * whether there is enough free memory. If the block is successfully materialized, then the + * temporary unroll memory used during the materialization is "transferred" to storage memory, + * so we won't acquire more memory than is actually needed to store the block. + * + * @return in case of success, the estimated size of the stored data. In case of failure, return + * an iterator containing the values of the block. The returned iterator will be backed + * by the combination of the partially-unrolled block and the remaining elements of the + * original input iterator. The caller must either fully consume this iterator or call + * `close()` on it in order to free the storage memory consumed by the partially-unrolled + * block. + */ + private[storage] def putIteratorAsValues[T]( + blockId: BlockId, + values: Iterator[T], + classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { + +// Underlying vector for unrolling the block +var vector = new SizeTrackingVector[T]()(classTag) +var arrayValues: Array[T] = null +var preciseSize: Long = -1 + +def storeValue(value: T): Unit = { + vector += value +} + +def estimateSize(precise: Boolean): Long = { + if (precise) { +// We only call need the precise size after all values unrolled. +arrayValues = vector.toArray +preciseSize = SizeEstimator.estimate(arrayValues) +preciseSize + } else { +vector.estimateSize() + } +} + +def createMemoryEntry(): MemoryEntry[T] = { + // We successfully unrolled the entirety of this block + DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag) +} + +putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, storeValue, + estimateSize, createMemoryEntry) match { + case Right(storedSize) => Right(storedSize) + case Left(unrollMemoryUsedByThisBlock) => +// We ran out of space while unrolling the values for this block +val (unrolledIterator, size) = if (vector != null) { --- End diff -- Under what situation will `vector` be null? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162548350 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -162,26 +162,33 @@ private[spark] class MemoryStore( } /** - * Attempt to put the given block in memory store as values. + * Attempt to put the given block in memory store as values or bytes. * * It's possible that the iterator is too large to materialize and store in memory. To avoid * OOM exceptions, this method will gradually unroll the iterator while periodically checking * whether there is enough free memory. If the block is successfully materialized, then the * temporary unroll memory used during the materialization is "transferred" to storage memory, * so we won't acquire more memory than is actually needed to store the block. * - * @return in case of success, the estimated size of the stored data. In case of failure, return - * an iterator containing the values of the block. The returned iterator will be backed - * by the combination of the partially-unrolled block and the remaining elements of the - * original input iterator. The caller must either fully consume this iterator or call - * `close()` on it in order to free the storage memory consumed by the partially-unrolled - * block. + * @param blockId The block id. + * @param values The values which need be stored. + * @param classTag the [[ClassTag]] for the block. + * @param memoryMode The values saved mode. + * @param storeValue Store the record of values to the MemoryStore. + * @param estimateSize Get the memory size which used to unroll the block. The parameters + * determine whether we need precise size. + * @param createMemoryEntry Using [[MemoryEntry]] to hold the stored values or bytes. + * @return if the block is stored successfully, return the stored data size. Else return the + * memory has used for unroll the block. */ - private[storage] def putIteratorAsValues[T]( + private def putIterator[T]( blockId: BlockId, values: Iterator[T], - classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { - + classTag: ClassTag[T], + memoryMode: MemoryMode, + storeValue: T => Unit, + estimateSize: Boolean => Long, + createMemoryEntry: () => MemoryEntry[T]): Either[Long, Long] = { --- End diff -- trait? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162548052 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -233,17 +235,13 @@ private[spark] class MemoryStore( } if (keepUnrolling) { - // We successfully unrolled the entirety of this block - val arrayValues = vector.toArray - vector = null - val entry = -new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) - val size = entry.size + // get the precise size + val size = estimateSize(true) --- End diff -- It seems deserialized values do not have a **precise** size, even for `SizeEstimator.estimate(arrayValues)`. This would be confusing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
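Ngone51's point is easy to demonstrate: `SizeEstimator` walks the object graph and, for large arrays, samples elements and extrapolates, so even the so-called "precise" size is an approximation of the deserialized footprint. A minimal standalone sketch (the sample data is illustrative):

```scala
import org.apache.spark.util.SizeEstimator

// For arrays beyond a sampling threshold, SizeEstimator measures a sample of
// elements and extrapolates, so the result is approximate by design.
val arrayValues = Array.tabulate(1000)(i => "value" + i)
val size = SizeEstimator.estimate(arrayValues) // bytes, an estimate, not an exact count
println(s"estimated deserialized size: $size bytes")
```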
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162534339 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -261,37 +263,93 @@ private[spark] class MemoryStore( // If this task attempt already owns more unroll memory than is necessary to store the // block, then release the extra memory that will not be used. val excessUnrollMemory = unrollMemoryUsedByThisBlock - size - releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory) + releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory) transferUnrollToStorage(size) true } } + if (enoughStorageMemory) { entries.synchronized { - entries.put(blockId, entry) + entries.put(blockId, createMemoryEntry()) } logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) Right(size) } else { assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock, "released too much unroll memory") +Left(unrollMemoryUsedByThisBlock) + } +} else { + Left(unrollMemoryUsedByThisBlock) +} + } + + /** + * Attempt to put the given block in memory store as values. + * + * It's possible that the iterator is too large to materialize and store in memory. To avoid + * OOM exceptions, this method will gradually unroll the iterator while periodically checking + * whether there is enough free memory. If the block is successfully materialized, then the + * temporary unroll memory used during the materialization is "transferred" to storage memory, + * so we won't acquire more memory than is actually needed to store the block. --- End diff -- let's not duplicate this documentation --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162534289 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -162,26 +162,33 @@ private[spark] class MemoryStore( } /** - * Attempt to put the given block in memory store as values. + * Attempt to put the given block in memory store as values or bytes. * * It's possible that the iterator is too large to materialize and store in memory. To avoid * OOM exceptions, this method will gradually unroll the iterator while periodically checking * whether there is enough free memory. If the block is successfully materialized, then the * temporary unroll memory used during the materialization is "transferred" to storage memory, * so we won't acquire more memory than is actually needed to store the block. * - * @return in case of success, the estimated size of the stored data. In case of failure, return - * an iterator containing the values of the block. The returned iterator will be backed - * by the combination of the partially-unrolled block and the remaining elements of the - * original input iterator. The caller must either fully consume this iterator or call - * `close()` on it in order to free the storage memory consumed by the partially-unrolled - * block. + * @param blockId The block id. + * @param values The values which need be stored. + * @param classTag the [[ClassTag]] for the block. + * @param memoryMode The values saved mode. + * @param storeValue Store the record of values to the MemoryStore. + * @param estimateSize Get the memory size which used to unroll the block. The parameters + * determine whether we need precise size. + * @param createMemoryEntry Using [[MemoryEntry]] to hold the stored values or bytes. + * @return if the block is stored successfully, return the stored data size. Else return the + * memory has used for unroll the block. */ - private[storage] def putIteratorAsValues[T]( + private def putIterator[T]( blockId: BlockId, values: Iterator[T], - classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { - + classTag: ClassTag[T], + memoryMode: MemoryMode, + storeValue: T => Unit, + estimateSize: Boolean => Long, + createMemoryEntry: () => MemoryEntry[T]): Either[Long, Long] = { --- End diff -- instead of passing 3 functions, I'd like to introduce ``` class ValuesHolder { def store(value) def estimatedSize() def build(): MemoryEntry } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
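For concreteness, here is a minimal Scala sketch of what such a holder could look like, assuming it lives inside MemoryStore.scala where `MemoryEntry` and `DeserializedMemoryEntry` are defined. The trait follows cloud-fan's outline; the `DeserializedValuesHolder` name and its use of `SizeTrackingVector` are illustrative assumptions, not the code that was eventually merged:

```scala
import scala.reflect.ClassTag

import org.apache.spark.util.SizeEstimator
import org.apache.spark.util.collection.SizeTrackingVector

// Sketch of the proposed abstraction: one holder per block being unrolled.
private trait ValuesHolder[T] {
  def store(value: T): Unit
  def estimatedSize(): Long
  def build(): MemoryEntry[T]
}

// Hypothetical implementation for the deserialized (on-heap values) path.
private class DeserializedValuesHolder[T](classTag: ClassTag[T]) extends ValuesHolder[T] {
  private val vector = new SizeTrackingVector[T]()(classTag)

  override def store(value: T): Unit = vector += value

  override def estimatedSize(): Long = vector.estimateSize()

  override def build(): MemoryEntry[T] = {
    // The destructive conversion happens in exactly one place.
    val arrayValues = vector.toArray
    DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
  }
}
```

A holder like this also gives the destructive `toArray`/`toChunkedByteBuffer` step a single, well-defined home: `build()` is explicitly the point of no return, which sidesteps the side-effect concern raised elsewhere in this thread.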
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r149379715 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -261,37 +259,97 @@ private[spark] class MemoryStore( // If this task attempt already owns more unroll memory than is necessary to store the // block, then release the extra memory that will not be used. val excessUnrollMemory = unrollMemoryUsedByThisBlock - size - releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory) + releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory) transferUnrollToStorage(size) true } } + if (enoughStorageMemory) { entries.synchronized { - entries.put(blockId, entry) + entries.put(blockId, createMemoryEntry()) } logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) Right(size) } else { assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock, "released too much unroll memory") +Left(unrollMemoryUsedByThisBlock) + } +} else { + Left(unrollMemoryUsedByThisBlock) +} + } + + /** + * Attempt to put the given block in memory store as values. + * + * It's possible that the iterator is too large to materialize and store in memory. To avoid + * OOM exceptions, this method will gradually unroll the iterator while periodically checking + * whether there is enough free memory. If the block is successfully materialized, then the + * temporary unroll memory used during the materialization is "transferred" to storage memory, + * so we won't acquire more memory than is actually needed to store the block. + * + * @return in case of success, the estimated size of the stored data. In case of failure, return + * an iterator containing the values of the block. The returned iterator will be backed + * by the combination of the partially-unrolled block and the remaining elements of the + * original input iterator. The caller must either fully consume this iterator or call + * `close()` on it in order to free the storage memory consumed by the partially-unrolled + * block. + */ + private[storage] def putIteratorAsValues[T]( + blockId: BlockId, + values: Iterator[T], + classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { + +// Underlying vector for unrolling the block +var vector = new SizeTrackingVector[T]()(classTag) +var arrayValues: Array[T] = null +var preciseSize: Long = -1 + +def storeValue(value: T): Unit = { + vector += value +} + +def estimateSize(precise: Boolean): Long = { + if (precise) { +// We only call need the precise size after all values unrolled. +arrayValues = vector.toArray +preciseSize = SizeEstimator.estimate(arrayValues) +vector = null +preciseSize + } else { +vector.estimateSize() + } +} + +def createMemoryEntry(): MemoryEntry[T] = { + // We successfully unrolled the entirety of this block + assert(arrayValues != null, "arrayValue shouldn't be null!") + assert(preciseSize != -1, "preciseSize shouldn't be -1") --- End diff -- Under which condition would `preciseSize` be `-1`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r149374760 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -162,26 +162,29 @@ private[spark] class MemoryStore( } /** - * Attempt to put the given block in memory store as values. + * Attempt to put the given block in memory store as values or bytes. * * It's possible that the iterator is too large to materialize and store in memory. To avoid * OOM exceptions, this method will gradually unroll the iterator while periodically checking * whether there is enough free memory. If the block is successfully materialized, then the * temporary unroll memory used during the materialization is "transferred" to storage memory, * so we won't acquire more memory than is actually needed to store the block. * - * @return in case of success, the estimated size of the stored data. In case of failure, return - * an iterator containing the values of the block. The returned iterator will be backed - * by the combination of the partially-unrolled block and the remaining elements of the - * original input iterator. The caller must either fully consume this iterator or call - * `close()` on it in order to free the storage memory consumed by the partially-unrolled - * block. + * @param memoryMode The values saved mode. --- End diff -- nit: also add param description for `blockId`, `values` and `classTag`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r149379088 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -261,37 +259,97 @@ private[spark] class MemoryStore( // If this task attempt already owns more unroll memory than is necessary to store the // block, then release the extra memory that will not be used. val excessUnrollMemory = unrollMemoryUsedByThisBlock - size - releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory) + releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory) transferUnrollToStorage(size) true } } + if (enoughStorageMemory) { entries.synchronized { - entries.put(blockId, entry) + entries.put(blockId, createMemoryEntry()) } logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) Right(size) } else { assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock, "released too much unroll memory") +Left(unrollMemoryUsedByThisBlock) + } +} else { + Left(unrollMemoryUsedByThisBlock) +} + } + + /** + * Attempt to put the given block in memory store as values. + * + * It's possible that the iterator is too large to materialize and store in memory. To avoid + * OOM exceptions, this method will gradually unroll the iterator while periodically checking + * whether there is enough free memory. If the block is successfully materialized, then the + * temporary unroll memory used during the materialization is "transferred" to storage memory, + * so we won't acquire more memory than is actually needed to store the block. + * + * @return in case of success, the estimated size of the stored data. In case of failure, return + * an iterator containing the values of the block. The returned iterator will be backed + * by the combination of the partially-unrolled block and the remaining elements of the + * original input iterator. The caller must either fully consume this iterator or call + * `close()` on it in order to free the storage memory consumed by the partially-unrolled + * block. + */ + private[storage] def putIteratorAsValues[T]( + blockId: BlockId, + values: Iterator[T], + classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { + +// Underlying vector for unrolling the block +var vector = new SizeTrackingVector[T]()(classTag) +var arrayValues: Array[T] = null +var preciseSize: Long = -1 + +def storeValue(value: T): Unit = { + vector += value +} + +def estimateSize(precise: Boolean): Long = { + if (precise) { +// We only call need the precise size after all values unrolled. +arrayValues = vector.toArray +preciseSize = SizeEstimator.estimate(arrayValues) +vector = null --- End diff -- It looks scary to set `vector` to null inside the function `estimateSize`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
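One way to avoid that hidden side effect, sketched here purely for illustration (it reuses the `vector`, `arrayValues` and `preciseSize` variables from the patch above and is not the change that was ultimately made), is to keep `estimateSize` free of destructive mutation and release the vector only where the `MemoryEntry` takes ownership:

```scala
def estimateSize(precise: Boolean): Long = {
  if (precise) {
    // Materializes the array but leaves `vector` intact for failure paths.
    arrayValues = vector.toArray
    preciseSize = SizeEstimator.estimate(arrayValues)
    preciseSize
  } else {
    vector.estimateSize()
  }
}

def createMemoryEntry(): MemoryEntry[T] = {
  // The entry now owns the materialized values; releasing the vector here
  // makes the hand-off explicit instead of burying it in a size query.
  vector = null
  DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
}
```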
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r149379817 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -261,37 +259,97 @@ private[spark] class MemoryStore( // If this task attempt already owns more unroll memory than is necessary to store the // block, then release the extra memory that will not be used. val excessUnrollMemory = unrollMemoryUsedByThisBlock - size - releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory) + releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory) transferUnrollToStorage(size) true } } + if (enoughStorageMemory) { entries.synchronized { - entries.put(blockId, entry) + entries.put(blockId, createMemoryEntry()) } logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) Right(size) } else { assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock, "released too much unroll memory") +Left(unrollMemoryUsedByThisBlock) + } +} else { + Left(unrollMemoryUsedByThisBlock) +} + } + + /** + * Attempt to put the given block in memory store as values. + * + * It's possible that the iterator is too large to materialize and store in memory. To avoid + * OOM exceptions, this method will gradually unroll the iterator while periodically checking + * whether there is enough free memory. If the block is successfully materialized, then the + * temporary unroll memory used during the materialization is "transferred" to storage memory, + * so we won't acquire more memory than is actually needed to store the block. + * + * @return in case of success, the estimated size of the stored data. In case of failure, return + * an iterator containing the values of the block. The returned iterator will be backed + * by the combination of the partially-unrolled block and the remaining elements of the + * original input iterator. The caller must either fully consume this iterator or call + * `close()` on it in order to free the storage memory consumed by the partially-unrolled + * block. + */ + private[storage] def putIteratorAsValues[T]( + blockId: BlockId, + values: Iterator[T], + classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { + +// Underlying vector for unrolling the block +var vector = new SizeTrackingVector[T]()(classTag) +var arrayValues: Array[T] = null +var preciseSize: Long = -1 + +def storeValue(value: T): Unit = { + vector += value +} + +def estimateSize(precise: Boolean): Long = { + if (precise) { +// We only call need the precise size after all values unrolled. +arrayValues = vector.toArray +preciseSize = SizeEstimator.estimate(arrayValues) +vector = null +preciseSize + } else { +vector.estimateSize() + } +} + +def createMemoryEntry(): MemoryEntry[T] = { + // We successfully unrolled the entirety of this block + assert(arrayValues != null, "arrayValue shouldn't be null!") + assert(preciseSize != -1, "preciseSize shouldn't be -1") + val entry = new DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag) --- End diff -- Why do we need to create the val `entry`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r140490824 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -354,63 +401,30 @@ private[spark] class MemoryStore( ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) } -// Request enough memory to begin unrolling -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode) - -if (!keepUnrolling) { - logWarning(s"Failed to reserve initial memory threshold of " + -s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") -} else { - unrollMemoryUsedByThisBlock += initialMemoryThreshold +def storeValue(value: T): Unit = { + serializationStream.writeObject(value)(classTag) } -def reserveAdditionalMemoryIfNecessary(): Unit = { - if (bbos.size > unrollMemoryUsedByThisBlock) { -val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) -if (keepUnrolling) { - unrollMemoryUsedByThisBlock += amountToRequest -} - } -} - -// Unroll this block safely, checking whether we have exceeded our threshold -while (values.hasNext && keepUnrolling) { - serializationStream.writeObject(values.next())(classTag) - elementsUnrolled += 1 - if (elementsUnrolled % memoryCheckPeriod == 0) { -reserveAdditionalMemoryIfNecessary() +def estimateSize(precise: Boolean): Long = { + if (precise) { +serializationStream.flush() --- End diff -- @cloud-fan Sorry for my previous comment, I read the code again. It seems calling `serializationStream.close()` here is also OK, because the iterator has no more values to write, which means the `serializationStream` is not needed anymore. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r140149449 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -354,63 +401,30 @@ private[spark] class MemoryStore( ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) } -// Request enough memory to begin unrolling -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode) - -if (!keepUnrolling) { - logWarning(s"Failed to reserve initial memory threshold of " + -s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") -} else { - unrollMemoryUsedByThisBlock += initialMemoryThreshold +def storeValue(value: T): Unit = { + serializationStream.writeObject(value)(classTag) } -def reserveAdditionalMemoryIfNecessary(): Unit = { - if (bbos.size > unrollMemoryUsedByThisBlock) { -val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) -if (keepUnrolling) { - unrollMemoryUsedByThisBlock += amountToRequest -} - } -} - -// Unroll this block safely, checking whether we have exceeded our threshold -while (values.hasNext && keepUnrolling) { - serializationStream.writeObject(values.next())(classTag) - elementsUnrolled += 1 - if (elementsUnrolled % memoryCheckPeriod == 0) { -reserveAdditionalMemoryIfNecessary() +def estimateSize(precise: Boolean): Long = { + if (precise) { +serializationStream.flush() --- End diff -- OK, I'll do it tomorrow. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r140141918 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -354,63 +401,30 @@ private[spark] class MemoryStore( ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) } -// Request enough memory to begin unrolling -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode) - -if (!keepUnrolling) { - logWarning(s"Failed to reserve initial memory threshold of " + -s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") -} else { - unrollMemoryUsedByThisBlock += initialMemoryThreshold +def storeValue(value: T): Unit = { + serializationStream.writeObject(value)(classTag) } -def reserveAdditionalMemoryIfNecessary(): Unit = { - if (bbos.size > unrollMemoryUsedByThisBlock) { -val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) -if (keepUnrolling) { - unrollMemoryUsedByThisBlock += amountToRequest -} - } -} - -// Unroll this block safely, checking whether we have exceeded our threshold -while (values.hasNext && keepUnrolling) { - serializationStream.writeObject(values.next())(classTag) - elementsUnrolled += 1 - if (elementsUnrolled % memoryCheckPeriod == 0) { -reserveAdditionalMemoryIfNecessary() +def estimateSize(precise: Boolean): Long = { + if (precise) { +serializationStream.flush() --- End diff -- can you send a PR to fix this issue for `putIteratorAsBytes` first? It will make this PR easier to review --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r140126755 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -354,63 +401,30 @@ private[spark] class MemoryStore( ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) } -// Request enough memory to begin unrolling -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode) - -if (!keepUnrolling) { - logWarning(s"Failed to reserve initial memory threshold of " + -s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") -} else { - unrollMemoryUsedByThisBlock += initialMemoryThreshold +def storeValue(value: T): Unit = { + serializationStream.writeObject(value)(classTag) } -def reserveAdditionalMemoryIfNecessary(): Unit = { - if (bbos.size > unrollMemoryUsedByThisBlock) { -val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) -if (keepUnrolling) { - unrollMemoryUsedByThisBlock += amountToRequest -} - } -} - -// Unroll this block safely, checking whether we have exceeded our threshold -while (values.hasNext && keepUnrolling) { - serializationStream.writeObject(values.next())(classTag) - elementsUnrolled += 1 - if (elementsUnrolled % memoryCheckPeriod == 0) { -reserveAdditionalMemoryIfNecessary() +def estimateSize(precise: Boolean): Long = { + if (precise) { +serializationStream.flush() --- End diff -- Because some data is buffered in the `serializationStream`, we can't get the precise size without calling `flush`. Previously we didn't check again after unrolling the block, and directly called `serializationStream.close()`. But here we may need the `serializationStream` again if we can't get more unroll memory, so we should only call `flush`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
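The buffering behavior ConeyLiu describes can be demonstrated with plain JDK streams; this is a standalone sketch, not Spark's actual serializer stack:

```scala
import java.io.{BufferedOutputStream, ByteArrayOutputStream, ObjectOutputStream}

val bytes = new ByteArrayOutputStream()
val buffered = new BufferedOutputStream(bytes, 64 * 1024)
val out = new ObjectOutputStream(buffered)

out.writeObject("some value")
println(bytes.size()) // likely 0: the serialized data is still in the 64 KB buffer

out.flush()
println(bytes.size()) // now reflects everything written so far

// close() would also flush, but then the stream could accept no further writes,
// which is why flush() is the right call while unrolling may still continue.
```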
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r139985550 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -354,63 +401,30 @@ private[spark] class MemoryStore( ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) } -// Request enough memory to begin unrolling -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode) - -if (!keepUnrolling) { - logWarning(s"Failed to reserve initial memory threshold of " + -s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") -} else { - unrollMemoryUsedByThisBlock += initialMemoryThreshold +def storeValue(value: T): Unit = { + serializationStream.writeObject(value)(classTag) } -def reserveAdditionalMemoryIfNecessary(): Unit = { - if (bbos.size > unrollMemoryUsedByThisBlock) { -val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) -if (keepUnrolling) { - unrollMemoryUsedByThisBlock += amountToRequest -} - } -} - -// Unroll this block safely, checking whether we have exceeded our threshold -while (values.hasNext && keepUnrolling) { - serializationStream.writeObject(values.next())(classTag) - elementsUnrolled += 1 - if (elementsUnrolled % memoryCheckPeriod == 0) { -reserveAdditionalMemoryIfNecessary() +def estimateSize(precise: Boolean): Long = { + if (precise) { +serializationStream.flush() --- End diff -- I don't see anywhere in the previous code that calls `flush`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r139938279 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -233,17 +235,13 @@ private[spark] class MemoryStore( } if (keepUnrolling) { - // We successfully unrolled the entirety of this block - val arrayValues = vector.toArray - vector = null - val entry = -new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) - val size = entry.size + // get the precise size + val size = estimateSize(true) --- End diff -- Previously, `putIteratorAsValues` seemed to have no problem, but `putIteratorAsBytes` didn't check again after unrolling the iterator. Now `putIterator` is copied from the previous `putIteratorAsValues`. For `SizeTrackingVector`, we can call `arrayValues.toIterator` to get an iterator again after calling `SizeTrackingVector.toArray`. But for `ChunkedByteBufferOutputStream`, we can't get back to a stream after calling `ChunkedByteBufferOutputStream.toChunkedByteBuffer` (and `PartiallySerializedBlock` needs a stream). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
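A short sketch of that asymmetry; both classes are `private[spark]` utilities, so this assumes code living in Spark's own source tree, and the sample data is illustrative:

```scala
import java.nio.ByteBuffer
import org.apache.spark.util.collection.SizeTrackingVector
import org.apache.spark.util.io.ChunkedByteBufferOutputStream

// Deserialized path: toArray is not a dead end, because the failure path can
// rebuild an iterator from the materialized array.
val vector = new SizeTrackingVector[Int]()
(1 to 3).foreach(vector += _)
val arrayValues = vector.toArray
val replay = arrayValues.toIterator // values can still be handed back to the caller

// Serialized path: toChunkedByteBuffer is terminal. The stream must be closed
// first and accepts no further writes afterwards, so anything that still needs
// a writable stream (e.g. a partially serialized block) must be set up before.
val bbos = new ChunkedByteBufferOutputStream(4096, ByteBuffer.allocate)
bbos.write(Array.fill[Byte](16)(1))
bbos.close()
val buffer = bbos.toChunkedByteBuffer
```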
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r139929616 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -233,17 +235,13 @@ private[spark] class MemoryStore( } if (keepUnrolling) { - // We successfully unrolled the entirety of this block - val arrayValues = vector.toArray - vector = null - val entry = -new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) - val size = entry.size + // get the precise size + val size = estimateSize(true) --- End diff -- But the previous code just calls `entry.size`. Are you fixing a new bug? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r139920393 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -233,17 +235,13 @@ private[spark] class MemoryStore( } if (keepUnrolling) { - // We successfully unrolled the entirety of this block - val arrayValues = vector.toArray - vector = null - val entry = -new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) - val size = entry.size + // get the precise size + val size = estimateSize(true) --- End diff -- At this point we have only just unrolled the iterator successfully. The size of the underlying vector may be greater than `unrollMemoryUsedByThisBlock`, the amount of memory we requested for unrolling the block, so we need to check again and determine whether to request more memory. And we should only call `bbos.toChunkedByteBuffer` or `vector.toArray` after we have requested enough memory. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
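Put as code, the final check ConeyLiu describes looks roughly like the following; this is a simplified sketch that assumes the surrounding `putIterator` state (`keepUnrolling`, `unrollMemoryUsedByThisBlock`, `blockId`, `memoryMode`), not a verbatim excerpt of the patch:

```scala
if (keepUnrolling) {
  // The precise size can exceed what the periodic checks reserved so far.
  val size = estimateSize(true)
  if (size > unrollMemoryUsedByThisBlock) {
    val amountToRequest = size - unrollMemoryUsedByThisBlock
    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
    if (keepUnrolling) {
      unrollMemoryUsedByThisBlock += amountToRequest
    }
  }
  // Only if keepUnrolling is still true do we build the MemoryEntry and
  // transfer the unroll memory to storage memory; otherwise the put fails
  // and reports unrollMemoryUsedByThisBlock back to the caller.
}
```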
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r139917367 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -233,17 +235,13 @@ private[spark] class MemoryStore( } if (keepUnrolling) { - // We successfully unrolled the entirety of this block - val arrayValues = vector.toArray - vector = null - val entry = -new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) - val size = entry.size + // get the precise size + val size = estimateSize(true) --- End diff -- This only means we unrolled the iterator successfully. The size of the underlying vector may still be greater than `unrollMemoryUsedByThisBlock`, so we need to request more memory. At this point it is possible that we can't obtain that extra memory, so we should call `bbos.toChunkedByteBuffer` or `vector.toArray` only after we have requested enough memory. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r139917831 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -252,7 +250,7 @@ private[spark] class MemoryStore( if (unrollMemoryUsedByThisBlock <= size) { --- End diff -- Here the size of the underlying vector or byte buffer may be greater than `unrollMemoryUsedByThisBlock`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r139897940 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -233,17 +235,13 @@ private[spark] class MemoryStore( } if (keepUnrolling) { - // We successfully unrolled the entirety of this block - val arrayValues = vector.toArray - vector = null - val entry = -new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) - val size = entry.size + // get the precise size + val size = estimateSize(true) --- End diff -- Why do we need `estimateSize(true)`? Isn't this just creating the entry and getting `entry.size`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
GitHub user ConeyLiu opened a pull request: https://github.com/apache/spark/pull/19285 [SPARK-22068][CORE]Reduce the duplicate code between putIteratorAsValues and putIteratorAsBytes ## What changes were proposed in this pull request? The code logic between `MemoryStore.putIteratorAsValues` and `MemoryStore.putIteratorAsBytes` is almost the same, so we should reduce the duplicate code between them. ## How was this patch tested? Existing UT. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ConeyLiu/spark rmemorystore Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19285.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19285 commit 2c20dcbcf499aee5d6fbbb80f4803b3cad37c17c Author: Xianyang Liu Date: 2017-09-17T09:53:49Z refactor memorystore commit 120564303641a92d32ec434dba5076771f6d6e80 Author: Xianyang Liu Date: 2017-09-19T08:47:24Z fix conflicts commit 92e1d51b18a810307a0b6d0cb761925a0429ead2 Author: Xianyang Liu Date: 2017-09-19T23:45:17Z fix bug and add some comments commit 6e2e29be7ad9d4bf3aae2d55fb4bf93c3286009b Author: Xianyang Liu Date: 2017-09-20T00:28:35Z better variable name --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org