[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...

2018-09-11 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/22371
  
OK, thanks everyone for the help. Closing it.


---




[GitHub] spark pull request #22371: [SPARK-25386][CORE] Don't need to synchronize the...

2018-09-11 Thread ConeyLiu
Github user ConeyLiu closed the pull request at:

https://github.com/apache/spark/pull/22371


---




[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...

2018-09-10 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/22371
  
@squito , thanks for the review. I previously intended to use `ConcurrentHashMap[Int, AtomicReferenceArray]`.

After rethinking the code, I see that the lock here is used to prevent different attempts of the same task from committing the shuffle writer result at the same time. A task can have multiple attempts in the following cases:

1. Failed task or stage. In this case, the previous task attempt should already have finished (failed or been killed), or its result is not used anymore.

2. `Speculative task`. In this case, the speculative task can't be scheduled on the same executor as the other attempts.

So, what is the real value of the lock? Maybe I'm wrong; I'm hoping for some answers.




---




[GitHub] spark pull request #22371: [SPARK-25386][CORE] Don't need to synchronize the...

2018-09-10 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22371#discussion_r216304910
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -51,6 +52,8 @@ private[spark] class IndexShuffleBlockResolver(
 
   private val transportConf = SparkTransportConf.fromSparkConf(conf, 
"shuffle")
 
+  private val shuffleIdToLocks = new ConcurrentHashMap[Int, 
Array[Object]]()
--- End diff --

It seems `Object` can't be put into `Array[_]` directly; it may need some casts.


---




[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...

2018-09-10 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/22371
  
Thanks @felixcheung, @srowen, @cloud-fan for your time. There is only one instance of `IndexShuffleBlockResolver` per executor, and the synchronization is used to protect the modification when different attempts of the same task update it at the same time. The synchronization is unnecessary for most tasks, and the modification is very simple.

I have tested locally; the results are as follows. I admit that this change brings little improvement to complex tasks, but it does not cause a performance degradation.

`./spark-shell --master local[20] --driver-memory 40g`
`spark.range(0, 1000, 1, 100).repartition(200).count()`

before: 

map | reduce
--- | ---
2s | 0.4s
0.8s | 0.2s
0.7s | 0.2s

after:

map | reduce
--- | ---
0.8s | 0.2s
0.5s | 0.4s
0.5s | 0.2s


---




[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...

2018-09-09 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/22371
  
@cloud-fan @jiangxb1987 Could you help to review this? Thanks a lot.


---




[GitHub] spark pull request #22371: [SPARK-25386][CORE] Don't need to synchronize the...

2018-09-09 Thread ConeyLiu
GitHub user ConeyLiu opened a pull request:

https://github.com/apache/spark/pull/22371

[SPARK-25386][CORE] Don't need to synchronize the IndexShuffleBlockResolver 
for each writeIndexFileAndCommit


## What changes were proposed in this pull request?

Currently, we need to synchronize on the `IndexShuffleBlockResolver` instance in order to make the commit check and the tmp-file rename atomic. This can be improved: we could synchronize on a lock that is distinct for each `shuffleId + mapId` instead of synchronizing on the whole `IndexShuffleBlockResolver` for every `writeIndexFileAndCommit`.

This optimization trades space for time, but it doesn't take up a lot of space.
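
For illustration, here is a minimal, self-contained sketch of the per-`shuffleId + mapId` locking idea described above. All names are hypothetical, it uses only `java.util.concurrent`, and it is not the actual patch:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch: lock per (shuffleId, mapId) instead of per resolver instance.
object PerMapOutputLocking {
  // One lock object per (shuffleId, mapId); computeIfAbsent creates it lazily and atomically.
  private val commitLocks = new ConcurrentHashMap[(Int, Int), Object]()

  // `commit` stands in for the commit check plus tmp-file rename done in writeIndexFileAndCommit.
  def commitMapOutput(shuffleId: Int, mapId: Int)(commit: () => Unit): Unit = {
    val lock = commitLocks.computeIfAbsent((shuffleId, mapId), _ => new Object)
    // Only attempts of the *same* map output contend here; different map outputs
    // no longer serialize behind a single resolver-wide lock.
    lock.synchronized {
      commit()
    }
  }

  // Locks for a finished shuffle could be dropped to bound the extra space.
  def removeShuffle(shuffleId: Int, numMaps: Int): Unit = {
    (0 until numMaps).foreach(mapId => commitLocks.remove((shuffleId, mapId)))
  }
}
```

The extra space is one small lock object per in-flight map-output commit, which is consistent with the claim above that the trade-off doesn't take much space.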

## How was this patch tested?

Existing UT.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ConeyLiu/spark indexShuffleBlockResolver

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22371.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22371


commit 92c2e07171f60b977c62661ea6475486a1599b19
Author: Xianyang Liu 
Date:   2018-09-09T10:44:23Z

don't need synchronized the IndexShuffleBlockResolver for each 
writeIndexFileAndCommit




---




[GitHub] spark pull request #20844: [SPARK-23707][SQL] Don't need shuffle exchange wi...

2018-08-26 Thread ConeyLiu
Github user ConeyLiu closed the pull request at:

https://github.com/apache/spark/pull/20844


---




[GitHub] spark issue #20844: [SPARK-23707][SQL] Don't need shuffle exchange with sing...

2018-08-26 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/20844
  
Thanks, all. Closing it.


---




[GitHub] spark pull request #20844: [SPARK-23707][SQL] Don't need shuffle exchange wi...

2018-03-22 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20844#discussion_r176327636
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 ---
@@ -348,6 +348,13 @@ case class RangeExec(range: 
org.apache.spark.sql.catalyst.plans.logical.Range)
   override lazy val metrics = Map(
 "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"))
 
+  /** Specifies how data is partitioned across different nodes in the 
cluster. */
+  override def outputPartitioning: Partitioning = if (numSlices == 1 && 
numElements != 0) {
--- End diff --

This is related to the [UT error](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88474/testReport/org.apache.spark.sql/DataFrameRangeSuite/SPARK_7150_range_api/). `spark.range(-10, -9, -20, 1).count()` failed when `codegen` was set to true and `RangeExec.outputPartitioning` was set to `SinglePartition`. I tried to find the root cause, but failed.



---




[GitHub] spark issue #20844: [SPARK-23707][SQL] Don't need shuffle exchange with sing...

2018-03-21 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/20844
  
This change is very simple; it just makes it consistent with other `LeafNode` implementations.


---




[GitHub] spark issue #20844: [SPARK-23707][SQL] No shuffle exchange with single parti...

2018-03-21 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/20844
  
@cloud-fan, pls take a look, thanks a lot.


---




[GitHub] spark pull request #20844: [SPARK-23707][SQL] Fresh 'initRange' name to avoi...

2018-03-20 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20844#discussion_r175966108
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 ---
@@ -396,9 +396,11 @@ case class RangeExec(range: 
org.apache.spark.sql.catalyst.plans.logical.Range)
 // The default size of a batch, which must be positive integer
 val batchSize = 1000
 
-val initRangeFuncName = ctx.addNewFunction("initRange",
+val initRange = ctx.freshName("initRange")
+
+val initRangeFuncName = ctx.addNewFunction(initRange,
   s"""
-| private void initRange(int idx) {
+| private void ${initRange}(int idx) {
--- End diff --

Thanks for your suggestion; let me give it a try.


---




[GitHub] spark pull request #20844: [SPARK-23707][SQL] Fresh 'initRange' name to avoi...

2018-03-19 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20844#discussion_r175658889
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 ---
@@ -396,9 +396,11 @@ case class RangeExec(range: 
org.apache.spark.sql.catalyst.plans.logical.Range)
 // The default size of a batch, which must be positive integer
 val batchSize = 1000
 
-val initRangeFuncName = ctx.addNewFunction("initRange",
+val initRange = ctx.freshName("initRange")
+
+val initRangeFuncName = ctx.addNewFunction(initRange,
   s"""
-| private void initRange(int idx) {
+| private void ${initRange}(int idx) {
--- End diff --

Hi @cloud-fan , before adding the comments, I have a question: why do we still need an `exchange` if we join two `spark.range(1, 10, 1, 1)`s? Since both `range`s have only one partition, is the `exchange` really needed?


---




[GitHub] spark pull request #20844: [SPARK-23707][SQL] Fresh 'initRange' name to avoi...

2018-03-19 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20844#discussion_r175634287
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 ---
@@ -396,9 +396,11 @@ case class RangeExec(range: 
org.apache.spark.sql.catalyst.plans.logical.Range)
 // The default size of a batch, which must be positive integer
 val batchSize = 1000
 
-val initRangeFuncName = ctx.addNewFunction("initRange",
+val initRange = ctx.freshName("initRange")
+
+val initRangeFuncName = ctx.addNewFunction(initRange,
   s"""
-| private void initRange(int idx) {
+| private void ${initRange}(int idx) {
--- End diff --

OK, I can just add some comments and keep the code unchanged. I changed it here only for better code robustness.


---




[GitHub] spark pull request #20844: [SPARK-23707][SQL] Fresh 'initRange' name to avoi...

2018-03-18 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20844#discussion_r175315224
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 ---
@@ -396,9 +396,11 @@ case class RangeExec(range: 
org.apache.spark.sql.catalyst.plans.logical.Range)
 // The default size of a batch, which must be positive integer
 val batchSize = 1000
 
-val initRangeFuncName = ctx.addNewFunction("initRange",
+val initRange = ctx.freshName("initRange")
+
+val initRangeFuncName = ctx.addNewFunction(initRange,
   s"""
-| private void initRange(int idx) {
+| private void ${initRange}(int idx) {
--- End diff --

@cloud-fan thanks for reviewing. Neither `BroadCastExchange` nor `ShuffleExchange` supports `CodegenSupport`, so there should be two `WholeStageCodegen`s.


---




[GitHub] spark issue #20844: [SPARK-23707][SQL] Fresh 'initRange' name to avoid metho...

2018-03-16 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/20844
  
@cloud-fan pls take a look, this is a small change. Thanks a lot.


---




[GitHub] spark pull request #20844: [SPARK-23707][SQL] Fresh 'initRange' name to avoi...

2018-03-16 Thread ConeyLiu
GitHub user ConeyLiu opened a pull request:

https://github.com/apache/spark/pull/20844

[SPARK-23707][SQL] Fresh 'initRange' name to avoid method name conflicts

## What changes were proposed in this pull request?
We should call `ctx.freshName` to get the `initRange` method name, in order to avoid name conflicts.

## How was this patch tested?

Existing UT.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ConeyLiu/spark range

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20844.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20844


commit d69f32ca49e3b2fa730d0520f48403eeebce60e4
Author: Xianyang Liu <xianyang.liu@...>
Date:   2018-03-16T07:56:52Z

Fresh 'initRange' name to avoid method name conflicts




---




[GitHub] spark issue #20676: [SPARK-23516][CORE] It is unnecessary to transfer unroll...

2018-02-27 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/20676
  
Yeah, I see that. I'm not sure it's OK to change it. But I think we should follow the interface design, not the underlying implementation.


---




[GitHub] spark pull request #20676: [SPARK-23516][CORE] It is unnecessary to transfer...

2018-02-27 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20676#discussion_r171115071
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -246,18 +246,18 @@ private[spark] class MemoryStore(
 val amountToRequest = size - unrollMemoryUsedByThisBlock
 keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
 if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
+  unrollMemoryUsedByThisBlock = size
 }
+  } else if (size < unrollMemoryUsedByThisBlock) {
--- End diff --

In #19285, we first release `unrollMemoryUsedByThisBlock` of unroll memory, and then request `entry.size` of storage memory. So there is no waste of resources here.


---




[GitHub] spark issue #20676: [SPARK-23516][CORE] It is unnecessary to transfer unroll...

2018-02-27 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/20676
  
This is for compatibility reasons. The memory management also supports the legacy memory manager (`StaticMemoryManager`). In `StaticMemoryManager`, storage memory and unroll memory are managed separately. So we can't say

>In fact, unroll memory is also storage memory
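
For context, a minimal sketch of how that legacy mode is enabled in Spark 2.x (the config keys are the legacy-mode settings; the values shown are just the documented defaults, not a recommendation):

```scala
import org.apache.spark.SparkConf

// With the legacy StaticMemoryManager, storage and unroll memory are sized by
// separate fractions instead of sharing a unified storage/execution region.
val conf = new SparkConf()
  .set("spark.memory.useLegacyMode", "true")   // switch to StaticMemoryManager
  .set("spark.storage.memoryFraction", "0.6")  // fraction of the heap used for storage
  .set("spark.storage.unrollFraction", "0.2")  // share of storage memory reserved for unrolling
```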


---




[GitHub] spark pull request #20461: [SPARK-23289][CORE]OneForOneBlockFetcher.Download...

2018-01-31 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20461#discussion_r165246022
  
--- Diff: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
 ---
@@ -171,7 +171,9 @@ private void failRemainingBlocks(String[] 
failedBlockIds, Throwable e) {
 
 @Override
 public void onData(String streamId, ByteBuffer buf) throws IOException 
{
-  channel.write(buf);
+  while (buf.hasRemaining()) {
+channel.write(buf);
--- End diff --


Should [FileSuite.writeBinaryData](https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/FileSuite.scala#L247) also be fixed?


---




[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...

2018-01-26 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19285
  
thanks all.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-24 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163768689
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -232,78 +236,93 @@ private[spark] class MemoryStore(
   elementsUnrolled += 1
 }
 
+val valuesBuilder = if (keepUnrolling) {
+  Some(valuesHolder.getBuilder())
+} else {
+  None
+}
+
+// Make sure that we have enough memory to store the block. By this 
point, it is possible that
+// the block's actual memory usage has exceeded the unroll memory by a 
small amount, so we
+// perform one final call to attempt to allocate additional memory if 
necessary.
 if (keepUnrolling) {
-  // We successfully unrolled the entirety of this block
-  val arrayValues = vector.toArray
-  vector = null
-  val entry =
-new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
-  val size = entry.size
-  def transferUnrollToStorage(amount: Long): Unit = {
-// Synchronize so that transfer is atomic
-memoryManager.synchronized {
-  releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
-  val success = memoryManager.acquireStorageMemory(blockId, 
amount, MemoryMode.ON_HEAP)
-  assert(success, "transferring unroll memory to storage memory 
failed")
+  val size = valuesBuilder.get.preciseSize
+  if (size > unrollMemoryUsedByThisBlock) {
+val amountToRequest = size - unrollMemoryUsedByThisBlock
+keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
+if (keepUnrolling) {
+  unrollMemoryUsedByThisBlock += amountToRequest
 }
   }
-  // Acquire storage memory if necessary to store this block in memory.
-  val enoughStorageMemory = {
-if (unrollMemoryUsedByThisBlock <= size) {
-  val acquiredExtra =
-memoryManager.acquireStorageMemory(
-  blockId, size - unrollMemoryUsedByThisBlock, 
MemoryMode.ON_HEAP)
-  if (acquiredExtra) {
-transferUnrollToStorage(unrollMemoryUsedByThisBlock)
-  }
-  acquiredExtra
-} else { // unrollMemoryUsedByThisBlock > size
-  // If this task attempt already owns more unroll memory than is 
necessary to store the
-  // block, then release the extra memory that will not be used.
-  val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
-  releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 
excessUnrollMemory)
-  transferUnrollToStorage(size)
-  true
-}
+}
+
+if (keepUnrolling) {
--- End diff --

updated



---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-24 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163743072
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -346,85 +350,24 @@ private[spark] class MemoryStore(
 } else {
   initialMemoryThreshold.toInt
 }
-val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
-redirectableStream.setOutputStream(bbos)
-val serializationStream: SerializationStream = {
-  val autoPick = !blockId.isInstanceOf[StreamBlockId]
-  val ser = serializerManager.getSerializer(classTag, 
autoPick).newInstance()
-  ser.serializeStream(serializerManager.wrapForCompression(blockId, 
redirectableStream))
-}
-
-// Request enough memory to begin unrolling
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold, memoryMode)
-
-if (!keepUnrolling) {
-  logWarning(s"Failed to reserve initial memory threshold of " +
-s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
-} else {
-  unrollMemoryUsedByThisBlock += initialMemoryThreshold
-}
-
-def reserveAdditionalMemoryIfNecessary(): Unit = {
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = (bbos.size * memoryGrowthFactor - 
unrollMemoryUsedByThisBlock).toLong
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
-
-// Unroll this block safely, checking whether we have exceeded our 
threshold
-while (values.hasNext && keepUnrolling) {
-  serializationStream.writeObject(values.next())(classTag)
-  elementsUnrolled += 1
-  if (elementsUnrolled % memoryCheckPeriod == 0) {
-reserveAdditionalMemoryIfNecessary()
-  }
-}
 
-// Make sure that we have enough memory to store the block. By this 
point, it is possible that
-// the block's actual memory usage has exceeded the unroll memory by a 
small amount, so we
-// perform one final call to attempt to allocate additional memory if 
necessary.
-if (keepUnrolling) {
-  serializationStream.close()
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
+val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, 
classTag,
+  memoryMode, serializerManager)
 
-if (keepUnrolling) {
--- End diff --

Thanks for the detailed explanation. It has been updated; the code looks much clearer now.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-24 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163551992
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -346,85 +350,24 @@ private[spark] class MemoryStore(
 } else {
   initialMemoryThreshold.toInt
 }
-val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
-redirectableStream.setOutputStream(bbos)
-val serializationStream: SerializationStream = {
-  val autoPick = !blockId.isInstanceOf[StreamBlockId]
-  val ser = serializerManager.getSerializer(classTag, 
autoPick).newInstance()
-  ser.serializeStream(serializerManager.wrapForCompression(blockId, 
redirectableStream))
-}
-
-// Request enough memory to begin unrolling
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold, memoryMode)
-
-if (!keepUnrolling) {
-  logWarning(s"Failed to reserve initial memory threshold of " +
-s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
-} else {
-  unrollMemoryUsedByThisBlock += initialMemoryThreshold
-}
-
-def reserveAdditionalMemoryIfNecessary(): Unit = {
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = (bbos.size * memoryGrowthFactor - 
unrollMemoryUsedByThisBlock).toLong
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
-
-// Unroll this block safely, checking whether we have exceeded our 
threshold
-while (values.hasNext && keepUnrolling) {
-  serializationStream.writeObject(values.next())(classTag)
-  elementsUnrolled += 1
-  if (elementsUnrolled % memoryCheckPeriod == 0) {
-reserveAdditionalMemoryIfNecessary()
-  }
-}
 
-// Make sure that we have enough memory to store the block. By this 
point, it is possible that
-// the block's actual memory usage has exceeded the unroll memory by a 
small amount, so we
-// perform one final call to attempt to allocate additional memory if 
necessary.
-if (keepUnrolling) {
-  serializationStream.close()
-  if (bbos.size > unrollMemoryUsedByThisBlock) {
-val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
-keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
-if (keepUnrolling) {
-  unrollMemoryUsedByThisBlock += amountToRequest
-}
-  }
-}
+val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, 
classTag,
+  memoryMode, serializerManager)
 
-if (keepUnrolling) {
--- End diff --

I don't quite understand what you mean; could you explain it a bit more?


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-24 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163551817
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -702,6 +645,83 @@ private[spark] class MemoryStore(
   }
 }
 
+private trait ValuesHolder[T] {
+  def storeValue(value: T): Unit
+  def estimatedSize(roughly: Boolean): Long
--- End diff --

Many thanks, I'll update it tomorrow.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-23 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163462053
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
 }
 
 if (keepUnrolling) {
-  // We successfully unrolled the entirety of this block
-  val arrayValues = vector.toArray
-  vector = null
-  val entry =
-new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
-  val size = entry.size
+  // We need more precise value
+  val size = valuesHolder.esitimatedSize(false)
--- End diff --

I changed the code back to the original. For `DeserializedValuesHolder`, we could `buildEntry` and get the `size` from the entry. But for `SerializedValuesHolder`, this does not work correctly, because we need to call `bbos.toChunkedByteBuffer` to get the `MemoryEntry` object, and if the reserved memory is not enough to transfer the unroll memory to storage memory, then the unroll fails and we need to call `bbos.toChunkedByteBuffer` again ([L802](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala#L802); this should be intentional, related to #15043). So the problem is that we would call `bbos.toChunkedByteBuffer` twice, which isn't allowed.


---




[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...

2018-01-23 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19285
  
Thanks for your valuable suggestion, the code has been updated.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-22 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163131741
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store 
in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
* whether there is enough free memory. If the block is successfully 
materialized, then the
* temporary unroll memory used during the materialization is 
"transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the 
block.
*
-   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
-   * an iterator containing the values of the block. The returned 
iterator will be backed
-   * by the combination of the partially-unrolled block and the 
remaining elements of the
-   * original input iterator. The caller must either fully consume 
this iterator or call
-   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
-   * block.
+   * @param blockId The block id.
+   * @param values The values which need be stored.
+   * @param classTag the [[ClassTag]] for the block.
+   * @param memoryMode The values saved mode.
--- End diff --

Here `values` refers to `@param values The values which need be stored.`; it isn't the type of storage. I'll re-describe it later.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-22 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163131519
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store 
in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
* whether there is enough free memory. If the block is successfully 
materialized, then the
* temporary unroll memory used during the materialization is 
"transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the 
block.
*
-   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
-   * an iterator containing the values of the block. The returned 
iterator will be backed
-   * by the combination of the partially-unrolled block and the 
remaining elements of the
-   * original input iterator. The caller must either fully consume 
this iterator or call
-   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
-   * block.
+   * @param blockId The block id.
+   * @param values The values which need be stored.
+   * @param classTag the [[ClassTag]] for the block.
+   * @param memoryMode The values saved mode.
+   * @param valuesHolder A holder that supports storing record of values 
into memory store as
+   *values or bytes.
+   * @return if the block is stored successfully, return the stored data 
size. Else return the
+   * memory has used for unroll the block.
--- End diff --

I'll update it to be clearer.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-22 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r163131383
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -233,17 +235,13 @@ private[spark] class MemoryStore(
 }
 
 if (keepUnrolling) {
-  // We successfully unrolled the entirety of this block
-  val arrayValues = vector.toArray
-  vector = null
-  val entry =
-new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
-  val size = entry.size
+  // We need more precise value
+  val size = valuesHolder.esitimatedSize(false)
--- End diff --

> It seems we can just build the entry and call entry.size.

That's more reasonable; I'll update it later.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-21 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162840896
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store 
in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
* whether there is enough free memory. If the block is successfully 
materialized, then the
* temporary unroll memory used during the materialization is 
"transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the 
block.
*
-   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
-   * an iterator containing the values of the block. The returned 
iterator will be backed
-   * by the combination of the partially-unrolled block and the 
remaining elements of the
-   * original input iterator. The caller must either fully consume 
this iterator or call
-   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
-   * block.
+   * @param blockId The block id.
+   * @param values The values which need be stored.
+   * @param classTag the [[ClassTag]] for the block.
+   * @param memoryMode The values saved mode.
+   * @param valuesHolder A holder that supports storing record of values 
into memory store as
+   *values of bytes.
+   * @return if the block is stored successfully, return the stored data 
size. Else return the
+   * memory has used for unroll the block.
--- End diff --

The block can be fully unrolled, but the memory actually used exceeds the amount requested, and the extra memory can't be acquired.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-21 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162840776
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -162,26 +162,29 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store 
in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
* whether there is enough free memory. If the block is successfully 
materialized, then the
* temporary unroll memory used during the materialization is 
"transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the 
block.
*
-   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
-   * an iterator containing the values of the block. The returned 
iterator will be backed
-   * by the combination of the partially-unrolled block and the 
remaining elements of the
-   * original input iterator. The caller must either fully consume 
this iterator or call
-   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
-   * block.
+   * @param blockId The block id.
+   * @param values The values which need be stored.
+   * @param classTag the [[ClassTag]] for the block.
+   * @param memoryMode The values saved mode.
--- End diff --

`MemoryMode` has only two modes: ON_HEAP and OFF_HEAP.


---




[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...

2018-01-21 Thread ConeyLiu
Github user ConeyLiu closed the pull request at:

https://github.com/apache/spark/pull/20026


---




[GitHub] spark issue #20026: [SPARK-22838][Core] Avoid unnecessary copying of data

2018-01-21 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/20026
  
Closing it; thanks, everyone.


---




[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...

2018-01-21 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20026#discussion_r162810684
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -152,7 +153,7 @@ private class DiskBlockData(
 file: File,
 blockSize: Long) extends BlockData {
 
-  override def toInputStream(): InputStream = new FileInputStream(file)
+  override def toInputStream(): InputStream = new 
NioBufferedFileInputStream(file)
--- End diff --

>IIUC for network (netty) transmission, it uses zero copy sendFile, which 
is another path (toNetty). 

Thanks for explaining, I did not notice this before.



---




[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...

2018-01-21 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20026#discussion_r162803175
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -152,7 +153,7 @@ private class DiskBlockData(
 file: File,
 blockSize: Long) extends BlockData {
 
-  override def toInputStream(): InputStream = new FileInputStream(file)
+  override def toInputStream(): InputStream = new 
NioBufferedFileInputStream(file)
--- End diff --

Hi @jerryshao, thanks for reviewing. This is inspired by #15408. 
> the returned `InputStream` will be deserialized in `BlockManger`

This is not entirely correct. Sometimes we don't need deserialization at all, for example for network transmission. Also, this does not add extra work to deserialization; it reduces the cost of network-style delivery.


---




[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...

2018-01-21 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19285
  
Thanks for reviewing. The code has been updated; please help review it. Thanks again.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2018-01-21 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19285#discussion_r162802949
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -261,37 +263,93 @@ private[spark] class MemoryStore(
   // If this task attempt already owns more unroll memory than is 
necessary to store the
   // block, then release the extra memory that will not be used.
   val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
-  releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 
excessUnrollMemory)
+  releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory)
   transferUnrollToStorage(size)
   true
 }
   }
+
   if (enoughStorageMemory) {
 entries.synchronized {
-  entries.put(blockId, entry)
+  entries.put(blockId, createMemoryEntry())
 }
 logInfo("Block %s stored as values in memory (estimated size %s, 
free %s)".format(
   blockId, Utils.bytesToString(size), 
Utils.bytesToString(maxMemory - blocksMemoryUsed)))
 Right(size)
   } else {
 assert(currentUnrollMemoryForThisTask >= 
unrollMemoryUsedByThisBlock,
   "released too much unroll memory")
+Left(unrollMemoryUsedByThisBlock)
+  }
+} else {
+  Left(unrollMemoryUsedByThisBlock)
+}
+  }
+
+  /**
+   * Attempt to put the given block in memory store as values.
+   *
+   * It's possible that the iterator is too large to materialize and store 
in memory. To avoid
+   * OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
+   * whether there is enough free memory. If the block is successfully 
materialized, then the
+   * temporary unroll memory used during the materialization is 
"transferred" to storage memory,
+   * so we won't acquire more memory than is actually needed to store the 
block.
+   *
+   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
+   * an iterator containing the values of the block. The returned 
iterator will be backed
+   * by the combination of the partially-unrolled block and the 
remaining elements of the
+   * original input iterator. The caller must either fully consume 
this iterator or call
+   * `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
+   * block.
+   */
+  private[storage] def putIteratorAsValues[T](
+  blockId: BlockId,
+  values: Iterator[T],
+  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = 
{
+
+// Underlying vector for unrolling the block
+var vector = new SizeTrackingVector[T]()(classTag)
+var arrayValues: Array[T] = null
+var preciseSize: Long = -1
+
+def storeValue(value: T): Unit = {
+  vector += value
+}
+
+def estimateSize(precise: Boolean): Long = {
+  if (precise) {
+// We only call need the precise size after all values unrolled.
+arrayValues = vector.toArray
+preciseSize = SizeEstimator.estimate(arrayValues)
+preciseSize
+  } else {
+vector.estimateSize()
+  }
+}
+
+def createMemoryEntry(): MemoryEntry[T] = {
+  // We successfully unrolled the entirety of this block
+  DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
+}
+
+putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, storeValue,
+  estimateSize, createMemoryEntry) match {
+  case Right(storedSize) => Right(storedSize)
+  case Left(unrollMemoryUsedByThisBlock) =>
+// We ran out of space while unrolling the values for this block
+val (unrolledIterator, size) = if (vector != null) {
--- End diff --

updated, thanks.


---




[GitHub] spark issue #20026: [SPARK-22838][Core] Avoid unnecessary copying of data

2017-12-26 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/20026
  
cc @jiangxb1987 any comments on this?


---




[GitHub] spark issue #20026: [SPARK-22838][Core] Avoid unnecessary copying of data

2017-12-21 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/20026
  
I'll update it tomorrow.


---




[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...

2017-12-21 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20026#discussion_r158279099
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -61,6 +61,7 @@ private boolean refill() throws IOException {
 nRead = fileChannel.read(byteBuffer);
   }
   if (nRead < 0) {
+byteBuffer.flip();
--- End diff --

The problem is that when we reach the end of the file and call `NioBufferedFileInputStream.read()` twice, we can't get `-1` both times. You can see it here, in org.apache.commons.crypto.stream.input.StreamInput.java:
```java
public int read(ByteBuffer dst) throws IOException {
    int remaining = dst.remaining();
    int read = 0;

    while (remaining > 0) {
        int n = this.in.read(this.buf, 0, Math.min(remaining, this.bufferSize));
        if (n == -1) {
            if (read == 0) {
                read = -1;
            }
            break;
        }

        if (n > 0) {
            dst.put(this.buf, 0, n);
            read += n;
            remaining -= n;
        }
    }

    return read;
}
```
1. We read data from the underlying `NioBufferedFileInputStream` and get some remaining bytes.
2. We reach the end, so this time we call `byteBuffer.flip()`. After `flip()`, `byteBuffer.hasRemaining()` changed to true, because `flip()` changed the limit and position.
3. But the return value of `StreamInput.read()` is larger than 0, because of the bytes read in step 1.
4. So we call `StreamInput.read()` again. But `byteBuffer.hasRemaining()` of `NioBufferedFileInputStream` has changed, so we read dirty data from the `byteBuffer`.

I'm not very sure this explanation is correct enough.


---




[GitHub] spark issue #20026: [SPARK-22838][Core] Avoid unnecessary copying of data

2017-12-21 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/20026
  
It seems the error is not related. Also, can you add me to the whitelist?


---




[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...

2017-12-21 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20026#discussion_r158243377
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -61,6 +61,7 @@ private boolean refill() throws IOException {
 nRead = fileChannel.read(byteBuffer);
   }
   if (nRead < 0) {
+byteBuffer.flip();
--- End diff --

This is related to this error: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85228/testReport/org.apache.spark.storage/BlockManagerSuite/LRU_with_mixed_storage_levels_and_streams__encryption___on_/
I'm not very sure of the reason, but I guess it happens roughly as follows:
```scala
var i = 0
while (i < inputStream.available()) {
  // do something that reads from inputStream and advances i
  i += 1
}
```
After we arrive at the end of the file, where `i == (inputStream.available() - 1)`, we get `-1` from `inputStream.read()`, and this read also needs to call `refill()`. Even though we can't get any data from the underlying `fileChannel`, the `byteBuffer` flipped. So `inputStream.available()` changes, and we can still read the dirty data remaining in the `byteBuffer`.
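
To make the scenario concrete, here is a hypothetical standalone sketch of the dirty read described above. It assumes the pre-fix `NioBufferedFileInputStream` and its `(File, bufferSize)` constructor; it is not an actual Spark test:

```scala
import java.io.File
import java.nio.file.Files

import org.apache.spark.io.NioBufferedFileInputStream

// Sketch: after the first -1 (EOF), the internal buffer is cleared but not flipped,
// so available() can report leftover capacity and read() can return stale bytes.
object DirtyReadSketch {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("nio-buffered", ".bin")
    Files.write(file.toPath, Array[Byte](1, 2, 3, 4, 5))
    val in = new NioBufferedFileInputStream(file, 4) // buffer smaller than the file
    while (in.read() != -1) {}                       // consume everything up to the first EOF
    // Without the flip() added in this patch, the next calls can see dirty buffer contents:
    println(s"available after EOF: ${in.available()}, next read: ${in.read()}")
    in.close()
    file.delete()
  }
}
```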


---




[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...

2017-12-21 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20026#discussion_r158220107
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -91,7 +92,12 @@ public synchronized int read(byte[] b, int offset, int 
len) throws IOException {
 
   @Override
   public synchronized int available() throws IOException {
-return byteBuffer.remaining();
+int n = byteBuffer.remaining();
+long avail = fileChannel.size() - fileChannel.position();
+long total = avail + n;
+return avail > (Long.MAX_VALUE - n)
+  ? Integer.MAX_VALUE : total > Integer.MAX_VALUE
+  ? Integer.MAX_VALUE : (int)total;
--- End diff --

This also should be fixed.


---




[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...

2017-12-21 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20026#discussion_r158220052
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -61,6 +61,7 @@ private boolean refill() throws IOException {
 nRead = fileChannel.read(byteBuffer);
   }
   if (nRead < 0) {
+byteBuffer.flip();
--- End diff --

This should be a bug. The byteBuffer is not flipped after `fileChannel.read(byteBuffer)` returns `-1`, so we can then read the remaining dirty data. A test case will be added later. I'm not sure whether this should be a separate patch. @cloud-fan 


---




[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...

2017-12-20 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20026#discussion_r158181658
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -208,7 +209,7 @@ private class EncryptedBlockData(
 conf: SparkConf,
 key: Array[Byte]) extends BlockData {
 
-  override def toInputStream(): InputStream = 
Channels.newInputStream(open())
+  override def toInputStream(): InputStream = new 
NioBufferedFileInputStream(file)
--- End diff --

Sorry for the mistake, updated.


---




[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...

2017-12-19 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20026#discussion_r157938250
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -208,7 +209,7 @@ private class EncryptedBlockData(
 conf: SparkConf,
 key: Array[Byte]) extends BlockData {
 
-  override def toInputStream(): InputStream = 
Channels.newInputStream(open())
+  override def toInputStream(): InputStream = new 
NioBufferedFileInputStream(file)
--- End diff --

Do you mean the memory buffer? `NioBufferedFileInputStream` has a `close` method, as you can see in org.apache.spark.io.NioBufferedFileInputStream.java:
```java
@Override
public synchronized void close() throws IOException {
  fileChannel.close();
  StorageUtils.dispose(byteBuffer);
}

@Override
protected void finalize() throws IOException {
  close();
}
```


---




[GitHub] spark issue #20026: [SPARK-22838][Core] Avoid unnecessary copying of data

2017-12-19 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/20026
  
@cloud-fan Please take a look, thanks a lot.


---




[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...

2017-12-19 Thread ConeyLiu
GitHub user ConeyLiu opened a pull request:

https://github.com/apache/spark/pull/20026

[SPARK-22838][Core] Avoid unnecessary copying of data

## What changes were proposed in this pull request?

If we read data from a FileChannel into a HeapByteBuffer, the data needs to be copied from off-heap to on-heap memory, as you can see in the following JDK code:

```java
static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
    if (var1.isReadOnly()) {
        throw new IllegalArgumentException("Read-only buffer");
    } else if (var1 instanceof DirectBuffer) {
        return readIntoNativeBuffer(var0, var1, var2, var4);
    } else {
        ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());

        int var7;
        try {
            int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
            var5.flip();
            if (var6 > 0) {
                var1.put(var5);
            }

            var7 = var6;
        } finally {
            Util.offerFirstTemporaryDirectBuffer(var5);
        }

        return var7;
    }
}
```
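
For illustration, a minimal standalone sketch of the idea (a hypothetical example, not Spark code): reading through a direct buffer lets `FileChannel.read()` fill off-heap memory directly, instead of going through the temporary direct buffer and the extra on-heap copy shown above:

```scala
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Paths, StandardOpenOption}

object DirectReadSketch {
  def main(args: Array[String]): Unit = {
    val channel = FileChannel.open(Paths.get(args(0)), StandardOpenOption.READ)
    // A direct (off-heap) buffer hits the `var1 instanceof DirectBuffer` fast path above,
    // so no temporary direct buffer is allocated and no extra copy is made per read.
    val buf = ByteBuffer.allocateDirect(8 * 1024)
    var n = channel.read(buf)
    while (n >= 0) {
      buf.flip()
      // ... consume the bytes in `buf` here ...
      buf.clear()
      n = channel.read(buf)
    }
    channel.close()
  }
}
```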

## How was this patch tested?

Existing UT.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ConeyLiu/spark datacopy

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20026.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20026


commit 27d2b1b3991602a7126d85cfc69c3da1ff84d599
Author: Xianyang Liu <xianyang.liu@...>
Date:   2017-12-20T02:32:44Z

small fix

commit 51c32c8c6cc89a3249b3ef856c41f3c238b59f4a
Author: Xianyang Liu <xianyang.liu@...>
Date:   2017-12-20T03:44:00Z

fix code style




---




[GitHub] spark issue #19735: [MINOR][CORE] Using bufferedInputStream for dataDeserial...

2017-11-13 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19735
  
thanks @jerryshao @srowen.


---




[GitHub] spark issue #19735: [MINOR][CORE] Using bufferedInputStream for dataDeserial...

2017-11-12 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19735
  
@srowen Could you take a look?


---




[GitHub] spark pull request #19735: [MINOR][CORE] Using bufferedInputStream for dataD...

2017-11-12 Thread ConeyLiu
GitHub user ConeyLiu opened a pull request:

https://github.com/apache/spark/pull/19735

[MINOR][CORE] Using bufferedInputStream for dataDeserializeStream

## What changes were proposed in this pull request?

Small fix. Using bufferedInputStream for dataDeserializeStream.

## How was this patch tested?

Existing UT.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ConeyLiu/spark smallfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19735.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19735


commit 9fe6426c61391f1e913be4abecba8b4f87785c5c
Author: Xianyang Liu <xianyang@intel.com>
Date:   2017-11-13T05:15:04Z

using bufferdinputstream




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19661: [SPARK-22450][Core][Mllib]safely register class for mlli...

2017-11-10 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19661
  
thanks everyone.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19661: [SPARK-22450][Core][Mllib]safely register class f...

2017-11-10 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19661#discussion_r150173067
  
--- Diff: 
core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---
@@ -108,6 +108,27 @@ class KryoSerializerSuite extends SparkFunSuite with 
SharedSparkContext {
 check(Array(Array("1", "2"), Array("1", "2", "3", "4")))
   }
 
+  test("safely register class for mllib/ml") {
+val conf = new SparkConf(false)
+val ser = new KryoSerializer(conf)
+
+Seq("org.apache.spark.mllib.linalg.Vector",
+  "org.apache.spark.mllib.linalg.DenseVector",
+  "org.apache.spark.mllib.linalg.SparseVector",
+  "org.apache.spark.mllib.linalg.Matrix",
+  "org.apache.spark.mllib.linalg.DenseMatrix",
+  "org.apache.spark.mllib.linalg.SparseMatrix",
+  "org.apache.spark.ml.linalg.Vector",
+  "org.apache.spark.ml.linalg.DenseVector",
+  "org.apache.spark.ml.linalg.SparseVector",
+  "org.apache.spark.ml.linalg.Matrix",
+  "org.apache.spark.ml.linalg.DenseMatrix",
+  "org.apache.spark.ml.linalg.SparseMatrix",
+  "org.apache.spark.ml.feature.Instance",
+  "org.apache.spark.ml.feature.OffsetInstance"
+).foreach(!Utils.classIsLoadable(_))
--- End diff --

OK, I'll remove it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19661: [SPARK-22450][Core][Mllib]safely register class f...

2017-11-10 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19661#discussion_r150172493
  
--- Diff: 
core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---
@@ -108,6 +108,27 @@ class KryoSerializerSuite extends SparkFunSuite with 
SharedSparkContext {
 check(Array(Array("1", "2"), Array("1", "2", "3", "4")))
   }
 
+  test("safely register class for mllib/ml") {
+val conf = new SparkConf(false)
+val ser = new KryoSerializer(conf)
+
+Seq("org.apache.spark.mllib.linalg.Vector",
+  "org.apache.spark.mllib.linalg.DenseVector",
+  "org.apache.spark.mllib.linalg.SparseVector",
+  "org.apache.spark.mllib.linalg.Matrix",
+  "org.apache.spark.mllib.linalg.DenseMatrix",
+  "org.apache.spark.mllib.linalg.SparseMatrix",
+  "org.apache.spark.ml.linalg.Vector",
+  "org.apache.spark.ml.linalg.DenseVector",
+  "org.apache.spark.ml.linalg.SparseVector",
+  "org.apache.spark.ml.linalg.Matrix",
+  "org.apache.spark.ml.linalg.DenseMatrix",
+  "org.apache.spark.ml.linalg.SparseMatrix",
+  "org.apache.spark.ml.feature.Instance",
+  "org.apache.spark.ml.feature.OffsetInstance"
+).foreach(!Utils.classIsLoadable(_))
--- End diff --

This is just meant to show that we didn't introduce an extra jar dependency. I can 
delete it if it's unnecessary.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19661: [SPARK-22450][Core][Mllib]safely register class f...

2017-11-09 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19661#discussion_r150157116
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -178,6 +179,28 @@ class KryoSerializer(conf: SparkConf)
 
kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
 kryo.register(classOf[ArrayBuffer[Any]])
 
+// We can't load those class directly in order to avoid unnecessary 
jar dependencies.
+// We load them safely, ignore it if the class not found.
+Seq("org.apache.spark.mllib.linalg.Vector",
+  "org.apache.spark.mllib.linalg.DenseVector",
+  "org.apache.spark.mllib.linalg.SparseVector",
+  "org.apache.spark.mllib.linalg.Matrix",
+  "org.apache.spark.mllib.linalg.DenseMatrix",
+  "org.apache.spark.mllib.linalg.SparseMatrix",
+  "org.apache.spark.ml.linalg.Vector",
+  "org.apache.spark.ml.linalg.DenseVector",
+  "org.apache.spark.ml.linalg.SparseVector",
+  "org.apache.spark.ml.linalg.Matrix",
+  "org.apache.spark.ml.linalg.DenseMatrix",
+  "org.apache.spark.ml.linalg.SparseMatrix",
+  "org.apache.spark.ml.feature.Instance",
+  "org.apache.spark.ml.feature.OffsetInstance"
+).map(name => Try(Utils.classForName(name))).foreach { t =>
--- End diff --

updated. thanks for the advice.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19661: [SPARK-22450][Core][Mllib]safely register class f...

2017-11-08 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19661#discussion_r149846210
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -178,10 +178,40 @@ class KryoSerializer(conf: SparkConf)
 
kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
 kryo.register(classOf[ArrayBuffer[Any]])
 
+// We can't load those class directly in order to avoid unnecessary 
jar dependencies.
+// We load them safely, ignore it if the class not found.
+Seq("org.apache.spark.mllib.linalg.Vector",
+  "org.apache.spark.mllib.linalg.DenseVector",
+  "org.apache.spark.mllib.linalg.SparseVector",
+  "org.apache.spark.mllib.linalg.Matrix",
+  "org.apache.spark.mllib.linalg.DenseMatrix",
+  "org.apache.spark.mllib.linalg.SparseMatrix",
+  "org.apache.spark.ml.linalg.Vector",
+  "org.apache.spark.ml.linalg.DenseVector",
+  "org.apache.spark.ml.linalg.SparseVector",
+  "org.apache.spark.ml.linalg.Matrix",
+  "org.apache.spark.ml.linalg.DenseMatrix",
+  "org.apache.spark.ml.linalg.SparseMatrix",
+  "org.apache.spark.ml.feature.Instance",
+  "org.apache.spark.ml.feature.OffsetInstance"
+).flatMap(safeClassLoader(_)).foreach(kryo.register(_))
--- End diff --

thanks a lot, I updated the code.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19661: [SPARK-22450][Core][Mllib]safely register class f...

2017-11-07 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19661#discussion_r149553694
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -178,10 +178,40 @@ class KryoSerializer(conf: SparkConf)
 
kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
 kryo.register(classOf[ArrayBuffer[Any]])
 
+// We can't load those class directly in order to avoid unnecessary 
jar dependencies.
+// We load them safely, ignore it if the class not found.
+Seq("org.apache.spark.mllib.linalg.Vector",
+  "org.apache.spark.mllib.linalg.DenseVector",
+  "org.apache.spark.mllib.linalg.SparseVector",
+  "org.apache.spark.mllib.linalg.Matrix",
+  "org.apache.spark.mllib.linalg.DenseMatrix",
+  "org.apache.spark.mllib.linalg.SparseMatrix",
+  "org.apache.spark.ml.linalg.Vector",
+  "org.apache.spark.ml.linalg.DenseVector",
+  "org.apache.spark.ml.linalg.SparseVector",
+  "org.apache.spark.ml.linalg.Matrix",
+  "org.apache.spark.ml.linalg.DenseMatrix",
+  "org.apache.spark.ml.linalg.SparseMatrix",
+  "org.apache.spark.ml.feature.Instance",
+  "org.apache.spark.ml.feature.OffsetInstance"
+).flatMap(safeClassLoader(_)).foreach(kryo.register(_))
--- End diff --

Hi @cloud-fan , I tried the following code:
```scala
flatMap(cn => 
Try{Utils.classForName(cn)}.toOption).foreach(kryo.register(_))
```
and 
```scala
flatMap{ cn =>
  try {
val clazz = Utils.classForName(cn)
Some(clazz)
  } catch {
case _: ClassNotFoundException => None
  }
}.foreach(kryo.register(_))
```

Both reported the same error:
```
Error:(198, 18) type mismatch;
 found   : String => Iterable[Class[_$2]]( forSome { type _$2 })
 required: String => scala.collection.GenTraversableOnce[B]
).flatMap{cn => 
Option(Utils.classForName(cn))}.foreach(kryo.register(_))
```
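
A possible workaround, sketched under the assumption that this runs inside `KryoSerializer` where `kryo` and `Utils.classForName` are already in scope: drop the `flatMap` over the existential `Class[_]` and register inside the loop instead.

```scala
// Register each class if it is on the classpath; otherwise silently skip it.
Seq(
  "org.apache.spark.mllib.linalg.Vector",
  "org.apache.spark.ml.linalg.Vector"
  // ... remaining class names
).foreach { name =>
  try {
    kryo.register(Utils.classForName(name))
  } catch {
    case _: ClassNotFoundException => // mllib/ml jar not on the classpath, ignore it
  }
}
```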


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...

2017-11-06 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19285
  
It's updated. Thanks a lot.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

2017-11-06 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/17936
  
OK, thanks a lot.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19661: [SPARK-22450][Core][Mllib]safely register class for mlli...

2017-11-06 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19661
  
Thanks for reviewing. The code is updated.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19661: [SPARK-22450][Core][Mllib]safely register class for mlli...

2017-11-05 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19661
  
> So why don't you include some classes such as org.apache.spark.ml.feature.Instance?

I'm not familiar with those algorithms; I can add classes such as `org.apache.spark.ml.feature.Instance`.

If this approach is not reasonable, we could instead just remind users in the docs to register those classes themselves.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19586: [SPARK-22367][WIP][CORE] Separate the serialization of c...

2017-11-05 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19586
  
Thanks for the suggestion. I opened a new PR to solve this problem, so I'm closing 
this one.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19586: [SPARK-22367][WIP][CORE] Separate the serializati...

2017-11-05 Thread ConeyLiu
Github user ConeyLiu closed the pull request at:

https://github.com/apache/spark/pull/19586


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19661: [SPARK-22450][Core][Mllib]safely register class for mlli...

2017-11-05 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19661
  
#19586 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19661: [SPARK-22450][Core][Mllib]safely register class f...

2017-11-05 Thread ConeyLiu
GitHub user ConeyLiu opened a pull request:

https://github.com/apache/spark/pull/19661

[SPARK-22450][Core][Mllib]safely register class for mllib

## What changes were proposed in this pull request?

There are still some algorithms based on mllib, such as KMeans. Currently, many 
common mllib classes (such as Vector, DenseVector, SparseVector, Matrix, 
DenseMatrix, SparseMatrix) are not registered with Kryo, so serializing and 
deserializing those objects carries a performance cost.

Previously discussed: https://github.com/apache/spark/pull/19586

## How was this patch tested?

New test case.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ConeyLiu/spark register_vector

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19661.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19661


commit 317229fabd7b470c8c349a4f12604ea22af0d27f
Author: Xianyang Liu <xianyang@intel.com>
Date:   2017-11-06T02:40:55Z

safely register class for mllib




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19586: [SPARK-22367][WIP][CORE] Separate the serialization of c...

2017-11-03 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19586
  
Hi @cloud-fan, @jerryshao. The overhead of `writeClass` and `readClass` can 
be avoided by registering the classes Vector, DenseVector, and SparseVector. The 
following is the test code and results:
```scala
val conf = new SparkConf().setAppName("Vector Register Test")
conf.registerKryoClasses(Array(classOf[Vector], classOf[DenseVector], 
classOf[SparseVector]))
val sc = new SparkContext(conf)

val sourceData = sc.sequenceFile[LongWritable, VectorWritable](args(0))
  .map { case (k, v) =>
val vector = v.get()
val tmpVector = new Array[Double](v.get().size())
for (i <- 0 until vector.size()) {
  tmpVector(i) = vector.get(i)
}
Vectors.dense(tmpVector)
  }

sourceData.persist(StorageLevel.OFF_HEAP)
var start = System.currentTimeMillis()
sourceData.count()
println("First: " + (System.currentTimeMillis() - start))
start = System.currentTimeMillis()
sourceData.count()
println("Second: " + (System.currentTimeMillis() - start))

sc.stop()
```


Results:

| | before | after |
| -- | -- | -- |
| serialized size | 38.4GB | 30.5GB |
| first count | 93318ms | 80708ms |
| second count | 5870ms | 3382ms |

Those classes are very common in ML, as are `Matrix`, `DenseMatrix`, and 
`SparseMatrix`. I'm not sure whether we should register those classes in core 
directly, because that could introduce an extra jar dependency. Could you give 
some advice? Otherwise we could just add a reminder to the ML docs.

The root cause is Kryo's behavior: it writes the full class name instead of a 
compact class ID when the class is not registered.
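
A small standalone sketch of that behavior with raw Kryo (the `Point` class and record count are made up for illustration): serializing the same records with and without registration shows the cost of writing the full class name per record.

```scala
import java.io.ByteArrayOutputStream
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output

case class Point(x: Double, y: Double)

def serializedSize(register: Boolean): Int = {
  val kryo = new Kryo()
  kryo.setRegistrationRequired(false)
  if (register) kryo.register(classOf[Point])
  val bytes = new ByteArrayOutputStream()
  val out = new Output(bytes)
  // Each writeClassAndObject call writes either a small class ID (registered)
  // or the fully-qualified class name (unregistered) before the object itself.
  (1 to 1000).foreach(i => kryo.writeClassAndObject(out, Point(i, i)))
  out.close()
  bytes.size()
}

println(s"unregistered: ${serializedSize(register = false)} bytes")
println(s"registered:   ${serializedSize(register = true)} bytes")
```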


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19586: [SPARK-22367][WIP][CORE] Separate the serialization of c...

2017-11-02 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19586
  
OK, I can understand your concern. There is a huge GC problem for the K-means 
workload: GC takes about 10-20% of the time. The source data is cached in memory, 
and performance is even worse when the source data can't be fully cached, so we 
tried putting the source data off-heap. However, training time got even worse 
with off-heap memory, because while GC only takes about 10-20% with on-heap 
memory, deserialization takes about 30-40% with off-heap memory even once the GC 
problem is solved.

Profiling screenshots:
https://user-images.githubusercontent.com/12733256/32313752-5dbec220-bfdf-11e7-8b49-d5daa47cd50f.PNG
https://user-images.githubusercontent.com/12733256/32313788-824b8470-bfdf-11e7-9b59-aea26e9c6c0a.PNG

As you can see in the screenshots, `readClass` takes about 13% of the time, which 
is why I opened this PR. With this patch, the total time (loading data + training 
the k-means model) drops by about 10%. The screenshots only cover the training 
phase, not the data-loading phase, so the improvement should be larger than that. 
I also plan to optimize `readObjectOrNull` after this.

Also, I found that `Vector` is not registered, so I will test the performance 
with the vector classes registered. That may reduce CPU usage, but it won't 
reduce the serialized size.
 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19586: [SPARK-22367][WIP][CORE] Separate the serialization of c...

2017-11-01 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19586
  
Hi @cloud-fan, in most cases the data types should be the same, so I think this 
optimization is valuable: it can save a considerable amount of space and CPU. 
What about adding a flag to the RDD that indicates whether the RDD only contains 
a single type? If that is not acceptable, could we put a special serializer in 
the ml package so that users can opt into it? In that case, though, the exact 
ClassTag of the RDD must be provided for serialization, because of the record 
relocation done by UnsafeShuffleWriter.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19586: [SPARK-22367][WIP][CORE] Separate the serialization of c...

2017-11-01 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19586
  
Currently I use it directly. Maybe this is only suitable for special cases where 
the data has a single type, such as ML workloads.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19586: [SPARK-22367][WIP][CORE] Separate the serialization of c...

2017-10-31 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19586
  
Hi @jerryshao, thanks for the reminder, it doesn't support that case; I'm sorry I 
did not take it into account. How about using a configuration option to determine 
whether we should use `SerializerInstance#serializeStreamForClass[T]`? In most 
cases the data types should be the same.

Can you give some advice? Also cc @cloud-fan @srowen 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19586: [SPARK-22367][WIP][CORE] Separate the serialization of c...

2017-10-30 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19586
  
Hi @cloud-fan, thanks for reviewing. There are some errors related to 
`UnsafeShuffleWriter` that still need to be fixed. I am not familiar with that 
code, so I need some time.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19586: [SPARK-22367][WIP][CORE] Separate the serializati...

2017-10-30 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19586#discussion_r147709649
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -205,11 +205,45 @@ class KryoSerializationStream(
 
   private[this] var kryo: Kryo = serInstance.borrowKryo()
 
+  // This is only used when we write object and class separately.
+  var classWrote = false
+
   override def writeObject[T: ClassTag](t: T): SerializationStream = {
 kryo.writeClassAndObject(output, t)
--- End diff --

From the code, it just writes a varint if the class has been registered, and some 
computation is needed to produce that varint. But in the test, the overhead looks 
more serious than I expected.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19586: [SPARK-22367][CORE] Separate the serialization of...

2017-10-27 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19586#discussion_r147371400
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -376,7 +382,17 @@ private[spark] class MemoryStore(
 
 // Unroll this block safely, checking whether we have exceeded our 
threshold
 while (values.hasNext && keepUnrolling) {
-  serializationStream.writeObject(values.next())(classTag)
+  val value = values.next()
+  if (kryoSerializationStream != null) {
+if (!kryoSerializationStream.classWrote) {
+  kryoSerializationStream.writeClass(value.getClass)
--- End diff --

@srowen you can see it here. We don't use `writeAll` here, because we need to 
acquire memory according to the written size.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19586: [SPARK-22367][CORE] Separate the serialization of class ...

2017-10-27 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19586
  
@srowen thanks for the review. 
What do you mean here?
> I'm trying to think if there's any case where we intend to support kryo/java serialized objects from 2.x in 2.y.

Even after a class is registered, `writeClassAndObject` still writes the class 
(not the full class name, just a class ID). Some computation is needed to look up 
that class ID, and then the class ID and the object are both written.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19586: [SPARK-22367][CORE] Separate the serialization of...

2017-10-27 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19586#discussion_r147368368
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -860,9 +876,26 @@ private[storage] class PartiallySerializedBlock[T](
 ByteStreams.copy(unrolledBuffer.toInputStream(dispose = true), os)
 memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
 redirectableOutputStream.setOutputStream(os)
+
+// Whether we use Kryo serialization
+var kryoSerializationStream: KryoSerializationStream = null
+if (serializationStream.isInstanceOf[KryoSerializationStream]) {
+  kryoSerializationStream = 
serializationStream.asInstanceOf[KryoSerializationStream]
+}
+
 while (rest.hasNext) {
-  serializationStream.writeObject(rest.next())(classTag)
+  val value = rest.next()
+  if (kryoSerializationStream != null) {
+if (!kryoSerializationStream.classWrote) {
+  kryoSerializationStream.writeClass(value.getClass)
--- End diff --

@srowen you can see it here. We don't use `writeAll` here, because we need to 
acquire memory according to the written size.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19586: [SPARK-22367][CORE] Separate the serialization of...

2017-10-27 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19586#discussion_r147368002
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -205,11 +205,45 @@ class KryoSerializationStream(
 
   private[this] var kryo: Kryo = serInstance.borrowKryo()
 
+  // This is only used when we write object and class separately.
+  var classWrote = false
--- End diff --

Yeah, it is used for `writeAll` / `asIterator`. But in 
`MemoryStore.putIteratorAsBytes`, we don't use `writeAll`; we use this state to 
indicate whether the class has already been written first.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19586: [SPARK-22367][CORE] Separate the serialization of...

2017-10-27 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19586#discussion_r147367241
  
--- Diff: pom.xml ---
@@ -133,7 +133,7 @@
 1.6.0
 9.3.20.v20170531
 3.1.0
-0.8.4
+0.9.2
--- End diff --

Not necessary. Chill 0.9.2 uses kryo 4.0. I can change it back.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19586: [SPARK-22367][CORE] Separate the serialization of class ...

2017-10-27 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19586
  
One executor; the configuration and submit script are as follows:
```shell
${SPARK_HOME}/bin/spark-submit \
--class com.intel.KryoTest  \
--master yarn   \
--deploy-mode  cluster   \
--conf spark.memory.offHeap.enabled=true   \
--conf spark.memory.offHeap.size=50g   \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer  
\
--driver-memory 5G \
--driver-cores  10\
--executor-memory  40G  \
--executor-cores  20\
--num-executors 1   \

```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19586: [SPARK-22367][CORE] Separate the serialization of...

2017-10-27 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19586#discussion_r147346131
  
--- Diff: pom.xml ---
@@ -133,7 +133,7 @@
 1.6.0
 9.3.20.v20170531
 3.1.0
-0.8.4
+0.9.2
--- End diff --

I am not sure whether this should be changed. If it is unreasonable, I can change 
it back.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19586: [SPARK-22367][CORE] Separate the serialization of class ...

2017-10-27 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19586
  
Hi @cloud-fan @jiangxb1987 @chenghao-intel, would you mind taking a look? 
Thanks a lot. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19586: [SPARK-22367][CORE] Separate the serialization of...

2017-10-27 Thread ConeyLiu
GitHub user ConeyLiu opened a pull request:

https://github.com/apache/spark/pull/19586

[SPARK-22367][CORE] Separate the serialization of class and object for 
iterator

## What changes were proposed in this pull request?

Because all records in an iterator have the same class, there is no need to 
write class information for every record. We only need to write the class 
information once at the beginning of serialization, and likewise only need to 
read it once during deserialization.

In this patch, we separate the serialization of the class and the objects for an 
iterator serialized by Kryo. This improves the performance of serialization and 
deserialization, and saves space.
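
The idea, sketched with raw Kryo outside of Spark (the `Record` class here is a made-up stand-in for the iterator's element type, not the patch itself): write the class once with `writeClass`, then write only the objects, and mirror that on the read side.

```scala
import java.io.ByteArrayOutputStream
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

class Record() {               // no-arg constructor so Kryo can instantiate it on read
  var id: Long = 0L
  var value: Double = 0.0
}

val kryo = new Kryo()
kryo.register(classOf[Record])

val bytes = new ByteArrayOutputStream()
val out = new Output(bytes)

// Write the class information once, then only the objects.
kryo.writeClass(out, classOf[Record])
(1 to 3).foreach { i =>
  val r = new Record(); r.id = i; r.value = i * 0.5
  kryo.writeObject(out, r)
}
out.close()

// Read the class once, then deserialize every record against it.
val in = new Input(bytes.toByteArray)
val clazz = kryo.readClass(in).getType.asInstanceOf[Class[Record]]
val records = (1 to 3).map(_ => kryo.readObject(in, clazz))
in.close()
```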

Test case:
```scala
val conf = new SparkConf().setAppName("Test for serialization")
val sc = new SparkContext(conf)

val random = new Random(1)
val data = sc.parallelize(1 to 10).map { i =>
  Person("id-" + i, random.nextInt(Integer.MAX_VALUE))
}.persist(StorageLevel.OFF_HEAP)

var start = System.currentTimeMillis()
data.count()
println("First time: " + (System.currentTimeMillis() - start))

start = System.currentTimeMillis()
data.count()
println("Second time: " + (System.currentTimeMillis() - start))

```

Test result:

Serialized size:
before: 34.3GB
after: 17.5GB

| before (cal+serialization) | before (deserialization) | after (cal+serialization) | after (deserialization) |
| -- | -- | -- | -- |
| 63869 | 21882 | 45513 | 15158 |
| 59368 | 21507 | 51683 | 15524 |
| 66230 | 21481 | 62163 | 14903 |
| 62399 | 22529 | 52400 | 16255 |

| 137564.2 | 136990.8 | 1.004186 | 

## How was this patch tested?

Existing UT.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ConeyLiu/spark kryo

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19586.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19586


commit c681e81f9d49b3558c91a3b981504159bbeff910
Author: Xianyang Liu <xianyang@intel.com>
Date:   2017-10-26T06:37:04Z

serialize object and class seperately for iterator

commit 640ad5e1d12d1137f4c979a1e75dbdbd713e14de
Author: Xianyang Liu <xianyang@intel.com>
Date:   2017-10-26T06:42:58Z

Merge remote-tracking branch 'spark/master' into kryo




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19511: [SPARK-22293][SQL] Avoid unnecessary traversal in...

2017-10-26 Thread ConeyLiu
Github user ConeyLiu closed the pull request at:

https://github.com/apache/spark/pull/19511


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19511: [SPARK-22293][SQL] Avoid unnecessary traversal in Resolv...

2017-10-17 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19511
  
OK, I'll close it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...

2017-10-17 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19317#discussion_r145294297
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 
---
@@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* as in scala.TraversableOnce. The former operation is used for merging 
values within a
* partition, and the latter is used for merging values between 
partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return 
their first argument
+   * instead of creating a new U. This method is different from the 
ordinary "aggregateByKey"
+   * method, it directly returns a map to the driver, rather than a rdd. 
This will also perform
+   * the merging locally on each mapper before sending results to a 
reducer, similarly to a
--- End diff --

Thanks for the advice. I'll close it and try `mapPartitions(...).collect` 
in `NaiveBayes`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...

2017-10-17 Thread ConeyLiu
Github user ConeyLiu closed the pull request at:

https://github.com/apache/spark/pull/19317


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19511: [SPARK-22293][SQL] Avoid unnecessary traversal in Resolv...

2017-10-17 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19511
  
Hi @gatorsmile, if we can combine the two traversals, that should simplify the 
code rather than complicate it. However, it won't bring a big performance 
improvement, and I can close this if the change is unnecessary.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19511: [SPARK-22293][SQL] Avoid unnecessary traversal in...

2017-10-17 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19511#discussion_r145041400
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -890,32 +890,39 @@ class Analyzer(
 
 /**
  * Returns true if `exprs` contains a [[Star]].
+ * @param deepInto Whether to traverse all the subtrees, true by 
default.
  */
-def containsStar(exprs: Seq[Expression]): Boolean =
-  exprs.exists(_.collect { case _: Star => true }.nonEmpty)
+def containsStar(exprs: Seq[Expression], deepInto: Boolean = true): 
Boolean = {
+  if (deepInto) {
+exprs.exists(_.collect { case _: Star => true }.nonEmpty)
+  } else {
+exprs.exists{ case _: Star => true}
+  }
+}
+
 
 /**
  * Expands the matching attribute.*'s in `child`'s output.
  */
 def expandStarExpression(expr: Expression, child: LogicalPlan): 
Expression = {
   expr.transformUp {
-case f1: UnresolvedFunction if containsStar(f1.children) =>
+case f1: UnresolvedFunction if containsStar(f1.children, false) =>
--- End diff --

I also have a question: could we combine the two traversals into one? Currently, 
we first traverse the `children` to test whether there is a `Star`, and then 
traverse them again to expand it. We always go through the tree at least once, 
and twice when a `Star` exists.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19511: [SPARK-22293][SQL] Avoid unnecessary traversal in Resolv...

2017-10-17 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19511
  
Hi @cloud-fan @gatorsmile, would you mind taking a look? 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19511: [SPARK-22293][SQL] Avoid unnecessary traversal in...

2017-10-17 Thread ConeyLiu
GitHub user ConeyLiu opened a pull request:

https://github.com/apache/spark/pull/19511

[SPARK-22293][SQL] Avoid unnecessary traversal in ResolveReferences

## What changes were proposed in this pull request?

We don't need to traverse the whole children expression tree to determine whether 
there is a `Star` when expanding `Star` expressions.

## How was this patch tested?
Existing UT.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ConeyLiu/spark resolveReferences

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19511.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19511


commit c58b3c0da49b4660affd386bae37fa6b5c9b3567
Author: Xianyang Liu <xianyang@intel.com>
Date:   2017-10-17T06:29:02Z

Avoid unnecessary traversal




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...

2017-10-16 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19317#discussion_r144760426
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 
---
@@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* as in scala.TraversableOnce. The former operation is used for merging 
values within a
* partition, and the latter is used for merging values between 
partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return 
their first argument
+   * instead of creating a new U. This method is different from the 
ordinary "aggregateByKey"
+   * method, it directly returns a map to the driver, rather than a rdd. 
This will also perform
+   * the merging locally on each mapper before sending results to a 
reducer, similarly to a
--- End diff --

`aggregateByKey(...).toLocalIterator` needs a shuffle for the `aggregateByKey` 
step and then collects the RDD to the driver as an iterator. 
`aggregateByKeyLocally` looks similar to `aggregateByKey`, but there is no 
shuffle: it computes the combined values in each task, collects all the per-task 
maps directly to the driver, and does the final combine on the driver.
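
A rough sketch of that shape built only on existing RDD APIs (the method name and signature below are illustrative, not the proposed API): combine within each partition via `mapPartitions`, collect the per-partition maps, and merge them on the driver.

```scala
import scala.collection.mutable
import org.apache.spark.rdd.RDD

// Illustrative only: per-partition combine, then a final merge on the driver.
def aggregateByKeyLocallySketch[K, V, U](rdd: RDD[(K, V)], zero: () => U)(
    seqOp: (U, V) => U, combOp: (U, U) => U): mutable.Map[K, U] = {
  val perPartition = rdd.mapPartitions { iter =>
    val m = mutable.Map.empty[K, U]
    iter.foreach { case (k, v) => m(k) = seqOp(m.getOrElse(k, zero()), v) }
    Iterator(m)
  }.collect()                                   // no shuffle: one map per partition

  perPartition.foldLeft(mutable.Map.empty[K, U]) { (acc, m) =>
    m.foreach { case (k, u) => acc(k) = acc.get(k).map(combOp(_, u)).getOrElse(u) }
    acc
  }
}
```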


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...

2017-10-15 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19317#discussion_r144755666
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 
---
@@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* as in scala.TraversableOnce. The former operation is used for merging 
values within a
* partition, and the latter is used for merging values between 
partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return 
their first argument
+   * instead of creating a new U. This method is different from the 
ordinary "aggregateByKey"
+   * method, it directly returns a map to the driver, rather than a rdd. 
This will also perform
+   * the merging locally on each mapper before sending results to a 
reducer, similarly to a
--- End diff --

Yeah, it will. Here the 'difference' means it directly returns a map to the 
driver rather than an RDD.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19317: [SPARK-22098][CORE] Add new method aggregateByKeyLocally...

2017-10-15 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19317
  
Hi @WeichenXu123, any comments on this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19316: [SPARK-22097][CORE]Request an accurate memory aft...

2017-10-11 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19316#discussion_r144169752
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -388,7 +388,13 @@ private[spark] class MemoryStore(
 // perform one final call to attempt to allocate additional memory if 
necessary.
 if (keepUnrolling) {
   serializationStream.close()
-  reserveAdditionalMemoryIfNecessary()
+  if (bbos.size > unrollMemoryUsedByThisBlock) {
+val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
--- End diff --

Because `reserveAdditionalMemoryIfNecessary` requests `val amountToRequest = 
(bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong` to avoid 
requesting memory for every record, whereas here we only need to request the 
precise amount of memory for the final request.
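
A tiny numeric illustration of the two formulas (all numbers below are made up): while unrolling, the request is padded by the growth factor so memory isn't requested for every record; for the final request only the exact shortfall is needed.

```scala
val memoryGrowthFactor = 1.5
val unrollMemoryUsedByThisBlock = 1000L   // memory already reserved for this block
val serializedSize = 1200L                // bbos.size after writing the latest record

// While still unrolling: over-request to amortize the per-record bookkeeping.
val speculativeRequest = (serializedSize * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong
// speculativeRequest == 800

// After the final record: request exactly the missing amount.
val exactRequest = serializedSize - unrollMemoryUsedByThisBlock
// exactRequest == 200
```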


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19316: [SPARK-22097][CORE]Request an accurate memory aft...

2017-10-10 Thread ConeyLiu
Github user ConeyLiu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19316#discussion_r143901552
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -388,7 +388,13 @@ private[spark] class MemoryStore(
 // perform one final call to attempt to allocate additional memory if 
necessary.
 if (keepUnrolling) {
--- End diff --

We may need to request extra memory (`bbos.size - unrollMemoryUsedByThisBlock`), 
so `keepUnrolling` may change at line 393.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19316: [SPARK-22097][CORE]Request an accurate memory after we u...

2017-10-10 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19316
  
Hi @cloud-fan @jiangxb1987 Do you have time to check this? Thanks a lot.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19317: [SPARK-22098][CORE] Add new method aggregateByKeyLocally...

2017-09-24 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19317
  
Test case:
```scala
test("performance of aggregateByKeyLocally ") {
val random = new Random(1)

val pairs = sc.parallelize(0 until 1000, 20)
  .map(p => (random.nextInt(100), p))
  .persist(StorageLevel.MEMORY_ONLY)

pairs.count()

val start = System.currentTimeMillis()
//val jHashMap = pairs.aggregateByKeyLocallyWithJHashMap(new 
HashSet[Int]())(_ += _, _ ++= _).toArray
val openHashMap = pairs.aggregateByKeyLocally(new HashSet[Int]())(_ += 
_, _ ++= _).toArray
println(System.currentTimeMillis() - start)
  }
```

Test result:
| map | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | avg |
| -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- |
| JHashMap | 2921 | 2920 | 2843 | 2950 | 2898 | 3316 | 2770 | 2994 | 3016 | 3005 | 2963.3 |
| OpenHashMap | 3029 | 2884 | 3064 | 3023 | 3108 | 3194 | 3003 | 2961 | 3115 | 3023 | 3040.4 |

Looks almost the same performance.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19317: [SPARK-22098][CORE] Add new method aggregateByKeyLocally...

2017-09-24 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19317
  
OK, I'll just keep it. Does this need more tests or further improvements?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


