[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19285


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163819767
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +634,93 @@ private[spark] class MemoryStore(
   }
 }
 
+private trait ValuesBuilder[T] {
+  def preciseSize: Long
--- End diff --

good idea


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-25 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163807859
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +634,93 @@ private[spark] class MemoryStore(
   }
 }
 
+private trait ValuesBuilder[T] {
+  def preciseSize: Long
--- End diff --

Hey guys, why not name the trait `MemoryEntryBuilder`? As I see from the code, it is used to build the `MemoryEntry`.
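
For reference, the rename would look something like this (a sketch only, keeping the same members as the PR's current `ValuesBuilder`):
```
private trait MemoryEntryBuilder[T] {
  def preciseSize: Long
  def build(): MemoryEntry[T]
}
```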


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-24 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163768689
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -232,78 +236,93 @@ private[spark] class MemoryStore(
   elementsUnrolled += 1
 }
 
+val valuesBuilder = if (keepUnrolling) {
+  Some(valuesHolder.getBuilder())
+} else {
+  None
+}
+
+// Make sure that we have enough memory to store the block. By this 
point, it is possible that
+// the block's actual memory usage has exceeded the unroll memory by a 
small amount, so we
+// perform one final call to attempt to allocate additional memory if 
necessary.
 if (keepUnrolling) {
-  // We successfully unrolled the entirety of this block
-  val arrayValues = vector.toArray
-  vector = null
-  val entry =
-new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
-  val size = entry.size
-  def transferUnrollToStorage(amount: Long): Unit = {
-// Synchronize so that transfer is atomic
-memoryManager.synchronized {
-  releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
-  val success = memoryManager.acquireStorageMemory(blockId, 
amount, MemoryMode.ON_HEAP)
-  assert(success, "transferring unroll memory to storage memory 
failed")
+  val size = valuesBuilder.get.preciseSize
+  if (size > unrollMemoryUsedByThisBlock) {
+val amountToRequest = size - unrollMemoryUsedByThisBlock
+keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
+if (keepUnrolling) {
+  unrollMemoryUsedByThisBlock += amountToRequest
 }
   }
-  // Acquire storage memory if necessary to store this block in memory.
-  val enoughStorageMemory = {
-if (unrollMemoryUsedByThisBlock <= size) {
-  val acquiredExtra =
-memoryManager.acquireStorageMemory(
-  blockId, size - unrollMemoryUsedByThisBlock, 
MemoryMode.ON_HEAP)
-  if (acquiredExtra) {
-transferUnrollToStorage(unrollMemoryUsedByThisBlock)
-  }
-  acquiredExtra
-} else { // unrollMemoryUsedByThisBlock > size
-  // If this task attempt already owns more unroll memory than is 
necessary to store the
-  // block, then release the extra memory that will not be used.
-  val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
-  releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 
excessUnrollMemory)
-  transferUnrollToStorage(size)
-  true
-}
+}
+
+if (keepUnrolling) {
--- End diff --

updated



---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163750079
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +641,87 @@ private[spark] class MemoryStore(
   }
 }
 
+private trait ValuesBuilder[T] {
+  def preciseSize: Long
+  def build(): MemoryEntry[T]
+}
+
+private trait ValuesHolder[T] {
+  def storeValue(value: T): Unit
+  def estimatedSize(): Long
+  def getBuilder(): ValuesBuilder[T]
--- End diff --

add a comment to say that, after `getBuilder` is called, this 
`ValuesHolder` becomes invalid.
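
For example, the note could be phrased roughly like this (a sketch, not necessarily the exact wording used in the PR):
```
private trait ValuesHolder[T] {
  def storeValue(value: T): Unit
  def estimatedSize(): Long

  /**
   * Note: After this method is called, the ValuesHolder is invalid: we don't store
   * data into it anymore, and storeValue/estimatedSize must not be called again.
   */
  def getBuilder(): ValuesBuilder[T]
}
```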


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163749987
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +641,87 @@ private[spark] class MemoryStore(
   }
 }
 
+private trait ValuesBuilder[T] {
+  def preciseSize: Long
+  def build(): MemoryEntry[T]
+}
+
+private trait ValuesHolder[T] {
+  def storeValue(value: T): Unit
+  def estimatedSize(): Long
+  def getBuilder(): ValuesBuilder[T]
+}
+
+/**
+ * A holder for storing the deserialized values.
+ */
+private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] {
+  // Underlying vector for unrolling the block
+  var vector = new SizeTrackingVector[T]()(classTag)
+  var arrayValues: Array[T] = null
+
+  override def storeValue(value: T): Unit = {
+    vector += value
+  }
+
+  override def estimatedSize(): Long = {
+    vector.estimateSize()
+  }
+
+  override def getBuilder(): ValuesBuilder[T] = new ValuesBuilder[T] {
+    // We successfully unrolled the entirety of this block
+    arrayValues = vector.toArray
+    vector = null
+
+    override val preciseSize: Long = SizeEstimator.estimate(arrayValues)
+
+    override def build(): MemoryEntry[T] =
+      DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
+  }
+}
+
+/**
+ * A holder for storing the serialized values.
+ */
+private class SerializedValuesHolder[T](
+    blockId: BlockId,
+    chunkSize: Int,
+    classTag: ClassTag[T],
+    memoryMode: MemoryMode,
+    serializerManager: SerializerManager) extends ValuesHolder[T] {
+  val allocator = memoryMode match {
+    case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+    case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+  }
+
+  val redirectableStream = new RedirectableOutputStream
+  val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
+  redirectableStream.setOutputStream(bbos)
+  val serializationStream: SerializationStream = {
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
+    ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
+  }
+
+  override def storeValue(value: T): Unit = {
+    serializationStream.writeObject(value)(classTag)
+  }
+
+  override def estimatedSize(): Long = {
+    bbos.size
+  }
+
+  override def getBuilder(): ValuesBuilder[T] = new ValuesBuilder[T] {
+    // We successfully unrolled the entirety of this block
+    serializationStream.close()
+
+    override val preciseSize: Long = bbos.size
--- End diff --

this can be a `def`?
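
That is, since the stream is already closed at this point and `bbos.size` no longer changes, the override could be (a sketch of the suggested change):
```
// bbos.size is stable once serializationStream.close() has been called,
// so a def avoids storing an extra field in the anonymous builder.
override def preciseSize: Long = bbos.size
```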


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163749065
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -232,78 +236,93 @@ private[spark] class MemoryStore(
   elementsUnrolled += 1
 }
 
+val valuesBuilder = if (keepUnrolling) {
+  Some(valuesHolder.getBuilder())
+} else {
+  None
+}
+
+// Make sure that we have enough memory to store the block. By this 
point, it is possible that
+// the block's actual memory usage has exceeded the unroll memory by a 
small amount, so we
+// perform one final call to attempt to allocate additional memory if 
necessary.
 if (keepUnrolling) {
-  // We successfully unrolled the entirety of this block
-  val arrayValues = vector.toArray
-  vector = null
-  val entry =
-new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
-  val size = entry.size
-  def transferUnrollToStorage(amount: Long): Unit = {
-// Synchronize so that transfer is atomic
-memoryManager.synchronized {
-  releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
-  val success = memoryManager.acquireStorageMemory(blockId, 
amount, MemoryMode.ON_HEAP)
-  assert(success, "transferring unroll memory to storage memory 
failed")
+  val size = valuesBuilder.get.preciseSize
+  if (size > unrollMemoryUsedByThisBlock) {
+val amountToRequest = size - unrollMemoryUsedByThisBlock
+keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
+if (keepUnrolling) {
+  unrollMemoryUsedByThisBlock += amountToRequest
 }
   }
-  // Acquire storage memory if necessary to store this block in memory.
-  val enoughStorageMemory = {
-if (unrollMemoryUsedByThisBlock <= size) {
-  val acquiredExtra =
-memoryManager.acquireStorageMemory(
-  blockId, size - unrollMemoryUsedByThisBlock, 
MemoryMode.ON_HEAP)
-  if (acquiredExtra) {
-transferUnrollToStorage(unrollMemoryUsedByThisBlock)
-  }
-  acquiredExtra
-} else { // unrollMemoryUsedByThisBlock > size
-  // If this task attempt already owns more unroll memory than is 
necessary to store the
-  // block, then release the extra memory that will not be used.
-  val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
-  releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 
excessUnrollMemory)
-  transferUnrollToStorage(size)
-  true
-}
+}
+
+if (keepUnrolling) {
--- End diff --

a little improvement
```
if (keepUnrolling) {
  val builder = valuesHolder.getBuilder()
  ...
  if (keepUnrolling) {
    val entry = builder.build()
    ...
    Right(entry.size)
  } else {
    ...
    logUnrollFailureMessage(blockId, builder.preciseSize)
    Left(unrollMemoryUsedByThisBlock)
  }
} else {
  ...
  logUnrollFailureMessage(blockId, valuesHolder.estimatedSize)
  Left(unrollMemoryUsedByThisBlock)
}
```


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-24 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163743072
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -346,85 +350,24 @@ private[spark] class MemoryStore(
 } else {
   initialMemoryThreshold.toInt
 }
-val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
-redirectableStream.setOutputStream(bbos)
-val serializationStream: SerializationStream = {
-  val autoPick = !blockId.isInstanceOf[StreamBlockId]
-  val ser = serializerManager.getSerializer(classTag, 
autoPick).newInstance()
-  ser.serializeStream(serializerManager.wrapForCompression(blockId, 
redirectableStream))
-}
-
-// Request enough memory to begin unrolling
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold, memoryMode)
-
-if (!keepUnrolling) {
-  logWarning(s"Failed to reserve initial memory threshold of " +
-s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
-} else {
-  unrollMemoryUsedByThisBlock += initialMemoryThreshold
-}
-
-def reserveAdditionalMemoryIfNecessary(): Unit = {
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = (bbos.size * memoryGrowthFactor - 
unrollMemoryUsedByThisBlock).toLong
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
-
-// Unroll this block safely, checking whether we have exceeded our 
threshold
-while (values.hasNext && keepUnrolling) {
-  serializationStream.writeObject(values.next())(classTag)
-  elementsUnrolled += 1
-  if (elementsUnrolled % memoryCheckPeriod == 0) {
-reserveAdditionalMemoryIfNecessary()
-  }
-}
 
-// Make sure that we have enough memory to store the block. By this 
point, it is possible that
-// the block's actual memory usage has exceeded the unroll memory by a 
small amount, so we
-// perform one final call to attempt to allocate additional memory if 
necessary.
-if (keepUnrolling) {
-  serializationStream.close()
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
+val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, 
classTag,
+  memoryMode, serializerManager)
 
-if (keepUnrolling) {
--- End diff --

Thanks for the detailed explanation. I have updated it; the code looks much clearer now.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163556649
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -346,85 +350,24 @@ private[spark] class MemoryStore(
 } else {
   initialMemoryThreshold.toInt
 }
-val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
-redirectableStream.setOutputStream(bbos)
-val serializationStream: SerializationStream = {
-  val autoPick = !blockId.isInstanceOf[StreamBlockId]
-  val ser = serializerManager.getSerializer(classTag, 
autoPick).newInstance()
-  ser.serializeStream(serializerManager.wrapForCompression(blockId, 
redirectableStream))
-}
-
-// Request enough memory to begin unrolling
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold, memoryMode)
-
-if (!keepUnrolling) {
-  logWarning(s"Failed to reserve initial memory threshold of " +
-s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
-} else {
-  unrollMemoryUsedByThisBlock += initialMemoryThreshold
-}
-
-def reserveAdditionalMemoryIfNecessary(): Unit = {
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = (bbos.size * memoryGrowthFactor - 
unrollMemoryUsedByThisBlock).toLong
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
-
-// Unroll this block safely, checking whether we have exceeded our 
threshold
-while (values.hasNext && keepUnrolling) {
-  serializationStream.writeObject(values.next())(classTag)
-  elementsUnrolled += 1
-  if (elementsUnrolled % memoryCheckPeriod == 0) {
-reserveAdditionalMemoryIfNecessary()
-  }
-}
 
-// Make sure that we have enough memory to store the block. By this 
point, it is possible that
-// the block's actual memory usage has exceeded the unroll memory by a 
small amount, so we
-// perform one final call to attempt to allocate additional memory if 
necessary.
-if (keepUnrolling) {
-  serializationStream.close()
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
+val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, 
classTag,
+  memoryMode, serializerManager)
 
-if (keepUnrolling) {
--- End diff --

`putIteratorAsValues` and `putIteratorAsBytes` have different code structure for the last step. In the new `putIterator` method you followed the code structure of `putIteratorAsValues`; would it be better to follow the one from `putIteratorAsBytes`?


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-24 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163551992
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -346,85 +350,24 @@ private[spark] class MemoryStore(
 } else {
   initialMemoryThreshold.toInt
 }
-val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
-redirectableStream.setOutputStream(bbos)
-val serializationStream: SerializationStream = {
-  val autoPick = !blockId.isInstanceOf[StreamBlockId]
-  val ser = serializerManager.getSerializer(classTag, 
autoPick).newInstance()
-  ser.serializeStream(serializerManager.wrapForCompression(blockId, 
redirectableStream))
-}
-
-// Request enough memory to begin unrolling
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold, memoryMode)
-
-if (!keepUnrolling) {
-  logWarning(s"Failed to reserve initial memory threshold of " +
-s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
-} else {
-  unrollMemoryUsedByThisBlock += initialMemoryThreshold
-}
-
-def reserveAdditionalMemoryIfNecessary(): Unit = {
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = (bbos.size * memoryGrowthFactor - 
unrollMemoryUsedByThisBlock).toLong
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
-
-// Unroll this block safely, checking whether we have exceeded our 
threshold
-while (values.hasNext && keepUnrolling) {
-  serializationStream.writeObject(values.next())(classTag)
-  elementsUnrolled += 1
-  if (elementsUnrolled % memoryCheckPeriod == 0) {
-reserveAdditionalMemoryIfNecessary()
-  }
-}
 
-// Make sure that we have enough memory to store the block. By this 
point, it is possible that
-// the block's actual memory usage has exceeded the unroll memory by a 
small amount, so we
-// perform one final call to attempt to allocate additional memory if 
necessary.
-if (keepUnrolling) {
-  serializationStream.close()
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
+val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, 
classTag,
+  memoryMode, serializerManager)
 
-if (keepUnrolling) {
--- End diff --

I do not understand what you mean, could you explain it more?


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-24 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163551817
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +645,83 @@ private[spark] class MemoryStore(
   }
 }
 
+private trait ValuesHolder[T] {
+  def storeValue(value: T): Unit
+  def estimatedSize(roughly: Boolean): Long
--- End diff --

Thanks very much, I'll update it tomorrow.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163526188
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +645,83 @@ private[spark] class MemoryStore(
   }
 }
 
+private trait ValuesHolder[T] {
+  def storeValue(value: T): Unit
+  def estimatedSize(roughly: Boolean): Long
--- End diff --

an example
```
class DeserializedValuesHolder extends ValuesHolder {
  ...
  def getBuilder = new ValuesBuilder {
    val valuesArray = vector.toArray
    def preciseSize = SizeEstimator.estimate(valuesArray)
    def build = ...
  }
}


class SerializedValuesHolder extends ValuesHolder {
  ...
  def getBuilder = new ValuesBuilder {
    serializationStream.close()
    def preciseSize = bbos.size
    def build = ...
  }
}

```


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163525215
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +645,83 @@ private[spark] class MemoryStore(
   }
 }
 
+private trait ValuesHolder[T] {
+  def storeValue(value: T): Unit
+  def estimatedSize(roughly: Boolean): Long
--- End diff --

this is not a good API design, we can do
```
trait ValuesHolder {
  def putValue(value: T)
  def estimatedSize: Long
  def getBuilder(): ValuesBuilder
}
trait ValuesBuilder {
  def preciseSize: Long
  def build(): MemoryEntry
}
```


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163524751
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -346,85 +350,24 @@ private[spark] class MemoryStore(
 } else {
   initialMemoryThreshold.toInt
 }
-val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
-redirectableStream.setOutputStream(bbos)
-val serializationStream: SerializationStream = {
-  val autoPick = !blockId.isInstanceOf[StreamBlockId]
-  val ser = serializerManager.getSerializer(classTag, 
autoPick).newInstance()
-  ser.serializeStream(serializerManager.wrapForCompression(blockId, 
redirectableStream))
-}
-
-// Request enough memory to begin unrolling
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold, memoryMode)
-
-if (!keepUnrolling) {
-  logWarning(s"Failed to reserve initial memory threshold of " +
-s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
-} else {
-  unrollMemoryUsedByThisBlock += initialMemoryThreshold
-}
-
-def reserveAdditionalMemoryIfNecessary(): Unit = {
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = (bbos.size * memoryGrowthFactor - 
unrollMemoryUsedByThisBlock).toLong
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
-
-// Unroll this block safely, checking whether we have exceeded our 
threshold
-while (values.hasNext && keepUnrolling) {
-  serializationStream.writeObject(values.next())(classTag)
-  elementsUnrolled += 1
-  if (elementsUnrolled % memoryCheckPeriod == 0) {
-reserveAdditionalMemoryIfNecessary()
-  }
-}
 
-// Make sure that we have enough memory to store the block. By this 
point, it is possible that
-// the block's actual memory usage has exceeded the unroll memory by a 
small amount, so we
-// perform one final call to attempt to allocate additional memory if 
necessary.
-if (keepUnrolling) {
-  serializationStream.close()
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
+val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, 
classTag,
+  memoryMode, serializerManager)
 
-if (keepUnrolling) {
--- End diff --

is it better to use this code structure?
```
if (keepUnrolling) {
  // get precise size and reserve extra memory if needed
}
if (keepUnrolling) {
  // create the entry 
}
```
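
A self-contained sketch of that shape, with hypothetical stand-ins (`SimpleEntry`, `reserveMemory`) replacing the real MemoryStore machinery, just to illustrate the two sequential guard blocks:
```
object UnrollShapeSketch {
  final case class SimpleEntry(size: Long)

  // Returns Right(stored size) on success, Left(unroll memory used) on failure.
  def finish(
      unrolledOk: Boolean,
      preciseSize: Long,
      unrollMemoryUsed: Long,
      reserveMemory: Long => Boolean): Either[Long, Long] = {
    var keepUnrolling = unrolledOk
    var used = unrollMemoryUsed

    if (keepUnrolling) {
      // get precise size and reserve extra memory if needed
      if (preciseSize > used) {
        val amountToRequest = preciseSize - used
        keepUnrolling = reserveMemory(amountToRequest)
        if (keepUnrolling) used += amountToRequest
      }
    }

    if (keepUnrolling) {
      // create the entry
      Right(SimpleEntry(preciseSize).size)
    } else {
      Left(used)
    }
  }
}
```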


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-23 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163462053
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
 }
 
 if (keepUnrolling) {
-  // We successfully unrolled the entirety of this block
-  val arrayValues = vector.toArray
-  vector = null
-  val entry =
-new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
-  val size = entry.size
+  // We need more precise value
+  val size = valuesHolder.esitimatedSize(false)
--- End diff --

I changed the code back to the original approach. For `DeserializedValuesHolder` we could call `buildEntry` and get the `size` from the `MemoryEntry`. But for `SerializedValuesHolder` this doesn't work correctly, because we need to call `bbos.toChunkedByteBuffer` to get the `MemoryEntry` object, and if the reserved memory isn't enough to transfer the unroll memory to storage memory, the unroll fails and we need to call `bbos.toChunkedByteBuffer` again ([L802](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala#L802), which seems intentional and related to #15043). So the problem is that we would call `bbos.toChunkedByteBuffer` twice, but that's not allowed.
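
One hypothetical way to keep a single `toChunkedByteBuffer` call while letting both the success and failure paths reach the buffer would be to memoize it inside the builder (illustrative only; `chunkedBuffer` is an invented name, and this is not necessarily what the PR ends up doing):
```
override def getBuilder(): ValuesBuilder[T] = new ValuesBuilder[T] {
  serializationStream.close()

  // Evaluated at most once, so bbos.toChunkedByteBuffer is never called twice.
  lazy val chunkedBuffer = bbos.toChunkedByteBuffer

  override def preciseSize: Long = bbos.size

  override def build(): MemoryEntry[T] =
    SerializedMemoryEntry[T](chunkedBuffer, memoryMode, classTag)
}
```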


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163455326
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +645,76 @@ private[spark] class MemoryStore(
   }
 }
 
+private trait ValuesHolder[T] {
+  def storeValue(value: T): Unit
+  def estimatedSize(): Long
+  def buildEntry(): MemoryEntry[T]
+}
+
+/**
+ * A holder for storing the deserialized values.
+ */
+private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends 
ValuesHolder[T] {
+  // Underlying vector for unrolling the block
+  var vector = new SizeTrackingVector[T]()(classTag)
+  var arrayValues: Array[T] = null
+  var preciseSize: Long = -1
--- End diff --

it can be a local variable.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-22 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163131741
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store 
in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
* whether there is enough free memory. If the block is successfully 
materialized, then the
* temporary unroll memory used during the materialization is 
"transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the 
block.
*
-   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
-   * an iterator containing the values of the block. The returned 
iterator will be backed
-   * by the combination of the partially-unrolled block and the 
remaining elements of the
-   * original input iterator. The caller must either fully consume 
this iterator or call
-   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
-   * block.
+   * @param blockId The block id.
+   * @param values The values which need be stored.
+   * @param classTag the [[ClassTag]] for the block.
+   * @param memoryMode The values saved mode.
--- End diff --

Here `values` refers to "`@param values The values which need be stored.`"; it isn't the type of storage. I'll re-describe it later.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-22 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163131519
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store 
in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
* whether there is enough free memory. If the block is successfully 
materialized, then the
* temporary unroll memory used during the materialization is 
"transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the 
block.
*
-   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
-   * an iterator containing the values of the block. The returned 
iterator will be backed
-   * by the combination of the partially-unrolled block and the 
remaining elements of the
-   * original input iterator. The caller must either fully consume 
this iterator or call
-   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
-   * block.
+   * @param blockId The block id.
+   * @param values The values which need be stored.
+   * @param classTag the [[ClassTag]] for the block.
+   * @param memoryMode The values saved mode.
+   * @param valuesHolder A holder that supports storing record of values 
into memory store as
+   *values or bytes.
+   * @return if the block is stored successfully, return the stored data 
size. Else return the
+   * memory has used for unroll the block.
--- End diff --

I'll describe it more clearly.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-22 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163131383
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
 }
 
 if (keepUnrolling) {
-  // We successfully unrolled the entirety of this block
-  val arrayValues = vector.toArray
-  vector = null
-  val entry =
-new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
-  val size = entry.size
+  // We need more precise value
+  val size = valuesHolder.esitimatedSize(false)
--- End diff --

> It seems we can just build the entry and call `entry.size`.

It's more reasonable, I'll update it later.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162962203
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
 }
 
 if (keepUnrolling) {
-  // We successfully unrolled the entirety of this block
-  val arrayValues = vector.toArray
-  vector = null
-  val entry =
-new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
-  val size = entry.size
+  // We need more precise value
+  val size = valuesHolder.esitimatedSize(false)
--- End diff --

`roughly = false` tells us more than just estimating the size of the vector: it also means 'unrolling has finished', so `storeValue` will not be called anymore.

And as for the `heavy work` (in my understanding, mostly `SizeEstimator.estimate(arrayValues)`), yeah, I agree with you. That heavy work would be done anyway, whether or not we create an entry and whether or not the entry is finally put into the memory store.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162948453
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
 }
 
 if (keepUnrolling) {
-  // We successfully unrolled the entirety of this block
-  val arrayValues = vector.toArray
-  vector = null
-  val entry =
-new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
-  val size = entry.size
+  // We need more precise value
+  val size = valuesHolder.esitimatedSize(false)
--- End diff --

Looking at the implementations, I don't think this makes a lot of difference. `esitimatedSize(false)` already does some heavy work and makes this `ValuesHolder` unusable, i.e. you can't call `storeValue` anymore, which does not seem like something a size estimation should do.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162931279
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
 }
 
 if (keepUnrolling) {
-  // We successfully unrolled the entirety of this block
-  val arrayValues = vector.toArray
-  vector = null
-  val entry =
-new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
-  val size = entry.size
+  // We need more precise value
+  val size = valuesHolder.esitimatedSize(false)
--- End diff --

It can avoid creating a useless entry if the condition `enoughStorageMemory == true` is not satisfied, I guess.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162927703
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
 }
 
 if (keepUnrolling) {
-  // We successfully unrolled the entirety of this block
-  val arrayValues = vector.toArray
-  vector = null
-  val entry =
-new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
-  val size = entry.size
+  // We need more precise value
+  val size = valuesHolder.esitimatedSize(false)
--- End diff --

why do we need `esitimatedSize(false)`? It seems we can just build the 
entry and call `entry.size`.
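
In other words, something along these lines (a sketch against the holder API quoted above, where `buildEntry()` returns a `MemoryEntry[T]`):
```
if (keepUnrolling) {
  // Build the entry once and read its precise size from it,
  // instead of asking the holder for a separate "precise" estimate.
  val entry = valuesHolder.buildEntry()
  val size = entry.size
  // ... then reserve any extra memory and store `entry`, as before ...
}
```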


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-21 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162848405
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store 
in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
* whether there is enough free memory. If the block is successfully 
materialized, then the
* temporary unroll memory used during the materialization is 
"transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the 
block.
*
-   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
-   * an iterator containing the values of the block. The returned 
iterator will be backed
-   * by the combination of the partially-unrolled block and the 
remaining elements of the
-   * original input iterator. The caller must either fully consume 
this iterator or call
-   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
-   * block.
+   * @param blockId The block id.
+   * @param values The values which need be stored.
+   * @param classTag the [[ClassTag]] for the block.
+   * @param memoryMode The values saved mode.
+   * @param valuesHolder A holder that supports storing record of values 
into memory store as
+   *values or bytes.
+   * @return if the block is stored successfully, return the stored data 
size. Else return the
+   * memory has used for unroll the block.
--- End diff --

First, I think you won't disagree that a partially-unrolled case exists in the failure situation.

Second,
> The block can be unrolled fully, but the memory it uses may exceed what was reserved, and the request for the extra memory can fail.

Yeah, I know. But what I want to say is that a fully unrolled block doesn't mean we have reserved unroll memory for all the values (that only happens when the number of unrolled elements is an exact multiple of `memoryCheckPeriod`), because of `memoryCheckPeriod`. And here we are talking about 'the memory used to unroll the block'. So it is not accurate to say 'the block was unrolled fully, so the used memory accounts for all the values'.

So, mostly, it would be `partially-unrolled`. WDYT?


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-21 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162841437
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store 
in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
* whether there is enough free memory. If the block is successfully 
materialized, then the
* temporary unroll memory used during the materialization is 
"transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the 
block.
*
-   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
-   * an iterator containing the values of the block. The returned 
iterator will be backed
-   * by the combination of the partially-unrolled block and the 
remaining elements of the
-   * original input iterator. The caller must either fully consume 
this iterator or call
-   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
-   * block.
+   * @param blockId The block id.
+   * @param values The values which need be stored.
+   * @param classTag the [[ClassTag]] for the block.
+   * @param memoryMode The values saved mode.
--- End diff --

Ah, I mean that putting `values` together with 'saved mode' makes me think of the values' storage type rather than the memory mode.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-21 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162840896
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store 
in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
* whether there is enough free memory. If the block is successfully 
materialized, then the
* temporary unroll memory used during the materialization is 
"transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the 
block.
*
-   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
-   * an iterator containing the values of the block. The returned 
iterator will be backed
-   * by the combination of the partially-unrolled block and the 
remaining elements of the
-   * original input iterator. The caller must either fully consume 
this iterator or call
-   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
-   * block.
+   * @param blockId The block id.
+   * @param values The values which need be stored.
+   * @param classTag the [[ClassTag]] for the block.
+   * @param memoryMode The values saved mode.
+   * @param valuesHolder A holder that supports storing record of values 
into memory store as
+   *values of bytes.
+   * @return if the block is stored successfully, return the stored data 
size. Else return the
+   * memory has used for unroll the block.
--- End diff --

The block can be unrolled fully, but the memory it uses may exceed what was reserved, and the request for the extra memory can fail.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-21 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162840776
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store 
in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
* whether there is enough free memory. If the block is successfully 
materialized, then the
* temporary unroll memory used during the materialization is 
"transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the 
block.
*
-   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
-   * an iterator containing the values of the block. The returned 
iterator will be backed
-   * by the combination of the partially-unrolled block and the 
remaining elements of the
-   * original input iterator. The caller must either fully consume 
this iterator or call
-   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
-   * block.
+   * @param blockId The block id.
+   * @param values The values which need be stored.
+   * @param classTag the [[ClassTag]] for the block.
+   * @param memoryMode The values saved mode.
--- End diff --

`MemoryMode` only has two modes: ON_HEAP and OFF_HEAP.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-21 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162837169
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -346,85 +348,24 @@ private[spark] class MemoryStore(
 } else {
   initialMemoryThreshold.toInt
 }
-val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
-redirectableStream.setOutputStream(bbos)
-val serializationStream: SerializationStream = {
-  val autoPick = !blockId.isInstanceOf[StreamBlockId]
-  val ser = serializerManager.getSerializer(classTag, 
autoPick).newInstance()
-  ser.serializeStream(serializerManager.wrapForCompression(blockId, 
redirectableStream))
-}
-
-// Request enough memory to begin unrolling
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold, memoryMode)
-
-if (!keepUnrolling) {
-  logWarning(s"Failed to reserve initial memory threshold of " +
-s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
-} else {
-  unrollMemoryUsedByThisBlock += initialMemoryThreshold
-}
-
-def reserveAdditionalMemoryIfNecessary(): Unit = {
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = (bbos.size * memoryGrowthFactor - 
unrollMemoryUsedByThisBlock).toLong
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
-
-// Unroll this block safely, checking whether we have exceeded our 
threshold
-while (values.hasNext && keepUnrolling) {
-  serializationStream.writeObject(values.next())(classTag)
-  elementsUnrolled += 1
-  if (elementsUnrolled % memoryCheckPeriod == 0) {
-reserveAdditionalMemoryIfNecessary()
-  }
-}
 
-// Make sure that we have enough memory to store the block. By this 
point, it is possible that
-// the block's actual memory usage has exceeded the unroll memory by a 
small amount, so we
-// perform one final call to attempt to allocate additional memory if 
necessary.
-if (keepUnrolling) {
-  serializationStream.close()
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
+val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, 
classTag,
+  memoryMode, serializerManager)
 
-if (keepUnrolling) {
-  val entry = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, 
memoryMode, classTag)
-  // Synchronize so that transfer is atomic
-  memoryManager.synchronized {
-releaseUnrollMemoryForThisTask(memoryMode, 
unrollMemoryUsedByThisBlock)
-val success = memoryManager.acquireStorageMemory(blockId, 
entry.size, memoryMode)
-assert(success, "transferring unroll memory to storage memory 
failed")
-  }
-  entries.synchronized {
-entries.put(blockId, entry)
-  }
-  logInfo("Block %s stored as bytes in memory (estimated size %s, free 
%s)".format(
-blockId, Utils.bytesToString(entry.size),
-Utils.bytesToString(maxMemory - blocksMemoryUsed)))
-  Right(entry.size)
-} else {
-  // We ran out of space while unrolling the values for this block
-  logUnrollFailureMessage(blockId, bbos.size)
-  Left(
-new PartiallySerializedBlock(
+putIterator(blockId, values, classTag, memoryMode, valuesHolder) match 
{
+  case Right(storedSize) => Right(storedSize)
+  case Left(unrollMemoryUsedByThisBlock) =>
+// We ran out of space while unrolling the values for this block
+logUnrollFailureMessage(blockId, unrollMemoryUsedByThisBlock)
--- End diff --

The computed size so far (according to `logUnrollFailureMessage`'s warning message) is `bbos.size` rather than `unrollMemoryUsedByThisBlock`, which may be less than `bbos.size`.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-21 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162835021
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store 
in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
* whether there is enough free memory. If the block is successfully 
materialized, then the
* temporary unroll memory used during the materialization is 
"transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the 
block.
*
-   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
-   * an iterator containing the values of the block. The returned 
iterator will be backed
-   * by the combination of the partially-unrolled block and the 
remaining elements of the
-   * original input iterator. The caller must either fully consume 
this iterator or call
-   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
-   * block.
+   * @param blockId The block id.
+   * @param values The values which need be stored.
+   * @param classTag the [[ClassTag]] for the block.
+   * @param memoryMode The values saved mode.
+   * @param valuesHolder A holder that supports storing record of values 
into memory store as
+   *values of bytes.
+   * @return if the block is stored successfully, return the stored data 
size. Else return the
+   * memory has used for unroll the block.
--- End diff --

Maybe, *partially-unrolled*.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-21 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162834864
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store 
in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
* whether there is enough free memory. If the block is successfully 
materialized, then the
* temporary unroll memory used during the materialization is 
"transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the 
block.
*
-   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
-   * an iterator containing the values of the block. The returned 
iterator will be backed
-   * by the combination of the partially-unrolled block and the 
remaining elements of the
-   * original input iterator. The caller must either fully consume 
this iterator or call
-   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
-   * block.
+   * @param blockId The block id.
+   * @param values The values which need be stored.
+   * @param classTag the [[ClassTag]] for the block.
+   * @param memoryMode The values saved mode.
--- End diff --

If we say *save mode*, I will wonder whether it means bytes or values. I think saying ON_HEAP or OFF_HEAP will be OK.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-21 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162834903
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store 
in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
* whether there is enough free memory. If the block is successfully 
materialized, then the
* temporary unroll memory used during the materialization is 
"transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the 
block.
*
-   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
-   * an iterator containing the values of the block. The returned 
iterator will be backed
-   * by the combination of the partially-unrolled block and the 
remaining elements of the
-   * original input iterator. The caller must either fully consume 
this iterator or call
-   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
-   * block.
+   * @param blockId The block id.
+   * @param values The values which need be stored.
+   * @param classTag the [[ClassTag]] for the block.
+   * @param memoryMode The values saved mode.
+   * @param valuesHolder A holder that supports storing record of values 
into memory store as
+   *values of bytes.
--- End diff --

typo: of -> or


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-21 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162802949
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -261,37 +263,93 @@ private[spark] class MemoryStore(
   // If this task attempt already owns more unroll memory than is 
necessary to store the
   // block, then release the extra memory that will not be used.
   val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
-  releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 
excessUnrollMemory)
+  releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory)
   transferUnrollToStorage(size)
   true
 }
   }
+
   if (enoughStorageMemory) {
 entries.synchronized {
-  entries.put(blockId, entry)
+  entries.put(blockId, createMemoryEntry())
 }
 logInfo("Block %s stored as values in memory (estimated size %s, 
free %s)".format(
   blockId, Utils.bytesToString(size), 
Utils.bytesToString(maxMemory - blocksMemoryUsed)))
 Right(size)
   } else {
 assert(currentUnrollMemoryForThisTask >= 
unrollMemoryUsedByThisBlock,
   "released too much unroll memory")
+Left(unrollMemoryUsedByThisBlock)
+  }
+} else {
+  Left(unrollMemoryUsedByThisBlock)
+}
+  }
+
+  /**
+   * Attempt to put the given block in memory store as values.
+   *
+   * It's possible that the iterator is too large to materialize and store 
in memory. To avoid
+   * OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
+   * whether there is enough free memory. If the block is successfully 
materialized, then the
+   * temporary unroll memory used during the materialization is 
"transferred" to storage memory,
+   * so we won't acquire more memory than is actually needed to store the 
block.
+   *
+   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
+   * an iterator containing the values of the block. The returned 
iterator will be backed
+   * by the combination of the partially-unrolled block and the 
remaining elements of the
+   * original input iterator. The caller must either fully consume 
this iterator or call
+   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
+   * block.
+   */
+  private[storage] def putIteratorAsValues[T](
+  blockId: BlockId,
+  values: Iterator[T],
+  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = 
{
+
+// Underlying vector for unrolling the block
+var vector = new SizeTrackingVector[T]()(classTag)
+var arrayValues: Array[T] = null
+var preciseSize: Long = -1
+
+def storeValue(value: T): Unit = {
+  vector += value
+}
+
+def estimateSize(precise: Boolean): Long = {
+  if (precise) {
+// We only call need the precise size after all values unrolled.
+arrayValues = vector.toArray
+preciseSize = SizeEstimator.estimate(arrayValues)
+preciseSize
+  } else {
+vector.estimateSize()
+  }
+}
+
+def createMemoryEntry(): MemoryEntry[T] = {
+  // We successfully unrolled the entirety of this block
+  DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
+}
+
+putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, storeValue,
+  estimateSize, createMemoryEntry) match {
+  case Right(storedSize) => Right(storedSize)
+  case Left(unrollMemoryUsedByThisBlock) =>
+// We ran out of space while unrolling the values for this block
+val (unrolledIterator, size) = if (vector != null) {
--- End diff --

updated, thanks.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-18 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162549759
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -261,37 +263,93 @@ private[spark] class MemoryStore(
   // If this task attempt already owns more unroll memory than is 
necessary to store the
   // block, then release the extra memory that will not be used.
   val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
-  releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 
excessUnrollMemory)
+  releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory)
   transferUnrollToStorage(size)
   true
 }
   }
+
   if (enoughStorageMemory) {
 entries.synchronized {
-  entries.put(blockId, entry)
+  entries.put(blockId, createMemoryEntry())
 }
 logInfo("Block %s stored as values in memory (estimated size %s, 
free %s)".format(
   blockId, Utils.bytesToString(size), 
Utils.bytesToString(maxMemory - blocksMemoryUsed)))
 Right(size)
   } else {
 assert(currentUnrollMemoryForThisTask >= 
unrollMemoryUsedByThisBlock,
   "released too much unroll memory")
+Left(unrollMemoryUsedByThisBlock)
+  }
+} else {
+  Left(unrollMemoryUsedByThisBlock)
+}
+  }
+
+  /**
+   * Attempt to put the given block in memory store as values.
+   *
+   * It's possible that the iterator is too large to materialize and store 
in memory. To avoid
+   * OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
+   * whether there is enough free memory. If the block is successfully 
materialized, then the
+   * temporary unroll memory used during the materialization is 
"transferred" to storage memory,
+   * so we won't acquire more memory than is actually needed to store the 
block.
+   *
+   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
+   * an iterator containing the values of the block. The returned 
iterator will be backed
+   * by the combination of the partially-unrolled block and the 
remaining elements of the
+   * original input iterator. The caller must either fully consume 
this iterator or call
+   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
+   * block.
+   */
+  private[storage] def putIteratorAsValues[T](
+  blockId: BlockId,
+  values: Iterator[T],
+  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = 
{
+
+// Underlying vector for unrolling the block
+var vector = new SizeTrackingVector[T]()(classTag)
+var arrayValues: Array[T] = null
+var preciseSize: Long = -1
+
+def storeValue(value: T): Unit = {
+  vector += value
+}
+
+def estimateSize(precise: Boolean): Long = {
+  if (precise) {
+// We only call need the precise size after all values unrolled.
+arrayValues = vector.toArray
+preciseSize = SizeEstimator.estimate(arrayValues)
+preciseSize
+  } else {
+vector.estimateSize()
+  }
+}
+
+def createMemoryEntry(): MemoryEntry[T] = {
+  // We successfully unrolled the entirety of this block
+  DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
+}
+
+putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, storeValue,
+  estimateSize, createMemoryEntry) match {
+  case Right(storedSize) => Right(storedSize)
+  case Left(unrollMemoryUsedByThisBlock) =>
+// We ran out of space while unrolling the values for this block
+val (unrolledIterator, size) = if (vector != null) {
--- End diff --

Under what situation will vector be null?


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-18 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162548350
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,33 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store 
in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
* whether there is enough free memory. If the block is successfully 
materialized, then the
* temporary unroll memory used during the materialization is 
"transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the 
block.
*
-   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
-   * an iterator containing the values of the block. The returned 
iterator will be backed
-   * by the combination of the partially-unrolled block and the 
remaining elements of the
-   * original input iterator. The caller must either fully consume 
this iterator or call
-   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
-   * block.
+   * @param blockId The block id.
+   * @param values The values which need be stored.
+   * @param classTag the [[ClassTag]] for the block.
+   * @param memoryMode The values saved mode.
+   * @param storeValue Store the record of values to the MemoryStore.
+   * @param estimateSize Get the memory size which used to unroll the 
block. The parameters
+   * determine whether we need precise size.
+   * @param createMemoryEntry Using [[MemoryEntry]] to hold the stored 
values or bytes.
+   * @return if the block is stored successfully, return the stored data 
size. Else return the
+   * memory has used for unroll the block.
*/
-  private[storage] def putIteratorAsValues[T](
+  private def putIterator[T](
   blockId: BlockId,
   values: Iterator[T],
-  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = 
{
-
+  classTag: ClassTag[T],
+  memoryMode: MemoryMode,
+  storeValue: T => Unit,
+  estimateSize: Boolean => Long,
+  createMemoryEntry: () => MemoryEntry[T]): Either[Long, Long] = {
--- End diff --

trait?


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-18 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162548052
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
 }
 
 if (keepUnrolling) {
-  // We successfully unrolled the entirety of this block
-  val arrayValues = vector.toArray
-  vector = null
-  val entry =
-new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
-  val size = entry.size
+  // get the precise size
+  val size = estimateSize(true)
--- End diff --

It seems deserialized values do not have a **precise** size, even for `SizeEstimator.estimate(arrayValues)`. This would be confusing.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162534339
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -261,37 +263,93 @@ private[spark] class MemoryStore(
   // If this task attempt already owns more unroll memory than is 
necessary to store the
   // block, then release the extra memory that will not be used.
   val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
-  releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 
excessUnrollMemory)
+  releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory)
   transferUnrollToStorage(size)
   true
 }
   }
+
   if (enoughStorageMemory) {
 entries.synchronized {
-  entries.put(blockId, entry)
+  entries.put(blockId, createMemoryEntry())
 }
 logInfo("Block %s stored as values in memory (estimated size %s, 
free %s)".format(
   blockId, Utils.bytesToString(size), 
Utils.bytesToString(maxMemory - blocksMemoryUsed)))
 Right(size)
   } else {
 assert(currentUnrollMemoryForThisTask >= 
unrollMemoryUsedByThisBlock,
   "released too much unroll memory")
+Left(unrollMemoryUsedByThisBlock)
+  }
+} else {
+  Left(unrollMemoryUsedByThisBlock)
+}
+  }
+
+  /**
+   * Attempt to put the given block in memory store as values.
+   *
+   * It's possible that the iterator is too large to materialize and store 
in memory. To avoid
+   * OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
+   * whether there is enough free memory. If the block is successfully 
materialized, then the
+   * temporary unroll memory used during the materialization is 
"transferred" to storage memory,
+   * so we won't acquire more memory than is actually needed to store the 
block.
--- End diff --

let's not duplicate this documentation


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162534289
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,33 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store 
in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
* whether there is enough free memory. If the block is successfully 
materialized, then the
* temporary unroll memory used during the materialization is 
"transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the 
block.
*
-   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
-   * an iterator containing the values of the block. The returned 
iterator will be backed
-   * by the combination of the partially-unrolled block and the 
remaining elements of the
-   * original input iterator. The caller must either fully consume 
this iterator or call
-   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
-   * block.
+   * @param blockId The block id.
+   * @param values The values which need be stored.
+   * @param classTag the [[ClassTag]] for the block.
+   * @param memoryMode The values saved mode.
+   * @param storeValue Store the record of values to the MemoryStore.
+   * @param estimateSize Get the memory size which used to unroll the 
block. The parameters
+   * determine whether we need precise size.
+   * @param createMemoryEntry Using [[MemoryEntry]] to hold the stored 
values or bytes.
+   * @return if the block is stored successfully, return the stored data 
size. Else return the
+   * memory has used for unroll the block.
*/
-  private[storage] def putIteratorAsValues[T](
+  private def putIterator[T](
   blockId: BlockId,
   values: Iterator[T],
-  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = 
{
-
+  classTag: ClassTag[T],
+  memoryMode: MemoryMode,
+  storeValue: T => Unit,
+  estimateSize: Boolean => Long,
+  createMemoryEntry: () => MemoryEntry[T]): Either[Long, Long] = {
--- End diff --

instead of passing 3 functions, I'd like to introduce 
```
class ValuesHolder {
  def store(value)
  def estimatedSize()
  def build(): MemoryEntry
}
```
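
A slightly fuller sketch of what such a holder could look like; `SimpleMemoryEntry`, the per-record size heuristic, and all names below are illustrative stand-ins for this discussion, not the actual Spark classes:

```
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

// Hypothetical stand-in for Spark's MemoryEntry, just for this sketch.
case class SimpleMemoryEntry[T](values: Array[T], size: Long)

// The suggested abstraction: one object that accumulates records while
// unrolling and knows how to turn them into a memory entry at the end.
trait ValuesHolder[T] {
  def store(value: T): Unit         // append one record
  def estimatedSize(): Long         // rough size while still unrolling
  def build(): SimpleMemoryEntry[T] // finalize once fully unrolled
}

// A deserialized (values) holder; the size estimate is a crude per-record
// guess instead of SizeEstimator, purely for illustration.
class DeserializedValuesHolder[T: ClassTag] extends ValuesHolder[T] {
  private val buffer = new ArrayBuffer[T]
  override def store(value: T): Unit = buffer += value
  override def estimatedSize(): Long = buffer.length.toLong * 16L // assumed bytes per record
  override def build(): SimpleMemoryEntry[T] =
    SimpleMemoryEntry(buffer.toArray, estimatedSize())
}

object ValuesHolderDemo extends App {
  val holder = new DeserializedValuesHolder[Int]
  (1 to 5).foreach(holder.store)
  println(s"estimated size while unrolling: ${holder.estimatedSize()}")
  println(s"built entry: ${holder.build()}")
}
```

A serialized holder could implement the same trait on top of an output stream, which is what lets the unroll loop stay generic.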


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2017-11-07 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r149379715
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -261,37 +259,97 @@ private[spark] class MemoryStore(
   // If this task attempt already owns more unroll memory than is 
necessary to store the
   // block, then release the extra memory that will not be used.
   val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
-  releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 
excessUnrollMemory)
+  releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory)
   transferUnrollToStorage(size)
   true
 }
   }
+
   if (enoughStorageMemory) {
 entries.synchronized {
-  entries.put(blockId, entry)
+  entries.put(blockId, createMemoryEntry())
 }
 logInfo("Block %s stored as values in memory (estimated size %s, 
free %s)".format(
   blockId, Utils.bytesToString(size), 
Utils.bytesToString(maxMemory - blocksMemoryUsed)))
 Right(size)
   } else {
 assert(currentUnrollMemoryForThisTask >= 
unrollMemoryUsedByThisBlock,
   "released too much unroll memory")
+Left(unrollMemoryUsedByThisBlock)
+  }
+} else {
+  Left(unrollMemoryUsedByThisBlock)
+}
+  }
+
+  /**
+   * Attempt to put the given block in memory store as values.
+   *
+   * It's possible that the iterator is too large to materialize and store 
in memory. To avoid
+   * OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
+   * whether there is enough free memory. If the block is successfully 
materialized, then the
+   * temporary unroll memory used during the materialization is 
"transferred" to storage memory,
+   * so we won't acquire more memory than is actually needed to store the 
block.
+   *
+   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
+   * an iterator containing the values of the block. The returned 
iterator will be backed
+   * by the combination of the partially-unrolled block and the 
remaining elements of the
+   * original input iterator. The caller must either fully consume 
this iterator or call
+   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
+   * block.
+   */
+  private[storage] def putIteratorAsValues[T](
+  blockId: BlockId,
+  values: Iterator[T],
+  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = 
{
+
+// Underlying vector for unrolling the block
+var vector = new SizeTrackingVector[T]()(classTag)
+var arrayValues: Array[T] = null
+var preciseSize: Long = -1
+
+def storeValue(value: T): Unit = {
+  vector += value
+}
+
+def estimateSize(precise: Boolean): Long = {
+  if (precise) {
+// We only call need the precise size after all values unrolled.
+arrayValues = vector.toArray
+preciseSize = SizeEstimator.estimate(arrayValues)
+vector = null
+preciseSize
+  } else {
+vector.estimateSize()
+  }
+}
+
+def createMemoryEntry(): MemoryEntry[T] = {
+  // We successfully unrolled the entirety of this block
+  assert(arrayValues != null, "arrayValue shouldn't be null!")
+  assert(preciseSize != -1, "preciseSize shouldn't be -1")
--- End diff --

Under which condition would `preciseSize` be `-1`?


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2017-11-07 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r149374760
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store 
in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
* whether there is enough free memory. If the block is successfully 
materialized, then the
* temporary unroll memory used during the materialization is 
"transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the 
block.
*
-   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
-   * an iterator containing the values of the block. The returned 
iterator will be backed
-   * by the combination of the partially-unrolled block and the 
remaining elements of the
-   * original input iterator. The caller must either fully consume 
this iterator or call
-   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
-   * block.
+   * @param memoryMode The values saved mode.
--- End diff --

nit: also add param descriptions for `blockId`, `values` and `classTag`.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2017-11-07 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r149379088
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -261,37 +259,97 @@ private[spark] class MemoryStore(
   // If this task attempt already owns more unroll memory than is 
necessary to store the
   // block, then release the extra memory that will not be used.
   val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
-  releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 
excessUnrollMemory)
+  releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory)
   transferUnrollToStorage(size)
   true
 }
   }
+
   if (enoughStorageMemory) {
 entries.synchronized {
-  entries.put(blockId, entry)
+  entries.put(blockId, createMemoryEntry())
 }
 logInfo("Block %s stored as values in memory (estimated size %s, 
free %s)".format(
   blockId, Utils.bytesToString(size), 
Utils.bytesToString(maxMemory - blocksMemoryUsed)))
 Right(size)
   } else {
 assert(currentUnrollMemoryForThisTask >= 
unrollMemoryUsedByThisBlock,
   "released too much unroll memory")
+Left(unrollMemoryUsedByThisBlock)
+  }
+} else {
+  Left(unrollMemoryUsedByThisBlock)
+}
+  }
+
+  /**
+   * Attempt to put the given block in memory store as values.
+   *
+   * It's possible that the iterator is too large to materialize and store 
in memory. To avoid
+   * OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
+   * whether there is enough free memory. If the block is successfully 
materialized, then the
+   * temporary unroll memory used during the materialization is 
"transferred" to storage memory,
+   * so we won't acquire more memory than is actually needed to store the 
block.
+   *
+   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
+   * an iterator containing the values of the block. The returned 
iterator will be backed
+   * by the combination of the partially-unrolled block and the 
remaining elements of the
+   * original input iterator. The caller must either fully consume 
this iterator or call
+   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
+   * block.
+   */
+  private[storage] def putIteratorAsValues[T](
+  blockId: BlockId,
+  values: Iterator[T],
+  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = 
{
+
+// Underlying vector for unrolling the block
+var vector = new SizeTrackingVector[T]()(classTag)
+var arrayValues: Array[T] = null
+var preciseSize: Long = -1
+
+def storeValue(value: T): Unit = {
+  vector += value
+}
+
+def estimateSize(precise: Boolean): Long = {
+  if (precise) {
+// We only call need the precise size after all values unrolled.
+arrayValues = vector.toArray
+preciseSize = SizeEstimator.estimate(arrayValues)
+vector = null
--- End diff --

It looks scary to set `vector` to null inside the function `estimateSize`.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2017-11-07 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r149379817
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -261,37 +259,97 @@ private[spark] class MemoryStore(
   // If this task attempt already owns more unroll memory than is 
necessary to store the
   // block, then release the extra memory that will not be used.
   val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
-  releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 
excessUnrollMemory)
+  releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory)
   transferUnrollToStorage(size)
   true
 }
   }
+
   if (enoughStorageMemory) {
 entries.synchronized {
-  entries.put(blockId, entry)
+  entries.put(blockId, createMemoryEntry())
 }
 logInfo("Block %s stored as values in memory (estimated size %s, 
free %s)".format(
   blockId, Utils.bytesToString(size), 
Utils.bytesToString(maxMemory - blocksMemoryUsed)))
 Right(size)
   } else {
 assert(currentUnrollMemoryForThisTask >= 
unrollMemoryUsedByThisBlock,
   "released too much unroll memory")
+Left(unrollMemoryUsedByThisBlock)
+  }
+} else {
+  Left(unrollMemoryUsedByThisBlock)
+}
+  }
+
+  /**
+   * Attempt to put the given block in memory store as values.
+   *
+   * It's possible that the iterator is too large to materialize and store 
in memory. To avoid
+   * OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
+   * whether there is enough free memory. If the block is successfully 
materialized, then the
+   * temporary unroll memory used during the materialization is 
"transferred" to storage memory,
+   * so we won't acquire more memory than is actually needed to store the 
block.
+   *
+   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
+   * an iterator containing the values of the block. The returned 
iterator will be backed
+   * by the combination of the partially-unrolled block and the 
remaining elements of the
+   * original input iterator. The caller must either fully consume 
this iterator or call
+   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
+   * block.
+   */
+  private[storage] def putIteratorAsValues[T](
+  blockId: BlockId,
+  values: Iterator[T],
+  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = 
{
+
+// Underlying vector for unrolling the block
+var vector = new SizeTrackingVector[T]()(classTag)
+var arrayValues: Array[T] = null
+var preciseSize: Long = -1
+
+def storeValue(value: T): Unit = {
+  vector += value
+}
+
+def estimateSize(precise: Boolean): Long = {
+  if (precise) {
+// We only call need the precise size after all values unrolled.
+arrayValues = vector.toArray
+preciseSize = SizeEstimator.estimate(arrayValues)
+vector = null
+preciseSize
+  } else {
+vector.estimateSize()
+  }
+}
+
+def createMemoryEntry(): MemoryEntry[T] = {
+  // We successfully unrolled the entirety of this block
+  assert(arrayValues != null, "arrayValue shouldn't be null!")
+  assert(preciseSize != -1, "preciseSize shouldn't be -1")
+  val entry = new DeserializedMemoryEntry[T](arrayValues, preciseSize, 
classTag)
--- End diff --

Why do we need to create the val `entry`?


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2017-09-22 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r140490824
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -354,63 +401,30 @@ private[spark] class MemoryStore(
   ser.serializeStream(serializerManager.wrapForCompression(blockId, 
redirectableStream))
 }
 
-// Request enough memory to begin unrolling
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold, memoryMode)
-
-if (!keepUnrolling) {
-  logWarning(s"Failed to reserve initial memory threshold of " +
-s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
-} else {
-  unrollMemoryUsedByThisBlock += initialMemoryThreshold
+def storeValue(value: T): Unit = {
+  serializationStream.writeObject(value)(classTag)
 }
 
-def reserveAdditionalMemoryIfNecessary(): Unit = {
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = (bbos.size * memoryGrowthFactor - 
unrollMemoryUsedByThisBlock).toLong
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
-
-// Unroll this block safely, checking whether we have exceeded our 
threshold
-while (values.hasNext && keepUnrolling) {
-  serializationStream.writeObject(values.next())(classTag)
-  elementsUnrolled += 1
-  if (elementsUnrolled % memoryCheckPeriod == 0) {
-reserveAdditionalMemoryIfNecessary()
+def estimateSize(precise: Boolean): Long = {
+  if (precise) {
+serializationStream.flush()
--- End diff --

@cloud-fan Sorry for my previous comment; I read the code again. It seems calling `serializationStream.close()` here is also OK, because the iterator has no more values to write, which means the `serializationStream` is not needed anymore.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2017-09-20 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r140149449
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -354,63 +401,30 @@ private[spark] class MemoryStore(
   ser.serializeStream(serializerManager.wrapForCompression(blockId, 
redirectableStream))
 }
 
-// Request enough memory to begin unrolling
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold, memoryMode)
-
-if (!keepUnrolling) {
-  logWarning(s"Failed to reserve initial memory threshold of " +
-s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
-} else {
-  unrollMemoryUsedByThisBlock += initialMemoryThreshold
+def storeValue(value: T): Unit = {
+  serializationStream.writeObject(value)(classTag)
 }
 
-def reserveAdditionalMemoryIfNecessary(): Unit = {
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = (bbos.size * memoryGrowthFactor - 
unrollMemoryUsedByThisBlock).toLong
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
-
-// Unroll this block safely, checking whether we have exceeded our 
threshold
-while (values.hasNext && keepUnrolling) {
-  serializationStream.writeObject(values.next())(classTag)
-  elementsUnrolled += 1
-  if (elementsUnrolled % memoryCheckPeriod == 0) {
-reserveAdditionalMemoryIfNecessary()
+def estimateSize(precise: Boolean): Long = {
+  if (precise) {
+serializationStream.flush()
--- End diff --

OK, I'll do it tomorrow.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2017-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r140141918
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -354,63 +401,30 @@ private[spark] class MemoryStore(
   ser.serializeStream(serializerManager.wrapForCompression(blockId, 
redirectableStream))
 }
 
-// Request enough memory to begin unrolling
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold, memoryMode)
-
-if (!keepUnrolling) {
-  logWarning(s"Failed to reserve initial memory threshold of " +
-s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
-} else {
-  unrollMemoryUsedByThisBlock += initialMemoryThreshold
+def storeValue(value: T): Unit = {
+  serializationStream.writeObject(value)(classTag)
 }
 
-def reserveAdditionalMemoryIfNecessary(): Unit = {
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = (bbos.size * memoryGrowthFactor - 
unrollMemoryUsedByThisBlock).toLong
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
-
-// Unroll this block safely, checking whether we have exceeded our 
threshold
-while (values.hasNext && keepUnrolling) {
-  serializationStream.writeObject(values.next())(classTag)
-  elementsUnrolled += 1
-  if (elementsUnrolled % memoryCheckPeriod == 0) {
-reserveAdditionalMemoryIfNecessary()
+def estimateSize(precise: Boolean): Long = {
+  if (precise) {
+serializationStream.flush()
--- End diff --

can you send a PR to fix this issue for `putIteratorAsBytes` first? It will 
make this PR easier to review


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2017-09-20 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r140126755
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -354,63 +401,30 @@ private[spark] class MemoryStore(
   ser.serializeStream(serializerManager.wrapForCompression(blockId, 
redirectableStream))
 }
 
-// Request enough memory to begin unrolling
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold, memoryMode)
-
-if (!keepUnrolling) {
-  logWarning(s"Failed to reserve initial memory threshold of " +
-s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
-} else {
-  unrollMemoryUsedByThisBlock += initialMemoryThreshold
+def storeValue(value: T): Unit = {
+  serializationStream.writeObject(value)(classTag)
 }
 
-def reserveAdditionalMemoryIfNecessary(): Unit = {
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = (bbos.size * memoryGrowthFactor - 
unrollMemoryUsedByThisBlock).toLong
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
-
-// Unroll this block safely, checking whether we have exceeded our 
threshold
-while (values.hasNext && keepUnrolling) {
-  serializationStream.writeObject(values.next())(classTag)
-  elementsUnrolled += 1
-  if (elementsUnrolled % memoryCheckPeriod == 0) {
-reserveAdditionalMemoryIfNecessary()
+def estimateSize(precise: Boolean): Long = {
+  if (precise) {
+serializationStream.flush()
--- End diff --

Because some data is still buffered in the `serializationStream`, we can't get the precise size without calling `flush`. Previously we didn't check again after unrolling the block, and directly called `serializationStream.close()`. But here we may still need the `serializationStream` if we can't acquire the additional unroll memory, so we should only call `flush`.
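
A minimal illustration of the point, using plain `java.io` streams rather than Spark's serializer, so only the flush-versus-close behaviour carries over:

```
import java.io.{BufferedOutputStream, ByteArrayOutputStream, DataOutputStream}

object FlushVsCloseDemo extends App {
  val bytes = new ByteArrayOutputStream()
  val out = new DataOutputStream(new BufferedOutputStream(bytes))

  out.writeLong(42L)
  println(s"size before flush: ${bytes.size()}") // likely 0, data still buffered
  out.flush()
  println(s"size after flush:  ${bytes.size()}") // now reflects the 8 written bytes

  out.writeLong(43L) // still possible because we only flushed, never closed
  out.flush()
  println(s"size after second write: ${bytes.size()}")
  out.close()
}
```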


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2017-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r139985550
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -354,63 +401,30 @@ private[spark] class MemoryStore(
   ser.serializeStream(serializerManager.wrapForCompression(blockId, 
redirectableStream))
 }
 
-// Request enough memory to begin unrolling
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold, memoryMode)
-
-if (!keepUnrolling) {
-  logWarning(s"Failed to reserve initial memory threshold of " +
-s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
-} else {
-  unrollMemoryUsedByThisBlock += initialMemoryThreshold
+def storeValue(value: T): Unit = {
+  serializationStream.writeObject(value)(classTag)
 }
 
-def reserveAdditionalMemoryIfNecessary(): Unit = {
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = (bbos.size * memoryGrowthFactor - 
unrollMemoryUsedByThisBlock).toLong
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
-
-// Unroll this block safely, checking whether we have exceeded our 
threshold
-while (values.hasNext && keepUnrolling) {
-  serializationStream.writeObject(values.next())(classTag)
-  elementsUnrolled += 1
-  if (elementsUnrolled % memoryCheckPeriod == 0) {
-reserveAdditionalMemoryIfNecessary()
+def estimateSize(precise: Boolean): Long = {
+  if (precise) {
+serializationStream.flush()
--- End diff --

I don't see anywhere in the previous code that calls `flush`.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2017-09-20 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r139938279
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
 }
 
 if (keepUnrolling) {
-  // We successfully unrolled the entirety of this block
-  val arrayValues = vector.toArray
-  vector = null
-  val entry =
-new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
-  val size = entry.size
+  // get the precise size
+  val size = estimateSize(true)
--- End diff --

Previously, `putIteratorAsValues` seemed fine, but `putIteratorAsBytes` didn't check again after unrolling the iterator. Now `putIterator` is copied from the previous `putIteratorAsValues`. For `SizeTrackingVector`, we can call `arrayValues.toIterator` to get an iterator again after calling `SizeTrackingVector.toArray`. But for `ChunkedByteBufferOutputStream`, we can't get back to a stream after calling `ChunkedByteBufferOutputStream.toChunkedByteBuffer` (and `PartiallySerializedBlock` needs a stream).
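
A toy analogue of that asymmetry; `OneShotOutput` is an assumed stand-in for illustration, not the real `ChunkedByteBufferOutputStream`:

```
import java.io.ByteArrayOutputStream

// Once materialized, this "stream" refuses further writes, mimicking the
// one-way conversion toChunkedByteBuffer performs.
final class OneShotOutput {
  private val out = new ByteArrayOutputStream()
  private var finalized = false
  def write(b: Array[Byte]): Unit = {
    require(!finalized, "cannot write after the buffer has been materialized")
    out.write(b)
  }
  def toBuffer: Array[Byte] = { finalized = true; out.toByteArray }
}

object ValuesVsBytesDemo extends App {
  // Values path: materializing the array does not stop us from iterating again.
  val unrolledValues: Array[Int] = Array(1, 2, 3)
  println(unrolledValues.iterator.sum) // fine
  println(unrolledValues.iterator.sum) // still fine, a fresh iterator each time

  // Bytes path: once materialized, the stream cannot accept more data.
  val oneShot = new OneShotOutput
  oneShot.write(Array[Byte](1, 2, 3))
  println(oneShot.toBuffer.length)
  // oneShot.write(Array[Byte](4)) // would now throw: the stream is spent
}
```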


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2017-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r139929616
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
 }
 
 if (keepUnrolling) {
-  // We successfully unrolled the entirety of this block
-  val arrayValues = vector.toArray
-  vector = null
-  val entry =
-new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
-  val size = entry.size
+  // get the precise size
+  val size = estimateSize(true)
--- End diff --

But the previous code just calls `entry.size`, are you fixing a new bug?


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2017-09-20 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r139920393
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
 }
 
 if (keepUnrolling) {
-  // We successfully unrolled the entirety of this block
-  val arrayValues = vector.toArray
-  vector = null
-  val entry =
-new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
-  val size = entry.size
+  // get the precise size
+  val size = estimateSize(true)
--- End diff --

At this point we have only just unrolled the iterator successfully. The size of the underlying vector may be greater than `unrollMemoryUsedByThisBlock`, the memory we requested for unrolling the block, so we need to check it again and decide whether we need to request more memory. We should only call `bbos.toChunkedByteBuffer` or `vector.toArray` after we have requested enough memory.
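
A simplified sketch of that final check, with the memory accounting faked by a plain budget instead of the `MemoryManager`; names and numbers are illustrative only:

```
object FinalSizeCheckSketch {
  final case class Result(stored: Boolean, reserved: Long)

  // After unrolling, compare the precise size against what was reserved and
  // only materialize (toArray / toChunkedByteBuffer) once the difference has
  // been successfully reserved as well.
  def finishUnroll(preciseSize: Long, alreadyReserved: Long, freeMemory: Long): Result = {
    var reserved = alreadyReserved
    if (preciseSize > reserved) {
      val extra = preciseSize - reserved
      if (extra <= freeMemory) {
        reserved += extra // got the additional unroll memory
      } else {
        return Result(stored = false, reserved) // give up before materializing
      }
    }
    // Only now would we call vector.toArray or bbos.toChunkedByteBuffer.
    Result(stored = true, reserved)
  }

  def main(args: Array[String]): Unit = {
    println(finishUnroll(preciseSize = 1200, alreadyReserved = 1000, freeMemory = 500))
    println(finishUnroll(preciseSize = 1200, alreadyReserved = 1000, freeMemory = 100))
  }
}
```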


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2017-09-20 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r139917367
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
 }
 
 if (keepUnrolling) {
-  // We successfully unrolled the entirety of this block
-  val arrayValues = vector.toArray
-  vector = null
-  val entry =
-new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
-  val size = entry.size
+  // get the precise size
+  val size = estimateSize(true)
--- End diff --

This is just the point where the iterator has been unrolled successfully. The size of the underlying vector may be greater than `unrollMemoryUsedByThisBlock`, so we need to request more memory. At this point it is possible that we can't acquire enough memory again, so we should only call `bbos.toChunkedByteBuffer` or `vector.toArray` after we have requested enough memory.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2017-09-20 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r139917831
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -252,7 +250,7 @@ private[spark] class MemoryStore(
 if (unrollMemoryUsedByThisBlock <= size) {
--- End diff --

Here the size of the underlying vector or byte buffer may be greater than `unrollMemoryUsedByThisBlock`.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2017-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r139897940
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
 }
 
 if (keepUnrolling) {
-  // We successfully unrolled the entirety of this block
-  val arrayValues = vector.toArray
-  vector = null
-  val entry =
-new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
-  val size = entry.size
+  // get the precise size
+  val size = estimateSize(true)
--- End diff --

Why do we need `estimateSize(true)`? Isn't this just creating the entry and getting `entry.size`?


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2017-09-19 Thread ConeyLiu
GitHub user ConeyLiu opened a pull request:

https://github.com/apache/spark/pull/19285

[SPARK-22068][CORE]Reduce the duplicate code between putIteratorAsValues 
and putIteratorAsBytes

## What changes were proposed in this pull request?

The code logic of `MemoryStore.putIteratorAsValues` and `MemoryStore.putIteratorAsBytes` is almost the same, so we should reduce the duplicate code between them.
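
To illustrate the idea, here is a highly simplified, self-contained sketch of the kind of shared unroll loop this refactoring aims for. The `Holder` trait and the fixed memory budget are assumed stand-ins for the real `MemoryStore`/`MemoryManager` machinery (see the holder discussion earlier in this thread), not the actual implementation:

```
import scala.collection.mutable.ArrayBuffer

// Hypothetical holder abstraction: one implementation per storage format.
trait Holder[T] {
  def store(value: T): Unit
  def estimatedSize(): Long
}

class BufferHolder[T] extends Holder[T] {
  private val buffer = new ArrayBuffer[T]
  def store(value: T): Unit = buffer += value
  def estimatedSize(): Long = buffer.length.toLong * 16L // assumed bytes per record
}

object PutIteratorSketch {
  // Right(finalSize) on success, Left(memoryReserved) when the budget runs out.
  def putIterator[T](values: Iterator[T], holder: Holder[T], budget: Long): Either[Long, Long] = {
    val memoryCheckPeriod = 16
    var reserved = 0L
    var keepUnrolling = true
    var count = 0
    while (values.hasNext && keepUnrolling) {
      holder.store(values.next())
      count += 1
      if (count % memoryCheckPeriod == 0) {
        val size = holder.estimatedSize()
        if (size > reserved) {
          val request = size - reserved
          if (reserved + request <= budget) reserved += request else keepUnrolling = false
        }
      }
    }
    if (keepUnrolling) Right(holder.estimatedSize()) else Left(reserved)
  }

  def main(args: Array[String]): Unit = {
    println(putIterator((1 to 100).iterator, new BufferHolder[Int], budget = 4096L))    // Right(...)
    println(putIterator((1 to 100000).iterator, new BufferHolder[Int], budget = 4096L)) // Left(...)
  }
}
```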

## How was this patch tested?

Existing UT.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ConeyLiu/spark rmemorystore

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19285.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19285


commit 2c20dcbcf499aee5d6fbbb80f4803b3cad37c17c
Author: Xianyang Liu 
Date:   2017-09-17T09:53:49Z

refactor memorystore

commit 120564303641a92d32ec434dba5076771f6d6e80
Author: Xianyang Liu 
Date:   2017-09-19T08:47:24Z

fix conflicts

commit 92e1d51b18a810307a0b6d0cb761925a0429ead2
Author: Xianyang Liu 
Date:   2017-09-19T23:45:17Z

fix bug and add some comments

commit 6e2e29be7ad9d4bf3aae2d55fb4bf93c3286009b
Author: Xianyang Liu 
Date:   2017-09-20T00:28:35Z

better variable name




---
