[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/22371 OK, thanks everyone for the help. Closing it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22371: [SPARK-25386][CORE] Don't need to synchronize the...
Github user ConeyLiu closed the pull request at: https://github.com/apache/spark/pull/22371
[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/22371 @squito, thanks for the review. I previously intended to use `ConcurrentHashMap[Int, AtomicReferenceArray]`. After rethinking the code, I understand that the lock here prevents different attempts of the same task from committing the shuffle writer result at the same time. A task can have more than one attempt for the following reasons: 1. A failed task or stage. In this case, the previous task attempt should already have finished (failed or been killed), or its result is no longer used. 2. A speculative task. In this case, the speculative task can't be scheduled on the same executor as the other attempts. So what is the real value of the lock? Maybe I'm wrong; I hope someone can answer this.
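The lock guards a check-then-act sequence: look for output already committed by another attempt of the same map task, then either keep it (deleting this attempt's temp file) or rename this attempt's temp file into place. Without the lock, two attempts could both pass the check before either renames. A minimal Java sketch, assuming a single output file per map task; the class and method names are hypothetical, and this is not Spark's `writeIndexFileAndCommit`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch of a "first attempt wins" commit for one map task's output. */
final class FirstAttemptWinsCommit {
    private final Object lock = new Object();

    /**
     * Returns true if this attempt's temp file became the committed output, or false if
     * an earlier attempt had already committed (in which case the temp file is deleted).
     */
    boolean commitOrDiscard(Path tmpFile, Path finalFile) {
        // The check and the rename must not interleave with another attempt's check.
        synchronized (lock) {
            try {
                if (Files.exists(finalFile)) {
                    Files.deleteIfExists(tmpFile); // keep the earlier attempt's output
                    return false;
                }
                Files.move(tmpFile, finalFile);    // target was checked to be absent
                return true;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

Whether two attempts can actually race like this on one executor is exactly the question raised above; the sketch only shows what the lock would protect.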
[GitHub] spark pull request #22371: [SPARK-25386][CORE] Don't need to synchronize the...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/22371#discussion_r216304910 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala --- @@ -51,6 +52,8 @@ private[spark] class IndexShuffleBlockResolver( private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle") + private val shuffleIdToLocks = new ConcurrentHashMap[Int, Array[Object]]() --- End diff -- It seems an `Object` can't be put into an `Array[_]` directly; some casts may be needed.
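A per-shuffle lock array can be built without casts if it is typed as an array of plain monitor objects (in Scala, `Array.fill(numMaps)(new Object)` should likewise produce an `Array[Object]`). A Java sketch, assuming the number of map tasks is known when the first lock for a shuffle is requested; `ShuffleLockTable`, `lockFor`, and `numMaps` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-shuffle lock table: one monitor object per map task. */
final class ShuffleLockTable {
    private final ConcurrentHashMap<Integer, Object[]> shuffleIdToLocks = new ConcurrentHashMap<>();

    /** Returns the lock for (shuffleId, mapId), creating the shuffle's lock array on first use. */
    Object lockFor(int shuffleId, int mapId, int numMaps) {
        // computeIfAbsent is atomic on ConcurrentHashMap, so every caller sees the same array.
        Object[] locks = shuffleIdToLocks.computeIfAbsent(shuffleId, id -> {
            Object[] arr = new Object[numMaps];
            for (int i = 0; i < numMaps; i++) {
                arr[i] = new Object(); // plain monitors; no casts needed
            }
            return arr;
        });
        return locks[mapId];
    }

    /** Drops a shuffle's locks when the shuffle is removed, so the table does not grow forever. */
    void removeShuffle(int shuffleId) {
        shuffleIdToLocks.remove(shuffleId);
    }
}
```

The space cost is one small object per map task of each live shuffle, which matches the "space for time" trade-off described in the PR.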
[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/22371 Thanks @felixcheung, @srowen, @cloud-fan for your time. There is only one instance of `IndexShuffleBlockResolver` per executor, and the synchronization exists to make the update safe when different attempts of the same task commit at the same time. For most tasks the synchronization is unnecessary, and the update itself is very simple. I tested locally; the results are below. I admit that this change brings little improvement for complex tasks, but it does not cause a performance regression. `./spark-shell --master local[20] --driver-memory 40g` `spark.range(0, 1000, 1, 100).repartition(200).count()`

before:

| map | reduce |
| --- | --- |
| 2s | 0.4s |
| 0.8s | 0.2s |
| 0.7s | 0.2s |

after:

| map | reduce |
| --- | --- |
| 0.8s | 0.2s |
| 0.5s | 0.4s |
| 0.5s | 0.2s |
[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/22371 @cloud-fan @jiangxb1987 Could you help review this? Thanks a lot.
[GitHub] spark pull request #22371: [SPARK-25386][CORE] Don't need to synchronize the...
GitHub user ConeyLiu opened a pull request: https://github.com/apache/spark/pull/22371 [SPARK-25386][CORE] Don't need to synchronize the IndexShuffleBlockResolver for each writeIndexFileAndCommit ## What changes were proposed in this pull request? Currently, we synchronize on the IndexShuffleBlockResolver instance so that the commit check and the temp file rename happen atomically. This can be improved: instead of synchronizing on the whole IndexShuffleBlockResolver for each writeIndexFileAndCommit, we could synchronize on a lock specific to each `shuffleId + mapId`. This trades space for time, but it doesn't take up much space. ## How was this patch tested? Existing UT. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ConeyLiu/spark indexShuffleBlockResolver Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22371.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22371 commit 92c2e07171f60b977c62661ea6475486a1599b19 Author: Xianyang Liu Date: 2018-09-09T10:44:23Z don't need synchronized the IndexShuffleBlockResolver for each writeIndexFileAndCommit
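A lock per `shuffleId + mapId` can also be kept in a single map keyed by the pair, which avoids sizing an array per shuffle up front. A Java sketch under that assumption; `MapOutputLocks`, `key`, and `withLock` are hypothetical names, and the pair is packed into one `long` (shuffle id in the high 32 bits, map id in the low 32 bits).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical lock-per-(shuffleId, mapId) helper. */
final class MapOutputLocks {
    private final ConcurrentHashMap<Long, Object> locks = new ConcurrentHashMap<>();

    /** Packs two ints into one long key without collisions, including negative ids. */
    static long key(int shuffleId, int mapId) {
        return ((long) shuffleId << 32) | (mapId & 0xFFFFFFFFL);
    }

    /** Runs the action while holding only the lock for this one map output. */
    <T> T withLock(int shuffleId, int mapId, Supplier<T> action) {
        Object lock = locks.computeIfAbsent(key(shuffleId, mapId), k -> new Object());
        synchronized (lock) {
            return action.get();
        }
    }
}
```

Commits for different map outputs then no longer contend on one monitor. The cost is one map entry per committed map output, which would need to be removed when the shuffle is unregistered.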
[GitHub] spark pull request #20844: [SPARK-23707][SQL] Don't need shuffle exchange wi...
Github user ConeyLiu closed the pull request at: https://github.com/apache/spark/pull/20844
[GitHub] spark issue #20844: [SPARK-23707][SQL] Don't need shuffle exchange with sing...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/20844 Thanks, all. Closing it.
[GitHub] spark pull request #20844: [SPARK-23707][SQL] Don't need shuffle exchange wi...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/20844#discussion_r176327636 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala --- @@ -348,6 +348,13 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + /** Specifies how data is partitioned across different nodes in the cluster. */ + override def outputPartitioning: Partitioning = if (numSlices == 1 && numElements != 0) { --- End diff -- This is related to the [UT error](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88474/testReport/org.apache.spark.sql/DataFrameRangeSuite/SPARK_7150_range_api/). `spark.range(-10, -9, -20, 1).count()` fails when `codegen` is set to true and `RangeExec.outputPartitioning` is set to `SinglePartition`. I tried to find the root cause, but failed.
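The failing call `spark.range(-10, -9, -20, 1)` describes an empty range: the end lies above the start, but the step is negative. A Java sketch of the element-count arithmetic shows that this case yields zero elements, the case the `numElements != 0` guard in the diff excludes. It assumes the usual rule (round the span divided by the step up, and return zero when the step points away from the end); `RangeMath.numElements` is a hypothetical helper, not Spark's code.

```java
import java.math.BigInteger;

/** Hypothetical helper: counts start, start + step, ... strictly before end (in step's direction). */
final class RangeMath {
    static long numElements(long start, long end, long step) {
        if (step == 0) {
            throw new IllegalArgumentException("step must not be 0");
        }
        // BigInteger keeps the span exact even when end - start would overflow a long.
        BigInteger span = BigInteger.valueOf(end).subtract(BigInteger.valueOf(start));
        // Empty when there is no span, or when the step points away from end.
        if (span.signum() == 0 || (span.signum() > 0) != (step > 0)) {
            return 0L;
        }
        BigInteger[] qr = span.divideAndRemainder(BigInteger.valueOf(step));
        // Round up: a partial final step still contributes one more element.
        BigInteger count = qr[1].signum() == 0 ? qr[0] : qr[0].add(BigInteger.ONE);
        return count.longValueExact(); // throws if the count itself does not fit in a long
    }
}
```

For comparison, `spark.range(1, 10, 1, 1)` from the join question in this thread corresponds to `numElements(1, 10, 1)`, which is 9 under this rule.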
[GitHub] spark issue #20844: [SPARK-23707][SQL] Don't need shuffle exchange with sing...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/20844 This change is very simple; it just makes `RangeExec` consistent with the other `LeafNode`s.
[GitHub] spark issue #20844: [SPARK-23707][SQL] No shuffle exchange with single parti...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/20844 @cloud-fan, please take a look. Thanks a lot.
[GitHub] spark pull request #20844: [SPARK-23707][SQL] Fresh 'initRange' name to avoi...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/20844#discussion_r175966108 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala --- @@ -396,9 +396,11 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) // The default size of a batch, which must be positive integer val batchSize = 1000 -val initRangeFuncName = ctx.addNewFunction("initRange", +val initRange = ctx.freshName("initRange") + +val initRangeFuncName = ctx.addNewFunction(initRange, s""" -| private void initRange(int idx) { +| private void ${initRange}(int idx) { --- End diff -- Thanks for your suggestion; let me give it a try.
[GitHub] spark pull request #20844: [SPARK-23707][SQL] Fresh 'initRange' name to avoi...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/20844#discussion_r175658889 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala --- @@ -396,9 +396,11 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) // The default size of a batch, which must be positive integer val batchSize = 1000 -val initRangeFuncName = ctx.addNewFunction("initRange", +val initRange = ctx.freshName("initRange") + +val initRangeFuncName = ctx.addNewFunction(initRange, s""" -| private void initRange(int idx) { +| private void ${initRange}(int idx) { --- End diff -- Hi @cloud-fan, before adding the comments, I have a question: why do we still need an `exchange` if we join two `spark.range(1, 10, 1, 1)`? Since both `range`s have only one partition, is the `exchange` really needed?
[GitHub] spark pull request #20844: [SPARK-23707][SQL] Fresh 'initRange' name to avoi...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/20844#discussion_r175634287 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala --- @@ -396,9 +396,11 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) // The default size of a batch, which must be positive integer val batchSize = 1000 -val initRangeFuncName = ctx.addNewFunction("initRange", +val initRange = ctx.freshName("initRange") + +val initRangeFuncName = ctx.addNewFunction(initRange, s""" -| private void initRange(int idx) { +| private void ${initRange}(int idx) { --- End diff -- OK, I can just add some comments and keep the code unchanged. I changed it here only to make the code more robust.
[GitHub] spark pull request #20844: [SPARK-23707][SQL] Fresh 'initRange' name to avoi...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/20844#discussion_r175315224 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala --- @@ -396,9 +396,11 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) // The default size of a batch, which must be positive integer val batchSize = 1000 -val initRangeFuncName = ctx.addNewFunction("initRange", +val initRange = ctx.freshName("initRange") + +val initRangeFuncName = ctx.addNewFunction(initRange, s""" -| private void initRange(int idx) { +| private void ${initRange}(int idx) { --- End diff -- @cloud-fan thanks for reviewing. Neither `BroadCastExchange` nor `ShuffleExchange` implements `CodegenSupport`, so there should be two `WholeStageCodegen` stages.
[GitHub] spark issue #20844: [SPARK-23707][SQL] Fresh 'initRange' name to avoid metho...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/20844 @cloud-fan please take a look; this is a small change. Thanks a lot.
[GitHub] spark pull request #20844: [SPARK-23707][SQL] Fresh 'initRange' name to avoi...
GitHub user ConeyLiu opened a pull request: https://github.com/apache/spark/pull/20844 [SPARK-23707][SQL] Fresh 'initRange' name to avoid method name conflicts ## What changes were proposed in this pull request? We should call `ctx.freshName` to generate the `initRange` method name, to avoid name conflicts. ## How was this patch tested? Existing UT. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ConeyLiu/spark range Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20844.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20844 commit d69f32ca49e3b2fa730d0520f48403eeebce60e4 Author: Xianyang Liu <xianyang.liu@...> Date: 2018-03-16T07:56:52Z Fresh 'initRange' name to avoid method name conflicts
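A fresh-name generator hands out a distinct name each time the same base name is requested, so two range operators compiled into one generated class would not both define `initRange`. A minimal Java sketch, assuming a `_1`, `_2`, ... suffix scheme; the exact format produced by Spark's `ctx.freshName` may differ, and `FreshNames` is a hypothetical class.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical fresh-name generator for generated method and variable names. */
final class FreshNames {
    private final Map<String, Integer> nextId = new HashMap<>();

    /**
     * The first request for a base name returns it unchanged; later requests append a counter.
     * Assumes callers never pass base names that already end in "_<n>".
     */
    String freshName(String base) {
        Integer id = nextId.get(base);
        if (id == null) {
            nextId.put(base, 1);
            return base;
        }
        nextId.put(base, id + 1);
        return base + "_" + id;
    }
}
```

The generated source then splices the returned name in, e.g. `"private void " + name + "(int idx) {"`, instead of a hard-coded `initRange`.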
[GitHub] spark issue #20676: [SPARK-23516][CORE] It is unnecessary to transfer unroll...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/20676 Yeah, I see that. I'm not sure it's OK to change. But I think we should follow the interface design, not the underlying implementation.
[GitHub] spark pull request #20676: [SPARK-23516][CORE] It is unnecessary to transfer...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/20676#discussion_r171115071 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -246,18 +246,18 @@ private[spark] class MemoryStore( val amountToRequest = size - unrollMemoryUsedByThisBlock keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) if (keepUnrolling) { - unrollMemoryUsedByThisBlock += amountToRequest + unrollMemoryUsedByThisBlock = size } + } else if (size < unrollMemoryUsedByThisBlock) { --- End diff -- In #19285, we first release `unrollMemoryUsedByThisBlock` of unroll memory and then request `entry.size` of storage memory, so no memory is wasted here.
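Once the block's precise size is known, the reservation is brought to exactly that size: grow it if the block outgrew what was reserved, or, assuming the truncated `else if` branch in the diff does this, release the excess. Note that `unrollMemoryUsedByThisBlock = size` and `+= amountToRequest` are equivalent because `amountToRequest = size - unrollMemoryUsedByThisBlock`. A toy Java sketch; the single free-memory counter and the class name are hypothetical.

```java
/** Hypothetical toy sketch of the final unroll-memory adjustment for one block. */
final class UnrollReservation {
    private long poolFree;        // unroll memory still available in a toy pool
    private long usedByThisBlock; // unroll memory currently held for the block

    UnrollReservation(long poolFree, long usedByThisBlock) {
        this.poolFree = poolFree;
        this.usedByThisBlock = usedByThisBlock;
    }

    /** Grows or shrinks the reservation to exactly {@code size}; returns false if growing fails. */
    boolean adjustTo(long size) {
        if (size > usedByThisBlock) {
            long amountToRequest = size - usedByThisBlock;
            if (amountToRequest > poolFree) {
                return false;               // cannot reserve more; the caller must fall back
            }
            poolFree -= amountToRequest;
            usedByThisBlock = size;         // same as usedByThisBlock += amountToRequest
        } else if (size < usedByThisBlock) {
            poolFree += usedByThisBlock - size; // release the excess immediately
            usedByThisBlock = size;
        }
        return true;
    }

    long usedByThisBlock() { return usedByThisBlock; }
    long poolFree() { return poolFree; }
}
```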
[GitHub] spark issue #20676: [SPARK-23516][CORE] It is unnecessary to transfer unroll...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/20676 This is for compatibility reasons. Memory management also supports the legacy mode (`StaticMemoryManager`). In `StaticMemoryManager`, storage memory and unroll memory are managed separately. So we can't say that "in fact, unroll memory is also storage memory".
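If unroll and storage memory are tracked as separate budgets, as described above, then "transferring" a finished block is a release from one budget plus an acquire from the other. Doing both under one monitor keeps other threads from seeing the in-between state; the `transferUnrollToStorage` helper removed in #19285 synchronized on `memoryManager` for a similar reason. A toy Java sketch; the capacities and the class are hypothetical and do not model `StaticMemoryManager`'s actual accounting.

```java
/** Toy memory manager with separate unroll and storage budgets (hypothetical). */
final class ToySeparatePools {
    private final long unrollCapacity;
    private final long storageCapacity;
    private long unrollUsed;
    private long storageUsed;

    ToySeparatePools(long unrollCapacity, long storageCapacity) {
        this.unrollCapacity = unrollCapacity;
        this.storageCapacity = storageCapacity;
    }

    synchronized boolean reserveUnroll(long bytes) {
        if (unrollUsed + bytes > unrollCapacity) return false;
        unrollUsed += bytes;
        return true;
    }

    /**
     * Moves a finished block from unroll to storage memory. The release and the acquire run
     * under the same monitor; on failure the unroll reservation is left untouched.
     */
    synchronized boolean transferUnrollToStorage(long unrollHeld, long blockSize) {
        if (storageUsed + blockSize > storageCapacity) return false;
        unrollUsed -= unrollHeld;
        storageUsed += blockSize;
        return true;
    }

    synchronized long unrollUsed() { return unrollUsed; }
    synchronized long storageUsed() { return storageUsed; }
}
```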
[GitHub] spark pull request #20461: [SPARK-23289][CORE]OneForOneBlockFetcher.Download...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/20461#discussion_r165246022 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java --- @@ -171,7 +171,9 @@ private void failRemainingBlocks(String[] failedBlockIds, Throwable e) { @Override public void onData(String streamId, ByteBuffer buf) throws IOException { - channel.write(buf); + while (buf.hasRemaining()) { +channel.write(buf); --- End diff -- Should [FileSuite.writeBinaryData](https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/FileSuite.scala#L247) also be fixed?
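`WritableByteChannel.write` is allowed to write only part of the buffer (a non-blocking socket channel is the standard example), so a single call can leave bytes behind; looping on `hasRemaining()` is the usual fix. A Java sketch with a hypothetical `writeFully` helper and a hypothetical `TrickleChannel` test double that accepts at most a few bytes per call.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

final class ChannelWrites {
    /** Writes every remaining byte of buf; assumes the channel eventually makes progress. */
    static void writeFully(WritableByteChannel channel, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
            channel.write(buf); // may consume only part of the buffer
        }
    }

    /** Hypothetical test double: accepts at most maxPerCall bytes per write() call. */
    static final class TrickleChannel implements WritableByteChannel {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        private final int maxPerCall;

        TrickleChannel(int maxPerCall) { this.maxPerCall = maxPerCall; }

        @Override public int write(ByteBuffer src) {
            int n = Math.min(maxPerCall, src.remaining());
            for (int i = 0; i < n; i++) {
                sink.write(src.get());
            }
            return n;
        }

        @Override public boolean isOpen() { return true; }
        @Override public void close() {}
    }
}
```

A single `write` on the trickle channel leaves bytes in the buffer, while `writeFully` drains it.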
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19285 Thanks, all.
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163768689 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -232,78 +236,93 @@ private[spark] class MemoryStore( elementsUnrolled += 1 } +val valuesBuilder = if (keepUnrolling) { + Some(valuesHolder.getBuilder()) +} else { + None +} + +// Make sure that we have enough memory to store the block. By this point, it is possible that +// the block's actual memory usage has exceeded the unroll memory by a small amount, so we +// perform one final call to attempt to allocate additional memory if necessary. if (keepUnrolling) { - // We successfully unrolled the entirety of this block - val arrayValues = vector.toArray - vector = null - val entry = -new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) - val size = entry.size - def transferUnrollToStorage(amount: Long): Unit = { -// Synchronize so that transfer is atomic -memoryManager.synchronized { - releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount) - val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP) - assert(success, "transferring unroll memory to storage memory failed") + val size = valuesBuilder.get.preciseSize + if (size > unrollMemoryUsedByThisBlock) { +val amountToRequest = size - unrollMemoryUsedByThisBlock +keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) +if (keepUnrolling) { + unrollMemoryUsedByThisBlock += amountToRequest } } - // Acquire storage memory if necessary to store this block in memory. 
- val enoughStorageMemory = { -if (unrollMemoryUsedByThisBlock <= size) { - val acquiredExtra = -memoryManager.acquireStorageMemory( - blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP) - if (acquiredExtra) { -transferUnrollToStorage(unrollMemoryUsedByThisBlock) - } - acquiredExtra -} else { // unrollMemoryUsedByThisBlock > size - // If this task attempt already owns more unroll memory than is necessary to store the - // block, then release the extra memory that will not be used. - val excessUnrollMemory = unrollMemoryUsedByThisBlock - size - releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory) - transferUnrollToStorage(size) - true -} +} + +if (keepUnrolling) { --- End diff -- updated
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163743072 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -346,85 +350,24 @@ private[spark] class MemoryStore( } else { initialMemoryThreshold.toInt } -val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator) -redirectableStream.setOutputStream(bbos) -val serializationStream: SerializationStream = { - val autoPick = !blockId.isInstanceOf[StreamBlockId] - val ser = serializerManager.getSerializer(classTag, autoPick).newInstance() - ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) -} - -// Request enough memory to begin unrolling -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode) - -if (!keepUnrolling) { - logWarning(s"Failed to reserve initial memory threshold of " + -s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") -} else { - unrollMemoryUsedByThisBlock += initialMemoryThreshold -} - -def reserveAdditionalMemoryIfNecessary(): Unit = { - if (bbos.size > unrollMemoryUsedByThisBlock) { -val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) -if (keepUnrolling) { - unrollMemoryUsedByThisBlock += amountToRequest -} - } -} - -// Unroll this block safely, checking whether we have exceeded our threshold -while (values.hasNext && keepUnrolling) { - serializationStream.writeObject(values.next())(classTag) - elementsUnrolled += 1 - if (elementsUnrolled % memoryCheckPeriod == 0) { -reserveAdditionalMemoryIfNecessary() - } -} -// Make sure that we have enough memory to store the block. 
By this point, it is possible that -// the block's actual memory usage has exceeded the unroll memory by a small amount, so we -// perform one final call to attempt to allocate additional memory if necessary. -if (keepUnrolling) { - serializationStream.close() - if (bbos.size > unrollMemoryUsedByThisBlock) { -val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) -if (keepUnrolling) { - unrollMemoryUsedByThisBlock += amountToRequest -} - } -} +val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag, + memoryMode, serializerManager) -if (keepUnrolling) { --- End diff -- Thanks for the detailed explanation. I have updated the code; it reads more clearly now.
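The removed code follows a common pattern: reserve an initial threshold, then every `memoryCheckPeriod` elements compare the current size with the reservation and, if it has been exceeded, request enough to reach `size * memoryGrowthFactor`; after the loop, one final top-up covers the last elements. A toy Java sketch of that loop; the size function stands in for `bbos.size`, and the pool, class, and method names are hypothetical.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.ToLongFunction;

/** Hypothetical sketch of incremental unrolling with periodic memory checks. */
final class UnrollLoop {
    long poolFree;              // toy unroll pool
    long unrollUsedByThisBlock; // memory reserved so far for this block

    UnrollLoop(long poolFree) { this.poolFree = poolFree; }

    private boolean reserve(long bytes) {
        if (bytes > poolFree) return false;
        poolFree -= bytes;
        unrollUsedByThisBlock += bytes;
        return true;
    }

    /** Returns true only if every value was unrolled and the reservation covers the final size. */
    <T> boolean unroll(List<T> values, ToLongFunction<T> sizeOf,
                       long initialThreshold, int memoryCheckPeriod, double memoryGrowthFactor) {
        boolean keepUnrolling = reserve(initialThreshold);
        long currentSize = 0; // stands in for a running size such as bbos.size
        int elementsUnrolled = 0;
        Iterator<T> it = values.iterator();
        while (it.hasNext() && keepUnrolling) {
            currentSize += sizeOf.applyAsLong(it.next());
            elementsUnrolled++;
            // Check only periodically, since measuring the size may be costly.
            if (elementsUnrolled % memoryCheckPeriod == 0 && currentSize > unrollUsedByThisBlock) {
                long amountToRequest = (long) (currentSize * memoryGrowthFactor) - unrollUsedByThisBlock;
                keepUnrolling = reserve(amountToRequest);
            }
        }
        // Final top-up: the last few elements may have pushed the size past the reservation.
        if (keepUnrolling && currentSize > unrollUsedByThisBlock) {
            keepUnrolling = reserve(currentSize - unrollUsedByThisBlock);
        }
        return keepUnrolling;
    }
}
```

Requesting `size * growthFactor` rather than the exact size trades some over-reservation for fewer calls into the memory manager.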
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163551992 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -346,85 +350,24 @@ private[spark] class MemoryStore( } else { initialMemoryThreshold.toInt } -val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator) -redirectableStream.setOutputStream(bbos) -val serializationStream: SerializationStream = { - val autoPick = !blockId.isInstanceOf[StreamBlockId] - val ser = serializerManager.getSerializer(classTag, autoPick).newInstance() - ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) -} - -// Request enough memory to begin unrolling -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode) - -if (!keepUnrolling) { - logWarning(s"Failed to reserve initial memory threshold of " + -s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") -} else { - unrollMemoryUsedByThisBlock += initialMemoryThreshold -} - -def reserveAdditionalMemoryIfNecessary(): Unit = { - if (bbos.size > unrollMemoryUsedByThisBlock) { -val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) -if (keepUnrolling) { - unrollMemoryUsedByThisBlock += amountToRequest -} - } -} - -// Unroll this block safely, checking whether we have exceeded our threshold -while (values.hasNext && keepUnrolling) { - serializationStream.writeObject(values.next())(classTag) - elementsUnrolled += 1 - if (elementsUnrolled % memoryCheckPeriod == 0) { -reserveAdditionalMemoryIfNecessary() - } -} -// Make sure that we have enough memory to store the block. 
By this point, it is possible that -// the block's actual memory usage has exceeded the unroll memory by a small amount, so we -// perform one final call to attempt to allocate additional memory if necessary. -if (keepUnrolling) { - serializationStream.close() - if (bbos.size > unrollMemoryUsedByThisBlock) { -val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) -if (keepUnrolling) { - unrollMemoryUsedByThisBlock += amountToRequest -} - } -} +val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag, + memoryMode, serializerManager) -if (keepUnrolling) { --- End diff -- I don't understand what you mean; could you explain it in more detail?
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163551817 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -702,6 +645,83 @@ private[spark] class MemoryStore( } } +private trait ValuesHolder[T] { + def storeValue(value: T): Unit + def estimatedSize(roughly: Boolean): Long --- End diff -- Thanks very much, I'll update it tomorrow.
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163462053 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -233,17 +235,13 @@ private[spark] class MemoryStore( } if (keepUnrolling) { - // We successfully unrolled the entirety of this block - val arrayValues = vector.toArray - vector = null - val entry = -new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) - val size = entry.size + // We need more precise value + val size = valuesHolder.esitimatedSize(false) --- End diff -- I changed the code back to the original. For `DeserializedValuesHolder`, we could call `buildEntry` and get the `size` from the built `MemoryEntry`. But for `SerializedValuesHolder`, this approach doesn't work correctly, because we need to call `bbos.toChunkedByteBuffer` to get the `MemoryEntry` object. If the reserved memory is not enough to transfer the unroll memory to storage memory, the unroll fails and we need to call `bbos.toChunkedByteBuffer` again ([L802](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala#L802); this appears to be intentional and is related to #15043). So the problem is that `bbos.toChunkedByteBuffer` would be called twice, which isn't allowed.
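One way to avoid converting the serialized buffer twice is to separate "measure the precise size" from "build the entry": the caller reads the size, tries to reserve memory, and builds only on success. A Java sketch of that shape; `ValuesHolder`, `getBuilder`, `preciseSize`, and `build` echo names from the diffs in this thread, while the byte-array storage, the UTF-8 line encoding, and the one-shot guard are assumptions for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Shape of the holder abstraction, loosely following the names in the diffs. */
interface ValuesHolder<T> {
    void storeValue(T value);
    long estimatedSize();
    MemoryEntryBuilder getBuilder();
}

interface MemoryEntryBuilder {
    long preciseSize();
    byte[] build(); // stands in for building a MemoryEntry
}

/** Hypothetical serialized holder: values are appended as UTF-8 text lines. */
final class SerializedHolder implements ValuesHolder<String> {
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
    private boolean built = false;

    @Override public void storeValue(String value) {
        byte[] bytes = (value + "\n").getBytes(StandardCharsets.UTF_8);
        out.write(bytes, 0, bytes.length);
    }

    @Override public long estimatedSize() { return out.size(); }

    @Override public MemoryEntryBuilder getBuilder() {
        return new MemoryEntryBuilder() {
            // Measuring does not consume the buffer, so it can happen before memory is reserved.
            @Override public long preciseSize() { return out.size(); }

            // Converting the buffer is allowed once per holder, mirroring toChunkedByteBuffer.
            @Override public byte[] build() {
                if (built) throw new IllegalStateException("entry already built");
                built = true;
                return out.toByteArray();
            }
        };
    }
}
```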
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19285 Thanks for your valuable suggestions; the code has been updated.
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163131741 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -162,26 +162,29 @@ private[spark] class MemoryStore( } /** - * Attempt to put the given block in memory store as values. + * Attempt to put the given block in memory store as values or bytes. * * It's possible that the iterator is too large to materialize and store in memory. To avoid * OOM exceptions, this method will gradually unroll the iterator while periodically checking * whether there is enough free memory. If the block is successfully materialized, then the * temporary unroll memory used during the materialization is "transferred" to storage memory, * so we won't acquire more memory than is actually needed to store the block. * - * @return in case of success, the estimated size of the stored data. In case of failure, return - * an iterator containing the values of the block. The returned iterator will be backed - * by the combination of the partially-unrolled block and the remaining elements of the - * original input iterator. The caller must either fully consume this iterator or call - * `close()` on it in order to free the storage memory consumed by the partially-unrolled - * block. + * @param blockId The block id. + * @param values The values which need be stored. + * @param classTag the [[ClassTag]] for the block. + * @param memoryMode The values saved mode. --- End diff -- Here `values` refers to the `values` parameter (`* @param values The values which need be stored.`), not to the type of storage. I'll reword it later.
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163131519 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -162,26 +162,29 @@ private[spark] class MemoryStore( } /** - * Attempt to put the given block in memory store as values. + * Attempt to put the given block in memory store as values or bytes. * * It's possible that the iterator is too large to materialize and store in memory. To avoid * OOM exceptions, this method will gradually unroll the iterator while periodically checking * whether there is enough free memory. If the block is successfully materialized, then the * temporary unroll memory used during the materialization is "transferred" to storage memory, * so we won't acquire more memory than is actually needed to store the block. * - * @return in case of success, the estimated size of the stored data. In case of failure, return - * an iterator containing the values of the block. The returned iterator will be backed - * by the combination of the partially-unrolled block and the remaining elements of the - * original input iterator. The caller must either fully consume this iterator or call - * `close()` on it in order to free the storage memory consumed by the partially-unrolled - * block. + * @param blockId The block id. + * @param values The values which need be stored. + * @param classTag the [[ClassTag]] for the block. + * @param memoryMode The values saved mode. + * @param valuesHolder A holder that supports storing record of values into memory store as + *values or bytes. + * @return if the block is stored successfully, return the stored data size. Else return the + * memory has used for unroll the block. --- End diff -- I'll make it clearer.
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163131383 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -233,17 +235,13 @@ private[spark] class MemoryStore( } if (keepUnrolling) { - // We successfully unrolled the entirety of this block - val arrayValues = vector.toArray - vector = null - val entry = -new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) - val size = entry.size + // We need more precise value + val size = valuesHolder.esitimatedSize(false) --- End diff -- > It seems we can just build the entry and call entry.size. That's more reasonable; I'll update it later.
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162840896 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -162,26 +162,29 @@ private[spark] class MemoryStore( } /** - * Attempt to put the given block in memory store as values. + * Attempt to put the given block in memory store as values or bytes. * * It's possible that the iterator is too large to materialize and store in memory. To avoid * OOM exceptions, this method will gradually unroll the iterator while periodically checking * whether there is enough free memory. If the block is successfully materialized, then the * temporary unroll memory used during the materialization is "transferred" to storage memory, * so we won't acquire more memory than is actually needed to store the block. * - * @return in case of success, the estimated size of the stored data. In case of failure, return - * an iterator containing the values of the block. The returned iterator will be backed - * by the combination of the partially-unrolled block and the remaining elements of the - * original input iterator. The caller must either fully consume this iterator or call - * `close()` on it in order to free the storage memory consumed by the partially-unrolled - * block. + * @param blockId The block id. + * @param values The values which need be stored. + * @param classTag the [[ClassTag]] for the block. + * @param memoryMode The values saved mode. + * @param valuesHolder A holder that supports storing record of values into memory store as + *values of bytes. + * @return if the block is stored successfully, return the stored data size. Else return the + * memory has used for unroll the block. --- End diff -- The block can be fully unrolled, but the memory it needs may exceed the unroll memory already requested, and the extra memory may not be available.
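To make the failure case described in this comment concrete, here is a minimal Java sketch of the post-unroll accounting under simplifying assumptions. `MemoryPool`, `PutResult` and `finishUnroll` are hypothetical names, not MemoryStore's API, and the real code also transfers unroll memory to storage memory. A fully unrolled block whose precise size exceeds the reserved unroll memory still has to acquire the difference; if that fails, the result carries the unroll memory used, mirroring the `Left` branch.

```java
/** Hypothetical memory pool standing in for the task's memory manager. */
final class MemoryPool {
    private long free;

    MemoryPool(long free) { this.free = free; }

    /** Reserves `bytes` if enough is free; otherwise reserves nothing and returns false. */
    boolean acquire(long bytes) {
        if (bytes > free) return false;
        free -= bytes;
        return true;
    }

    void release(long bytes) { free += bytes; }

    long free() { return free; }
}

/** Either the stored size (success) or the unroll memory still held (failure). */
final class PutResult {
    final boolean stored;
    final long bytes;

    PutResult(boolean stored, long bytes) { this.stored = stored; this.bytes = bytes; }
}

final class UnrollAccounting {
    /**
     * Called after the iterator has been fully unrolled.
     * `size` is the precise block size; `unrollUsed` is the unroll memory already reserved.
     */
    static PutResult finishUnroll(MemoryPool pool, long size, long unrollUsed) {
        if (size > unrollUsed) {
            // Fully unrolled, but larger than what was reserved while unrolling:
            // request the difference and fail if it cannot be granted.
            if (!pool.acquire(size - unrollUsed)) {
                return new PutResult(false, unrollUsed);
            }
        } else {
            // Reserved at least as much as needed: give any excess back.
            pool.release(unrollUsed - size);
        }
        return new PutResult(true, size);
    }
}
```

With 100 bytes free, an 80-byte block that reserved 50 bytes while unrolling succeeds; with only 10 bytes free, the same block fails even though unrolling finished.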
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162840776 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -162,26 +162,29 @@ private[spark] class MemoryStore( } /** - * Attempt to put the given block in memory store as values. + * Attempt to put the given block in memory store as values or bytes. * * It's possible that the iterator is too large to materialize and store in memory. To avoid * OOM exceptions, this method will gradually unroll the iterator while periodically checking * whether there is enough free memory. If the block is successfully materialized, then the * temporary unroll memory used during the materialization is "transferred" to storage memory, * so we won't acquire more memory than is actually needed to store the block. * - * @return in case of success, the estimated size of the stored data. In case of failure, return - * an iterator containing the values of the block. The returned iterator will be backed - * by the combination of the partially-unrolled block and the remaining elements of the - * original input iterator. The caller must either fully consume this iterator or call - * `close()` on it in order to free the storage memory consumed by the partially-unrolled - * block. + * @param blockId The block id. + * @param values The values which need be stored. + * @param classTag the [[ClassTag]] for the block. + * @param memoryMode The values saved mode. --- End diff -- `MemoryMode` has only two modes: ON_HEAP and OFF_HEAP.
[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...
Github user ConeyLiu closed the pull request at: https://github.com/apache/spark/pull/20026
[GitHub] spark issue #20026: [SPARK-22838][Core] Avoid unnecessary copying of data
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/20026 Closing it. Thanks, everyone.
[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/20026#discussion_r162810684 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -152,7 +153,7 @@ private class DiskBlockData( file: File, blockSize: Long) extends BlockData { - override def toInputStream(): InputStream = new FileInputStream(file) + override def toInputStream(): InputStream = new NioBufferedFileInputStream(file) --- End diff --
> IIUC for network (netty) transmission, it uses zero copy sendFile, which is another path (toNetty).
Thanks for explaining; I hadn't noticed this before.
[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/20026#discussion_r162803175 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -152,7 +153,7 @@ private class DiskBlockData( file: File, blockSize: Long) extends BlockData { - override def toInputStream(): InputStream = new FileInputStream(file) + override def toInputStream(): InputStream = new NioBufferedFileInputStream(file) --- End diff -- Hi @jerryshao, thanks for reviewing. This is inspired by #15408.
> the returned `InputStream` will be deserialized in `BlockManger`
This is not entirely correct. Sometimes we don't need to deserialize it, for example for network transmission. Also, this does not add extra work to deserialization, but it reduces the effort of network-like delivery.
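The network path mentioned in this thread (the zero-copy `sendFile` / `toNetty` route) avoids copying file data through a user-space stream at all. A minimal sketch of that idea with the JDK's `FileChannel.transferTo`; the `ZeroCopy.sendFile` helper is hypothetical and this is not Spark's `toNetty` implementation.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ZeroCopy {
    /**
     * Sends the whole file to `target` with FileChannel.transferTo, which lets the OS move
     * bytes without staging them in a Java heap buffer when the target supports it.
     */
    static long sendFile(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel src = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = src.size();
            long sent = 0;
            // transferTo may move fewer bytes than requested, so loop until everything is sent.
            while (sent < size) {
                sent += src.transferTo(sent, size - sent, target);
            }
            return sent;
        }
    }
}
```

When the target is a socket channel the JDK may use `sendfile` on platforms that support it; for other targets, such as the in-memory channel used in a quick test, it may fall back to an ordinary buffered copy.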
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19285 Thanks for reviewing. The code has been updated; please take another look. Thanks again.
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r162802949 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -261,37 +263,93 @@ private[spark] class MemoryStore( // If this task attempt already owns more unroll memory than is necessary to store the // block, then release the extra memory that will not be used. val excessUnrollMemory = unrollMemoryUsedByThisBlock - size - releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory) + releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory) transferUnrollToStorage(size) true } } + if (enoughStorageMemory) { entries.synchronized { - entries.put(blockId, entry) + entries.put(blockId, createMemoryEntry()) } logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) Right(size) } else { assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock, "released too much unroll memory") +Left(unrollMemoryUsedByThisBlock) + } +} else { + Left(unrollMemoryUsedByThisBlock) +} + } + + /** + * Attempt to put the given block in memory store as values. + * + * It's possible that the iterator is too large to materialize and store in memory. To avoid + * OOM exceptions, this method will gradually unroll the iterator while periodically checking + * whether there is enough free memory. If the block is successfully materialized, then the + * temporary unroll memory used during the materialization is "transferred" to storage memory, + * so we won't acquire more memory than is actually needed to store the block. + * + * @return in case of success, the estimated size of the stored data. In case of failure, return + * an iterator containing the values of the block. 
The returned iterator will be backed + * by the combination of the partially-unrolled block and the remaining elements of the + * original input iterator. The caller must either fully consume this iterator or call + * `close()` on it in order to free the storage memory consumed by the partially-unrolled + * block. + */ + private[storage] def putIteratorAsValues[T]( + blockId: BlockId, + values: Iterator[T], + classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { + +// Underlying vector for unrolling the block +var vector = new SizeTrackingVector[T]()(classTag) +var arrayValues: Array[T] = null +var preciseSize: Long = -1 + +def storeValue(value: T): Unit = { + vector += value +} + +def estimateSize(precise: Boolean): Long = { + if (precise) { +// We only call need the precise size after all values unrolled. +arrayValues = vector.toArray +preciseSize = SizeEstimator.estimate(arrayValues) +preciseSize + } else { +vector.estimateSize() + } +} + +def createMemoryEntry(): MemoryEntry[T] = { + // We successfully unrolled the entirety of this block + DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag) +} + +putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, storeValue, + estimateSize, createMemoryEntry) match { + case Right(storedSize) => Right(storedSize) + case Left(unrollMemoryUsedByThisBlock) => +// We ran out of space while unrolling the values for this block +val (unrolledIterator, size) = if (vector != null) { --- End diff -- Updated, thanks.
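The refactored `putIteratorAsValues` in this diff passes three callbacks (`storeValue`, `estimateSize`, `createMemoryEntry`) into a shared `putIterator`. A simplified Java sketch of that shape, assuming a hypothetical `ValuesHolder` interface and a fixed byte budget in place of Spark's memory manager (the 8-bytes-per-element estimate is an arbitrary assumption). A serialized-bytes holder would implement the same interface, which is what lets one loop serve both the values and bytes paths.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical holder: the three operations the shared unroll loop needs. */
interface ValuesHolder<T, E> {
    void storeValue(T value);
    long estimatedSize(); // running estimate checked while unrolling
    E build();            // called only after every value has been stored
}

final class Unroller {
    /**
     * Unrolls `values` into `holder` while the estimated size stays within `budget`.
     * Returns the built entry on success, or null if the budget was exceeded
     * (a stand-in for the Left(unrollMemoryUsed) branch).
     */
    static <T, E> E putIterator(Iterator<T> values, ValuesHolder<T, E> holder, long budget) {
        while (values.hasNext()) {
            holder.storeValue(values.next());
            if (holder.estimatedSize() > budget) {
                return null; // ran out of memory while unrolling
            }
        }
        return holder.build();
    }
}

/** A "values" holder keeping deserialized objects, assuming 8 bytes per element. */
final class ListHolder implements ValuesHolder<Integer, List<Integer>> {
    private final List<Integer> buf = new ArrayList<>();

    public void storeValue(Integer v) { buf.add(v); }

    public long estimatedSize() { return 8L * buf.size(); }

    public List<Integer> build() { return new ArrayList<>(buf); }
}
```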
[GitHub] spark issue #20026: [SPARK-22838][Core] Avoid unnecessary copying of data
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/20026 cc @jiangxb1987, any comments on this?
[GitHub] spark issue #20026: [SPARK-22838][Core] Avoid unnecessary copying of data
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/20026 I'll update it tomorrow.
[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/20026#discussion_r158279099 --- Diff: core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java --- @@ -61,6 +61,7 @@ private boolean refill() throws IOException { nRead = fileChannel.read(byteBuffer); } if (nRead < 0) { +byteBuffer.flip(); --- End diff -- The problem is that when we reach the end of the file and call `NioBufferedFileInputStream.call()` twice, we don't get `-1` both times. You can see this here:
org.apache.commons.crypto.stream.input.StreamInput.java
```java
public int read(ByteBuffer dst) throws IOException {
    int remaining = dst.remaining();
    int read = 0;
    while(remaining > 0) {
        int n = this.in.read(this.buf, 0, Math.min(remaining, this.bufferSize));
        if (n == -1) {
            if (read == 0) {
                read = -1;
            }
            break;
        }
        if (n > 0) {
            dst.put(this.buf, 0, n);
            read += n;
            remaining -= n;
        }
    }
    return read;
}
```
1. We read data from the underlying `NioBufferedFileInputStream` and get some remaining bytes.
2. We reach the end, so this time we call `byteBuffer.flip()`. After `flip()`, `byteBuffer.hasRemaining()` changes to true, because `flip()` changed the limit and position.
3. But `StreamInput.read()` still returns a value larger than 0, because of the bytes read in step 1.
4. So we call `StreamInput.read()` again. But `byteBuffer.hasRemaining()` of `NioBufferedFileInputStream` has changed, so we read the dirty data from `byteBuffer`.
I'm not sure this explanation is entirely correct.
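Callers like the `StreamInput.read` loop quoted in this comment keep calling `read` until they see `-1`, so once EOF is reached the buffered stream has to keep reporting `-1` instead of serving whatever is left in its buffer. A simplified sketch of a buffered reader whose `refill()` flips the buffer on EOF, as the patch proposes; `BufferedChannelReader` is a hypothetical stand-in, not `NioBufferedFileInputStream` itself.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

/** Hypothetical buffered reader over a channel, loosely modeled on the refill() under review. */
final class BufferedChannelReader {
    private final ReadableByteChannel channel;
    private final ByteBuffer buf;

    BufferedChannelReader(ReadableByteChannel channel, int capacity) {
        this.channel = channel;
        this.buf = ByteBuffer.allocate(capacity);
        this.buf.flip(); // start empty: position = limit = 0
    }

    private boolean refill() throws IOException {
        if (!buf.hasRemaining()) {
            buf.clear(); // position = 0, limit = capacity: old bytes now look "remaining"
            int n = 0;
            while (n == 0) {
                n = channel.read(buf);
            }
            if (n < 0) {
                buf.flip(); // the proposed fix: limit = position = 0, nothing stale remains
                return false;
            }
            buf.flip();
        }
        return true;
    }

    /** Returns the next byte (0-255), or -1 at end of stream on every call after EOF. */
    int read() throws IOException {
        if (!refill()) {
            return -1;
        }
        return buf.get() & 0xFF;
    }
}
```

Without the `flip()` in the EOF branch, the buffer would be left with position 0 and limit equal to its capacity, and the next `read()` would return stale bytes instead of `-1`.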
[GitHub] spark issue #20026: [SPARK-22838][Core] Avoid unnecessary copying of data
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/20026 It seems the error is not related. Also, could you add me to the whitelist?
[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/20026#discussion_r158243377 --- Diff: core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java --- @@ -61,6 +61,7 @@ private boolean refill() throws IOException { nRead = fileChannel.read(byteBuffer); } if (nRead < 0) { +byteBuffer.flip(); --- End diff -- This is related to this error: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85228/testReport/org.apache.spark.storage/BlockManagerSuite/LRU_with_mixed_storage_levels_and_streams__encryption___on_/ I'm not sure of the reason, but I guess it happens as follows:
```scala
var i = 0
while (i < inputStream.available()) {
  // do something
}
```
When we reach the end of the file, where `i == (inputStream.available() - 1)`, we get `-1` from `inputStream.read()`, and this time `refill()` is called as well. Even though no data can be read from the underlying `fileChannel`, the `byteBuffer` has been flipped. So `inputStream.available()` changes, and we can still read the dirty data remaining in the `byteBuffer`.
[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/20026#discussion_r158220107 --- Diff: core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java --- @@ -91,7 +92,12 @@ public synchronized int read(byte[] b, int offset, int len) throws IOException { @Override public synchronized int available() throws IOException { -return byteBuffer.remaining(); +int n = byteBuffer.remaining(); +long avail = fileChannel.size() - fileChannel.position(); +long total = avail + n; +return avail > (Long.MAX_VALUE - n) + ? Integer.MAX_VALUE : total > Integer.MAX_VALUE + ? Integer.MAX_VALUE : (int)total; --- End diff -- This should also be fixed.
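The proposed `available()` adds the buffered bytes to the bytes left in the channel and clamps the result, because `InputStream.available()` returns an `int`. A minimal sketch of the same saturating arithmetic with the channel values passed in as plain parameters (a hypothetical signature, chosen so it can be exercised without a file). It additionally treats a position past the end of the file as zero bytes remaining, which is an assumption not present in the diff.

```java
final class Available {
    /**
     * Bytes still readable: those buffered in memory plus those left in the file,
     * clamped to Integer.MAX_VALUE because InputStream.available() returns an int.
     */
    static int available(int buffered, long fileSize, long filePosition) {
        // Assumption: a position beyond the end of the file means nothing is left.
        long remainingInFile = Math.max(0L, fileSize - filePosition);
        // Guard the long addition itself against overflow before clamping to int.
        long total = remainingInFile > Long.MAX_VALUE - buffered
                ? Long.MAX_VALUE
                : remainingInFile + buffered;
        return (int) Math.min(total, Integer.MAX_VALUE);
    }
}
```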
[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/20026#discussion_r158220052 --- Diff: core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java --- @@ -61,6 +61,7 @@ private boolean refill() throws IOException { nRead = fileChannel.read(byteBuffer); } if (nRead < 0) { +byteBuffer.flip(); --- End diff -- This looks like a bug. The byteBuffer is flipped after `fileChannel.read(byteBuffer)`, so we can then read the remaining dirty data. I'll add a test case later. I'm not sure whether this should be a separate patch. @cloud-fan
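The buffer state this fix targets can be reproduced with `java.nio.ByteBuffer` alone. In this minimal sketch (the `FlipDemo` name is hypothetical), after `clear()` the buffer reports its full capacity as `remaining()` even though the channel wrote nothing, so a `refill()` that returns on EOF at that point leaves old bytes looking unread. Calling `flip()` while the position is still 0 sets the limit to 0, so nothing is readable.

```java
import java.nio.ByteBuffer;

final class FlipDemo {
    /**
     * Returns {remaining after clear(), remaining after a following flip()} for a buffer
     * whose previous contents were fully consumed and whose refill hit EOF.
     */
    static int[] remainingAfterEofRefill(int capacity) {
        ByteBuffer buf = ByteBuffer.allocate(capacity);
        buf.put(new byte[capacity]).flip(); // fill once and make it readable
        buf.position(buf.limit());          // consume everything: hasRemaining() == false
        buf.clear();                        // refill starts: position = 0, limit = capacity
        // Pretend channel.read(buf) returned -1 here, so nothing new was written.
        int afterClear = buf.remaining();   // == capacity: stale bytes look unread
        buf.flip();                         // limit = position = 0
        int afterFlip = buf.remaining();    // == 0: nothing stale is readable
        return new int[] {afterClear, afterFlip};
    }
}
```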
[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/20026#discussion_r158181658 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -208,7 +209,7 @@ private class EncryptedBlockData( conf: SparkConf, key: Array[Byte]) extends BlockData { - override def toInputStream(): InputStream = Channels.newInputStream(open()) + override def toInputStream(): InputStream = new NioBufferedFileInputStream(file) --- End diff -- Sorry for the mistake; updated.
[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/20026#discussion_r157938250 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -208,7 +209,7 @@ private class EncryptedBlockData( conf: SparkConf, key: Array[Byte]) extends BlockData { - override def toInputStream(): InputStream = Channels.newInputStream(open()) + override def toInputStream(): InputStream = new NioBufferedFileInputStream(file) --- End diff -- Do you mean the memory buffer? `NioBufferedFileInputStream` has a `close` method, as you can see here:
org.apache.spark.io.NioBufferedFileInputStream.java
```java
@Override
public synchronized void close() throws IOException {
  fileChannel.close();
  StorageUtils.dispose(byteBuffer);
}

@Override
protected void finalize() throws IOException {
  close();
}
```
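`close()` releases both the channel and the direct buffer, but only when it is actually called. `finalize()` runs at an unpredictable time, if at all, and `Object.finalize` has been deprecated since JDK 9. A minimal sketch of a caller that releases the stream deterministically with try-with-resources; `TrackedStream` and `ReadAll` are hypothetical helpers, and `TrackedStream` merely records whether `close()` ran.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical stream that records whether close() was called. */
final class TrackedStream extends ByteArrayInputStream {
    boolean closed = false;

    TrackedStream(byte[] data) { super(data); }

    @Override
    public void close() throws IOException {
        closed = true; // a real stream would release its channel and buffer here
        super.close();
    }
}

final class ReadAll {
    /** Counts the bytes in the stream and always closes it, even if reading throws. */
    static int countBytes(InputStream in) throws IOException {
        try (InputStream s = in) {
            int n = 0;
            while (s.read() != -1) {
                n++;
            }
            return n;
        }
    }
}
```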
[GitHub] spark issue #20026: [SPARK-22838][Core] Avoid unnecessary copying of data
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/20026 @cloud-fan Please take a look. Thanks a lot.
[GitHub] spark pull request #20026: [SPARK-22838][Core] Avoid unnecessary copying of ...
GitHub user ConeyLiu opened a pull request: https://github.com/apache/spark/pull/20026 [SPARK-22838][Core] Avoid unnecessary copying of data
## What changes were proposed in this pull request?
If we read data from a FileChannel into a HeapByteBuffer, the data has to be copied from off-heap to on-heap memory, as the following code shows:
```java
static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
    if(var1.isReadOnly()) {
        throw new IllegalArgumentException("Read-only buffer");
    } else if(var1 instanceof DirectBuffer) {
        return readIntoNativeBuffer(var0, var1, var2, var4);
    } else {
        ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());
        int var7;
        try {
            int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
            var5.flip();
            if(var6 > 0) {
                var1.put(var5);
            }
            var7 = var6;
        } finally {
            Util.offerFirstTemporaryDirectBuffer(var5);
        }
        return var7;
    }
}
```
## How was this patch tested?
Existing UT.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ConeyLiu/spark datacopy
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/20026.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #20026
commit 27d2b1b3991602a7126d85cfc69c3da1ff84d599
Author: Xianyang Liu <xianyang.liu@...>
Date: 2017-12-20T02:32:44Z
small fix
commit 51c32c8c6cc89a3249b3ef856c41f3c238b59f4a
Author: Xianyang Liu <xianyang.liu@...>
Date: 2017-12-20T03:44:00Z
fix code style
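The JDK code quoted in this PR description copies through a temporary direct buffer whenever the destination is a heap buffer, whereas a `DirectBuffer` destination is filled in place. A minimal sketch of reading a file through one reused direct buffer; the `DirectRead.checksum` helper and the byte sum it computes are purely illustrative. Direct buffers are relatively costly to allocate and are freed only when garbage-collected, so the sketch allocates one buffer and reuses it for the whole file.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class DirectRead {
    /** Sums the file's bytes, reading through one direct buffer to avoid the extra heap copy. */
    static long checksum(Path file, int bufferSize) throws IOException {
        ByteBuffer buf = ByteBuffer.allocateDirect(bufferSize); // filled in place by the read
        long sum = 0;
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            while (ch.read(buf) != -1) {
                buf.flip(); // switch from filling to draining
                while (buf.hasRemaining()) {
                    sum += buf.get() & 0xFF;
                }
                buf.clear(); // reuse the same buffer for the next chunk
            }
        }
        return sum;
    }
}
```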
[GitHub] spark issue #19735: [MINOR][CORE] Using bufferedInputStream for dataDeserial...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19735 Thanks @jerryshao @srowen.
[GitHub] spark issue #19735: [MINOR][CORE] Using bufferedInputStream for dataDeserial...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19735 @srowen Could you take a look?
[GitHub] spark pull request #19735: [MINOR][CORE] Using bufferedInputStream for dataD...
GitHub user ConeyLiu opened a pull request: https://github.com/apache/spark/pull/19735 [MINOR][CORE] Using bufferedInputStream for dataDeserializeStream
## What changes were proposed in this pull request?
Small fix: use a BufferedInputStream for dataDeserializeStream.
## How was this patch tested?
Existing UT.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ConeyLiu/spark smallfix
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/19735.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #19735
commit 9fe6426c61391f1e913be4abecba8b4f87785c5c
Author: Xianyang Liu <xianyang@intel.com>
Date: 2017-11-13T05:15:04Z
using bufferdinputstream
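Wrapping the raw stream in a `BufferedInputStream` lets the deserializer's many small reads be served from memory instead of each one going to the underlying stream. A minimal sketch of that wrapping pattern, using JDK serialization as a stand-in for Spark's serializer; the helper names and the 64 KiB buffer size are assumptions, and since `ObjectInputStream` already buffers block data internally, this illustrates the pattern rather than a measured speedup.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

final class BufferedDeserialize {
    /** Serializes the values with JDK serialization (a stand-in for a Spark serializer). */
    static byte[] serialize(List<Integer> values) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeInt(values.size());
            for (int v : values) {
                out.writeInt(v);
            }
        }
        return bytes.toByteArray();
    }

    /** Reads the values back, buffering the raw stream so small reads do not hit it directly. */
    static List<Integer> deserialize(InputStream raw) throws IOException {
        try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(raw, 64 * 1024))) {
            int n = in.readInt();
            List<Integer> result = new ArrayList<>(n);
            for (int i = 0; i < n; i++) {
                result.add(in.readInt());
            }
            return result;
        }
    }
}
```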
[GitHub] spark issue #19661: [SPARK-22450][Core][Mllib]safely register class for mlli...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19661 Thanks, everyone.
[GitHub] spark pull request #19661: [SPARK-22450][Core][Mllib]safely register class f...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19661#discussion_r150173067 --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala --- @@ -108,6 +108,27 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { check(Array(Array("1", "2"), Array("1", "2", "3", "4"))) } + test("safely register class for mllib/ml") { +val conf = new SparkConf(false) +val ser = new KryoSerializer(conf) + +Seq("org.apache.spark.mllib.linalg.Vector", + "org.apache.spark.mllib.linalg.DenseVector", + "org.apache.spark.mllib.linalg.SparseVector", + "org.apache.spark.mllib.linalg.Matrix", + "org.apache.spark.mllib.linalg.DenseMatrix", + "org.apache.spark.mllib.linalg.SparseMatrix", + "org.apache.spark.ml.linalg.Vector", + "org.apache.spark.ml.linalg.DenseVector", + "org.apache.spark.ml.linalg.SparseVector", + "org.apache.spark.ml.linalg.Matrix", + "org.apache.spark.ml.linalg.DenseMatrix", + "org.apache.spark.ml.linalg.SparseMatrix", + "org.apache.spark.ml.feature.Instance", + "org.apache.spark.ml.feature.OffsetInstance" +).foreach(!Utils.classIsLoadable(_)) --- End diff -- OK, I'll remove it.
[GitHub] spark pull request #19661: [SPARK-22450][Core][Mllib]safely register class f...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19661#discussion_r150172493 --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala --- @@ -108,6 +108,27 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { check(Array(Array("1", "2"), Array("1", "2", "3", "4"))) } + test("safely register class for mllib/ml") { +val conf = new SparkConf(false) +val ser = new KryoSerializer(conf) + +Seq("org.apache.spark.mllib.linalg.Vector", + "org.apache.spark.mllib.linalg.DenseVector", + "org.apache.spark.mllib.linalg.SparseVector", + "org.apache.spark.mllib.linalg.Matrix", + "org.apache.spark.mllib.linalg.DenseMatrix", + "org.apache.spark.mllib.linalg.SparseMatrix", + "org.apache.spark.ml.linalg.Vector", + "org.apache.spark.ml.linalg.DenseVector", + "org.apache.spark.ml.linalg.SparseVector", + "org.apache.spark.ml.linalg.Matrix", + "org.apache.spark.ml.linalg.DenseMatrix", + "org.apache.spark.ml.linalg.SparseMatrix", + "org.apache.spark.ml.feature.Instance", + "org.apache.spark.ml.feature.OffsetInstance" +).foreach(!Utils.classIsLoadable(_)) --- End diff -- This is just meant to show that we didn't introduce an extra jar dependency. I can delete it if it's unnecessary.
[GitHub] spark pull request #19661: [SPARK-22450][Core][Mllib]safely register class f...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19661#discussion_r150157116 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -178,6 +179,28 @@ class KryoSerializer(conf: SparkConf) kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$")) kryo.register(classOf[ArrayBuffer[Any]]) +// We can't load those class directly in order to avoid unnecessary jar dependencies. +// We load them safely, ignore it if the class not found. +Seq("org.apache.spark.mllib.linalg.Vector", + "org.apache.spark.mllib.linalg.DenseVector", + "org.apache.spark.mllib.linalg.SparseVector", + "org.apache.spark.mllib.linalg.Matrix", + "org.apache.spark.mllib.linalg.DenseMatrix", + "org.apache.spark.mllib.linalg.SparseMatrix", + "org.apache.spark.ml.linalg.Vector", + "org.apache.spark.ml.linalg.DenseVector", + "org.apache.spark.ml.linalg.SparseVector", + "org.apache.spark.ml.linalg.Matrix", + "org.apache.spark.ml.linalg.DenseMatrix", + "org.apache.spark.ml.linalg.SparseMatrix", + "org.apache.spark.ml.feature.Instance", + "org.apache.spark.ml.feature.OffsetInstance" +).map(name => Try(Utils.classForName(name))).foreach { t => --- End diff -- Updated, thanks for the advice.
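The registration in this diff loads each optional class by name and skips any that are missing, so `core` needs no compile-time dependency on `mllib`. A rough Java equivalent of that pattern using `Class.forName`; the `SafeClassLoading.loadable` helper is hypothetical, and in Spark each loaded class would be passed to Kryo's `register` rather than collected into a list.

```java
import java.util.ArrayList;
import java.util.List;

final class SafeClassLoading {
    /** Returns the classes that could be loaded, silently skipping names not on the classpath. */
    static List<Class<?>> loadable(List<String> names) {
        List<Class<?>> found = new ArrayList<>();
        for (String name : names) {
            try {
                found.add(Class.forName(name));
            } catch (ClassNotFoundException e) {
                // Optional module (e.g. mllib) not on the classpath: skip it rather than fail.
            }
        }
        return found;
    }
}
```

For example, passing `"java.lang.String"` and `"org.example.DoesNotExist"` returns only `String.class`.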
[GitHub] spark pull request #19661: [SPARK-22450][Core][Mllib]safely register class f...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19661#discussion_r149846210 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -178,10 +178,40 @@ class KryoSerializer(conf: SparkConf) kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$")) kryo.register(classOf[ArrayBuffer[Any]]) +// We can't load those class directly in order to avoid unnecessary jar dependencies. +// We load them safely, ignore it if the class not found. +Seq("org.apache.spark.mllib.linalg.Vector", + "org.apache.spark.mllib.linalg.DenseVector", + "org.apache.spark.mllib.linalg.SparseVector", + "org.apache.spark.mllib.linalg.Matrix", + "org.apache.spark.mllib.linalg.DenseMatrix", + "org.apache.spark.mllib.linalg.SparseMatrix", + "org.apache.spark.ml.linalg.Vector", + "org.apache.spark.ml.linalg.DenseVector", + "org.apache.spark.ml.linalg.SparseVector", + "org.apache.spark.ml.linalg.Matrix", + "org.apache.spark.ml.linalg.DenseMatrix", + "org.apache.spark.ml.linalg.SparseMatrix", + "org.apache.spark.ml.feature.Instance", + "org.apache.spark.ml.feature.OffsetInstance" +).flatMap(safeClassLoader(_)).foreach(kryo.register(_)) --- End diff -- Thanks a lot, I've updated the code.
[GitHub] spark pull request #19661: [SPARK-22450][Core][Mllib]safely register class f...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19661#discussion_r149553694 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -178,10 +178,40 @@ class KryoSerializer(conf: SparkConf) kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$")) kryo.register(classOf[ArrayBuffer[Any]]) +// We can't load those class directly in order to avoid unnecessary jar dependencies. +// We load them safely, ignore it if the class not found. +Seq("org.apache.spark.mllib.linalg.Vector", + "org.apache.spark.mllib.linalg.DenseVector", + "org.apache.spark.mllib.linalg.SparseVector", + "org.apache.spark.mllib.linalg.Matrix", + "org.apache.spark.mllib.linalg.DenseMatrix", + "org.apache.spark.mllib.linalg.SparseMatrix", + "org.apache.spark.ml.linalg.Vector", + "org.apache.spark.ml.linalg.DenseVector", + "org.apache.spark.ml.linalg.SparseVector", + "org.apache.spark.ml.linalg.Matrix", + "org.apache.spark.ml.linalg.DenseMatrix", + "org.apache.spark.ml.linalg.SparseMatrix", + "org.apache.spark.ml.feature.Instance", + "org.apache.spark.ml.feature.OffsetInstance" +).flatMap(safeClassLoader(_)).foreach(kryo.register(_)) --- End diff -- Hi @cloud-fan, I tried the following code:
```scala
flatMap(cn => Try{Utils.classForName(cn)}.toOption).foreach(kryo.register(_))
```
and
```scala
flatMap{ cn =>
  try {
    val clazz = Utils.classForName(cn)
    Some(clazz)
  } catch {
    case _: ClassNotFoundException => None
  }
}.foreach(kryo.register(_))
```
Both reported the same error:
```
Error:(198, 18) type mismatch;
 found   : String => Iterable[Class[_$2]]( forSome { type _$2 })
 required: String => scala.collection.GenTraversableOnce[B]
).flatMap{cn => Option(Utils.classForName(cn))}.foreach(kryo.register(_))
```
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19285 It's updated. Thanks a lot.
[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/17936 OK, thanks a lot.
[GitHub] spark issue #19661: [SPARK-22450][Core][Mllib]safely register class for mlli...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19661 Thanks for reviewing. The code is updated.
[GitHub] spark issue #19661: [SPARK-22450][Core][Mllib]safely register class for mlli...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19661
>So why don't you include some classes such as org.apache.spark.ml.feature.Instance ?
I'm not familiar with those algorithms, but I can add classes such as `org.apache.spark.ml.feature.Instance`. If this approach is not reasonable, maybe we can just remind users to register them in the docs.
[GitHub] spark issue #19586: [SPARK-22367][WIP][CORE] Separate the serialization of c...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19586 Thanks for the suggestion. I've opened a new PR to solve this problem, so I'm closing this one now.
[GitHub] spark pull request #19586: [SPARK-22367][WIP][CORE] Separate the serializati...
Github user ConeyLiu closed the pull request at: https://github.com/apache/spark/pull/19586
[GitHub] spark issue #19661: [SPARK-22450][Core][Mllib]safely register class for mlli...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19661 #19586
[GitHub] spark pull request #19661: [SPARK-22450][Core][Mllib]safely register class f...
GitHub user ConeyLiu opened a pull request: https://github.com/apache/spark/pull/19661 [SPARK-22450][Core][Mllib]safely register class for mllib ## What changes were proposed in this pull request? Some algorithms, such as KMeans, are still based on mllib. Currently, many common mllib classes (Vector, DenseVector, SparseVector, Matrix, DenseMatrix, SparseMatrix) are not registered with Kryo, which causes performance issues when serializing or deserializing those objects. Previously discussed: https://github.com/apache/spark/pull/19586 ## How was this patch tested? New test case. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ConeyLiu/spark register_vector Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19661.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19661 commit 317229fabd7b470c8c349a4f12604ea22af0d27f Author: Xianyang Liu <xianyang@intel.com> Date: 2017-11-06T02:40:55Z safely register class for mllib
[GitHub] spark issue #19586: [SPARK-22367][WIP][CORE] Separate the serialization of c...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19586 Hi @cloud-fan, @jerryshao. The `writeClass` and `readClass` overhead can be solved by registering the classes Vector, DenseVector and SparseVector. The test code:
```scala
val conf = new SparkConf().setAppName("Vector Register Test")
conf.registerKryoClasses(Array(classOf[Vector], classOf[DenseVector], classOf[SparseVector]))
val sc = new SparkContext(conf)
val sourceData = sc.sequenceFile[LongWritable, VectorWritable](args(0))
  .map { case (k, v) =>
    val vector = v.get()
    val tmpVector = new Array[Double](vector.size())
    for (i <- 0 until vector.size()) {
      tmpVector(i) = vector.get(i)
    }
    Vectors.dense(tmpVector)
  }
sourceData.persist(StorageLevel.OFF_HEAP)
var start = System.currentTimeMillis()
sourceData.count()
println("First: " + (System.currentTimeMillis() - start))
start = System.currentTimeMillis()
sourceData.count()
println("Second: " + (System.currentTimeMillis() - start))
sc.stop()
```
Results:

| | Before | After |
| -- | -- | -- |
| Serialized size | 38.4GB | 30.5GB |
| First count | 93318ms | 80708ms |
| Second count | 5870ms | 3382ms |

These classes are very common in ML, and so are `Matrix`, `DenseMatrix` and `SparseMatrix`. I'm not sure whether we should register them in core directly, because this could introduce an extra jar dependency. Could you give some advice? Otherwise, we could just add a reminder in the ML docs. The root cause seems to be in Kryo: it writes the full class name instead of the class ID if the class is not registered.
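To make the size argument concrete, here is a toy model of a per-record class tag: a registered class costs a small varint ID, while an unregistered one carries its full name. This layout is invented for illustration (the `0` marker, the `id + 1` shift and the `ClassTagEncoding` names are all assumptions); it is not Kryo's actual wire format, only a sketch of why registration shrinks every record.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets

object ClassTagEncoding {
  // Unsigned varint: 7 payload bits per byte, high bit set on every byte
  // except the last, so small values take a single byte.
  def writeVarInt(value: Int, out: ByteArrayOutputStream): Unit = {
    require(value >= 0, "only non-negative values are supported")
    var v = value
    while ((v & ~0x7F) != 0) {
      out.write((v & 0x7F) | 0x80)
      v = v >>> 7
    }
    out.write(v)
  }

  // Hypothetical tag layout: a registered class is written as varint(id + 1);
  // an unregistered one as a 0 marker, a varint length and the UTF-8 name.
  def encodeClassTag(cls: String, registry: Map[String, Int]): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    registry.get(cls) match {
      case Some(id) => writeVarInt(id + 1, out)
      case None =>
        val name = cls.getBytes(StandardCharsets.UTF_8)
        out.write(0)
        writeVarInt(name.length, out)
        out.write(name)
    }
    out.toByteArray
  }
}
```

In this model the tag for an unregistered `org.apache.spark.mllib.linalg.DenseVector` takes 43 bytes (marker, one-byte length, 41-byte name) versus 1 byte once it is registered, and the difference is paid for every record.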
[GitHub] spark issue #19586: [SPARK-22367][WIP][CORE] Separate the serialization of c...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19586 OK, I understand your concern. There is a huge GC problem in the K-means workload: GC takes about 10-20% of the time, and that is with the source data cached in memory. Performance is even worse when the source data can't be cached in memory. So we tried moving the source data off-heap. However, the training time got even worse with off-heap memory: GC took only about 10-20% with on-heap memory, whereas deserialization took about 30-40% with off-heap memory, even though the GC problem was solved. https://user-images.githubusercontent.com/12733256/32313752-5dbec220-bfdf-11e7-8b49-d5daa47cd50f.PNG https://user-images.githubusercontent.com/12733256/32313788-824b8470-bfdf-11e7-9b59-aea26e9c6c0a.PNG As the pictures show, `readClass` takes about 13%, which is why I opened this PR. In my tests with this patch, the total time (loading data + training the k-means model) dropped by about 10%. The pictures above only cover the training phase, not the data-loading phase, so the improvement should be larger, as we expected. I plan to optimize `readObjectOrNull` after this. Also, I found that `Vector` is not registered, so I will test the performance with `Vector` registered. That may reduce CPU usage, but it can't reduce the serialized size.
[GitHub] spark issue #19586: [SPARK-22367][WIP][CORE] Separate the serialization of c...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19586 Hi @cloud-fan, in most cases the data type should be the same, so I think this optimization is valuable: it can save considerable space and CPU. What about setting a flag on the RDD that indicates whether all its records have the same type? If that's not viable, could we put it into the ml package as a special serializer that users can configure? In that case, though, the exact ClassTag of the RDD must be provided for serialization, because of the record relocation done by the unsafe shuffle writer.
[GitHub] spark issue #19586: [SPARK-22367][WIP][CORE] Separate the serialization of c...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19586 Currently, I use it directly. Maybe this is suitable for special cases where all the data has the same type, such as ML workloads.
[GitHub] spark issue #19586: [SPARK-22367][WIP][CORE] Separate the serialization of c...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19586 Hi @jerryshao, thanks for the reminder; it doesn't support that. I'm sorry I did not take that into account. How about using a configuration to decide whether we should use `SerializerInstance#serializeStreamForClass[T]`? In most cases the data type should be the same. Can you give some advice? Also cc @cloud-fan @srowen
[GitHub] spark issue #19586: [SPARK-22367][WIP][CORE] Separate the serialization of c...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19586 Hi @cloud-fan, thanks for reviewing. There are some errors related to `UnsafeShuffleWrite` that still need to be fixed. I am not familiar with this code, so I need some time.
[GitHub] spark pull request #19586: [SPARK-22367][WIP][CORE] Separate the serializati...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19586#discussion_r147709649 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -205,11 +205,45 @@ class KryoSerializationStream( private[this] var kryo: Kryo = serInstance.borrowKryo() + // This is only used when we write object and class separately. + var classWrote = false + override def writeObject[T: ClassTag](t: T): SerializationStream = { kryo.writeClassAndObject(output, t) --- End diff -- From the code, it just writes a `varInt` if the class has been registered, and computing that `varInt` also takes some work. But judging from the test, the overhead looks more serious than I expected.
[GitHub] spark pull request #19586: [SPARK-22367][CORE] Separate the serialization of...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19586#discussion_r147371400 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -376,7 +382,17 @@ private[spark] class MemoryStore( // Unroll this block safely, checking whether we have exceeded our threshold while (values.hasNext && keepUnrolling) { - serializationStream.writeObject(values.next())(classTag) + val value = values.next() + if (kryoSerializationStream != null) { +if (!kryoSerializationStream.classWrote) { + kryoSerializationStream.writeClass(value.getClass) --- End diff -- @srowen you can see it here. We don't use `writeAll` here, because we need to acquire memory according to the written size.
[GitHub] spark issue #19586: [SPARK-22367][CORE] Separate the serialization of class ...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19586 @srowen Thanks for reviewing. What do you mean here? > I'm trying to think if there's any case where we intend to support kryo/java serialized objects from 2.x in 2.y. Even after a class is registered, it still writes the class (not the full class name, just a class ID) if you call `writeObjectAndClass`. Getting the class ID also takes some computation, and then both the class ID and the object are written.
[GitHub] spark pull request #19586: [SPARK-22367][CORE] Separate the serialization of...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19586#discussion_r147368368 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -860,9 +876,26 @@ private[storage] class PartiallySerializedBlock[T]( ByteStreams.copy(unrolledBuffer.toInputStream(dispose = true), os) memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory) redirectableOutputStream.setOutputStream(os) + +// Whether we use Kryo serialization +var kryoSerializationStream: KryoSerializationStream = null +if (serializationStream.isInstanceOf[KryoSerializationStream]) { + kryoSerializationStream = serializationStream.asInstanceOf[KryoSerializationStream] +} + while (rest.hasNext) { - serializationStream.writeObject(rest.next())(classTag) + val value = rest.next() + if (kryoSerializationStream != null) { +if (!kryoSerializationStream.classWrote) { + kryoSerializationStream.writeClass(value.getClass) --- End diff -- @srowen you can see it here. We don't use `writeAll` here, because we need to acquire memory according to the written size.
[GitHub] spark pull request #19586: [SPARK-22367][CORE] Separate the serialization of...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19586#discussion_r147368002 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -205,11 +205,45 @@ class KryoSerializationStream( private[this] var kryo: Kryo = serInstance.borrowKryo() + // This is only used when we write object and class separately. + var classWrote = false --- End diff -- Yeah, it's used for `writeAll` / `asIterator`. But `MemoryStore.putIteratorAsBytes` doesn't use `writeAll`, so we use this flag to indicate whether we have already written the class.
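The idea behind the `classWrote` flag can be sketched without Kryo: write the class name once before the first record, then only the record payloads, and reject any record whose class differs from the header (the heterogeneous-iterator case raised in review). `ClassOnceWriter`, `ClassOnceReader` and the fixed record count are hypothetical simplifications built on `java.io` data streams, not the PR's `KryoSerializationStream` code.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical writer: the class name goes out once, before the first record.
final class ClassOnceWriter[T <: AnyRef](out: DataOutputStream, encode: (T, DataOutputStream) => Unit) {
  // null until the first record is written; plays the role of `classWrote`.
  private var headerClass: Class[_] = null

  def write(value: T): Unit = {
    if (headerClass == null) {
      out.writeUTF(value.getClass.getName) // class information, written once
      headerClass = value.getClass
    } else if (value.getClass != headerClass) {
      // A single header cannot describe a heterogeneous iterator.
      throw new IllegalArgumentException(
        s"expected ${headerClass.getName} but got ${value.getClass.getName}")
    }
    encode(value, out) // record payload only, no per-record class tag
  }
}

object ClassOnceReader {
  // Reads the class header once, then decodes `count` payloads with `decode`.
  def readAll[T](in: DataInputStream, count: Int)(decode: DataInputStream => T): (String, List[T]) = {
    val className = in.readUTF()
    (className, List.fill(count)(decode(in)))
  }
}
```

For three strings `"a"`, `"bb"`, `"ccc"` this writes an 18-byte header (`writeUTF` of `java.lang.String`) plus 12 payload bytes; the per-record class tag is gone, which is where the space saving in this PR would come from.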
[GitHub] spark pull request #19586: [SPARK-22367][CORE] Separate the serialization of...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19586#discussion_r147367241 --- Diff: pom.xml --- @@ -133,7 +133,7 @@ 1.6.0 9.3.20.v20170531 3.1.0 -0.8.4 +0.9.2 --- End diff -- Not necessary. Chill 0.9.2 uses kryo 4.0. I can change it back.
[GitHub] spark issue #19586: [SPARK-22367][CORE] Separate the serialization of class ...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19586 One executor, with the following configuration. The script:
```shell
${SPARK_HOME}/bin/spark-submit \
  --class com.intel.KryoTest \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=50g \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --driver-memory 5G \
  --driver-cores 10 \
  --executor-memory 40G \
  --executor-cores 20 \
  --num-executors 1 \
```
[GitHub] spark pull request #19586: [SPARK-22367][CORE] Separate the serialization of...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19586#discussion_r147346131 --- Diff: pom.xml --- @@ -133,7 +133,7 @@ 1.6.0 9.3.20.v20170531 3.1.0 -0.8.4 +0.9.2 --- End diff -- I am not sure whether this should be changed. If it is unreasonable, I can change it back.
[GitHub] spark issue #19586: [SPARK-22367][CORE] Separate the serialization of class ...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19586 Hi @cloud-fan @jiangxb1987 @chenghao-intel, would you mind taking a look? Thanks a lot.
[GitHub] spark pull request #19586: [SPARK-22367][CORE] Separate the serialization of...
GitHub user ConeyLiu opened a pull request: https://github.com/apache/spark/pull/19586 [SPARK-22367][CORE] Separate the serialization of class and object for iteraor
## What changes were proposed in this pull request?
Because all the records of an iterator have the same class, there is no need to write the class information for every record. We only need to write the class information once, at the beginning of serialization, and read it once during deserialization. In this patch, we separate the serialization of class and object for an iterator serialized by Kryo. This can improve the performance of serialization and deserialization, and save space. Test case:
```scala
val conf = new SparkConf().setAppName("Test for serialization")
val sc = new SparkContext(conf)
val random = new Random(1)
val data = sc.parallelize(1 to 10).map { i =>
  Person("id-" + i, random.nextInt(Integer.MAX_VALUE))
}.persist(StorageLevel.OFF_HEAP)
var start = System.currentTimeMillis()
data.count()
println("First time: " + (System.currentTimeMillis() - start))
start = System.currentTimeMillis()
data.count()
println("Second time: " + (System.currentTimeMillis() - start))
```
Test result. Serialized size: before 34.3GB, after 17.5GB. Times in ms:

| Before: compute + serialization | Before: deserialization | After: compute + serialization | After: deserialization |
| -- | -- | -- | -- |
| 63869 | 21882 | 45513 | 15158 |
| 59368 | 21507 | 51683 | 15524 |
| 66230 | 21481 | 62163 | 14903 |
| 62399 | 22529 | 52400 | 16255 |
| 137564.2 | 136990.8 | 1.004186 |

## How was this patch tested?
Existing UT.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/ConeyLiu/spark kryo Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19586.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19586 commit c681e81f9d49b3558c91a3b981504159bbeff910 Author: Xianyang Liu <xianyang@intel.com> Date: 2017-10-26T06:37:04Z serialize object and class seperately for iterator commit 640ad5e1d12d1137f4c979a1e75dbdbd713e14de Author: Xianyang Liu <xianyang@intel.com> Date: 2017-10-26T06:42:58Z Merge remote-tracking branch 'spark/master' into kryo
[GitHub] spark pull request #19511: [SPARK-22293][SQL] Avoid unnecessary traversal in...
Github user ConeyLiu closed the pull request at: https://github.com/apache/spark/pull/19511
[GitHub] spark issue #19511: [SPARK-22293][SQL] Avoid unnecessary traversal in Resolv...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19511 OK, close it.
[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19317#discussion_r145294297 --- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala --- @@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * as in scala.TraversableOnce. The former operation is used for merging values within a * partition, and the latter is used for merging values between partitions. To avoid memory * allocation, both of these functions are allowed to modify and return their first argument + * instead of creating a new U. This method is different from the ordinary "aggregateByKey" + * method, it directly returns a map to the driver, rather than a rdd. This will also perform + * the merging locally on each mapper before sending results to a reducer, similarly to a --- End diff -- Thanks for the advice. I'll close this and try `mapPartitions(...).collect` in `NaiveBayes`.
[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...
Github user ConeyLiu closed the pull request at: https://github.com/apache/spark/pull/19317
[GitHub] spark issue #19511: [SPARK-22293][SQL] Avoid unnecessary traversal in Resolv...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19511 Hi @gatorsmile, if we can combine the two traversals, this should simplify the code rather than complicate it. However, it won't bring a big performance improvement, so I can close it if this change is unnecessary.
[GitHub] spark pull request #19511: [SPARK-22293][SQL] Avoid unnecessary traversal in...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19511#discussion_r145041400 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -890,32 +890,39 @@ class Analyzer( /** * Returns true if `exprs` contains a [[Star]]. + * @param deepInto Whether to traverse all the subtrees, true by default. */ -def containsStar(exprs: Seq[Expression]): Boolean = - exprs.exists(_.collect { case _: Star => true }.nonEmpty) +def containsStar(exprs: Seq[Expression], deepInto: Boolean = true): Boolean = { + if (deepInto) { +exprs.exists(_.collect { case _: Star => true }.nonEmpty) + } else { +exprs.exists{ case _: Star => true} + } +} + /** * Expands the matching attribute.*'s in `child`'s output. */ def expandStarExpression(expr: Expression, child: LogicalPlan): Expression = { expr.transformUp { -case f1: UnresolvedFunction if containsStar(f1.children) => +case f1: UnresolvedFunction if containsStar(f1.children, false) => --- End diff -- I also have a question: could we combine the two traversals into one? Currently, we first traverse the `children` to test whether there is a `Star`, and then traverse them again to expand it. We always go through the children at least once, and twice when a `Star` exists.
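A miniature expression tree shows the difference between the deep check (`collect` over every subtree) and the shallow check proposed in the diff. `Expr`, `Star`, `Attr` and `Func` are hypothetical stand-ins, not Catalyst classes. One detail worth noting: a pattern-matching function literal such as `{ case _: Star => true }` passed to `exists` has no default case, so it would throw a `MatchError` on the first non-`Star` element; the sketch adds an explicit `case _ => false`.

```scala
// Hypothetical miniature expression tree, not Catalyst's classes.
sealed trait Expr { def children: Seq[Expr] }
case object Star extends Expr { val children: Seq[Expr] = Nil }
final case class Attr(name: String) extends Expr { val children: Seq[Expr] = Nil }
final case class Func(name: String, children: Seq[Expr]) extends Expr

object StarCheck {
  // Deep check: visits every node of every subtree.
  def containsStarDeep(e: Expr): Boolean =
    e == Star || e.children.exists(containsStarDeep)

  def containsStar(exprs: Seq[Expr], deepInto: Boolean = true): Boolean =
    if (deepInto) exprs.exists(containsStarDeep)
    // Shallow check: looks only at the expressions themselves. The default
    // case avoids a MatchError on non-Star elements.
    else exprs.exists {
      case Star => true
      case _    => false
    }
}
```

With `deepInto = false`, a `Star` nested inside a function argument is not seen, which is exactly the trade-off the diff relies on when it calls `containsStar(f1.children, false)`.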
[GitHub] spark issue #19511: [SPARK-22293][SQL] Avoid unnecessary traversal in Resolv...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19511 Hi @cloud-fan @gatorsmile, would you mind taking a look?
[GitHub] spark pull request #19511: [SPARK-22293][SQL] Avoid unnecessary traversal in...
GitHub user ConeyLiu opened a pull request: https://github.com/apache/spark/pull/19511 [SPARK-22293][SQL] Avoid unnecessary traversal in ResolveReferences ## What changes were proposed in this pull request? When expanding `Star` expressions, we don't need to traverse the children expressions to determine whether there is a `Star`. ## How was this patch tested? Existing UT. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ConeyLiu/spark resolveReferences Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19511.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19511 commit c58b3c0da49b4660affd386bae37fa6b5c9b3567 Author: Xianyang Liu <xianyang@intel.com> Date: 2017-10-17T06:29:02Z Avoid unnecessary traversal
[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19317#discussion_r144760426 --- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala --- @@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * as in scala.TraversableOnce. The former operation is used for merging values within a * partition, and the latter is used for merging values between partitions. To avoid memory * allocation, both of these functions are allowed to modify and return their first argument + * instead of creating a new U. This method is different from the ordinary "aggregateByKey" + * method, it directly returns a map to the driver, rather than a rdd. This will also perform + * the merging locally on each mapper before sending results to a reducer, similarly to a --- End diff -- `aggregateByKey(...).toLocalIterator` needs a shuffle for `aggregateByKey` and then collects the `RDD` to the driver as an iterator. `aggregateByKeyLocally` is similar to `aggregateByKey`, but without a shuffle: it computes the combiners in each task, collects all the maps directly to the driver, and does the final combining on the driver.
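The no-shuffle flow described above (combine inside each task, send one map per partition to the driver, merge there) can be simulated with plain collections. In this sketch each inner `Seq` stands in for a partition, and `aggregateByKeyLocally` is a hypothetical local function, not the proposed `PairRDDFunctions` method. The zero value is passed as a function so that mutable accumulators such as a `HashSet` are never shared between keys.

```scala
import scala.collection.mutable

object LocalAggregation {
  // Each inner Seq stands in for one partition. Every "task" folds its records
  // into a per-partition map with seqOp; the "driver" then merges the maps
  // with combOp. Nothing is shuffled: one map per partition reaches the driver.
  def aggregateByKeyLocally[K, V, U](partitions: Seq[Seq[(K, V)]], zero: () => U)(
      seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
    val perPartition: Seq[mutable.Map[K, U]] = partitions.map { records =>
      val acc = mutable.HashMap.empty[K, U]
      records.foreach { case (k, v) => acc(k) = seqOp(acc.getOrElse(k, zero()), v) }
      acc
    }
    val merged = mutable.HashMap.empty[K, U]
    perPartition.foreach { m =>
      m.foreach { case (k, u) =>
        merged(k) = merged.get(k).map(combOp(_, u)).getOrElse(u)
      }
    }
    merged.toMap
  }
}
```

The trade-off is the usual one for driver-side merging: it avoids a shuffle stage, but every per-partition map must fit in driver memory, so it only suits a small number of distinct keys.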
[GitHub] spark pull request #19317: [SPARK-22098][CORE] Add new method aggregateByKey...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19317#discussion_r144755666 --- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala --- @@ -180,6 +180,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * as in scala.TraversableOnce. The former operation is used for merging values within a * partition, and the latter is used for merging values between partitions. To avoid memory * allocation, both of these functions are allowed to modify and return their first argument + * instead of creating a new U. This method is different from the ordinary "aggregateByKey" + * method, it directly returns a map to the driver, rather than a rdd. This will also perform + * the merging locally on each mapper before sending results to a reducer, similarly to a --- End diff -- Yeah, it will. Here "different" means it returns a map directly to the driver rather than an RDD.
[GitHub] spark issue #19317: [SPARK-22098][CORE] Add new method aggregateByKeyLocally...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19317 Hi @WeichenXu123, any comments on this?
[GitHub] spark pull request #19316: [SPARK-22097][CORE]Request an accurate memory aft...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19316#discussion_r144169752 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -388,7 +388,13 @@ private[spark] class MemoryStore( // perform one final call to attempt to allocate additional memory if necessary. if (keepUnrolling) { serializationStream.close() - reserveAdditionalMemoryIfNecessary() + if (bbos.size > unrollMemoryUsedByThisBlock) { +val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock --- End diff -- Because `reserveAdditionalMemoryIfNecessary` requests `val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong`, which over-reserves so that we don't have to make a request for every record. For the last request, however, we only need to request the precise amount of memory.
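The difference between the two requests is plain arithmetic. Assuming an illustrative growth factor of 1.5, a block whose serialized size (`bbos.size`) is 1000 bytes with 900 bytes already reserved (`unrollMemoryUsedByThisBlock`) would ask for 1000 * 1.5 - 900 = 600 more bytes under the incremental formula, while only 100 bytes are actually missing. The helper names below are hypothetical; only the two formulas come from the discussion.

```scala
object UnrollMemoryRequests {
  // Incremental request while unrolling: over-reserve by a growth factor so
  // that the memory manager is not asked again after every record.
  def incrementalRequest(currentSize: Long, reserved: Long, growthFactor: Double): Long =
    (currentSize * growthFactor - reserved).toLong

  // Final request once the stream is closed: only the exact shortfall, if any.
  def finalRequest(currentSize: Long, reserved: Long): Long =
    if (currentSize > reserved) currentSize - reserved else 0L
}
```

Because the final size is known exactly after `close()`, requesting the exact shortfall avoids holding on to unroll memory that the block will never use.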
[GitHub] spark pull request #19316: [SPARK-22097][CORE]Request an accurate memory aft...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19316#discussion_r143901552 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -388,7 +388,13 @@ private[spark] class MemoryStore( // perform one final call to attempt to allocate additional memory if necessary. if (keepUnrolling) { --- End diff -- We may need to request the extra memory (`bbos.size - unrollMemoryUsedByThisBlock`), so `keepUnrolling` may change at line 393.
[GitHub] spark issue #19316: [SPARK-22097][CORE]Request an accurate memory after we u...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19316 Hi @cloud-fan @jiangxb1987, do you have time to check this? Thanks a lot.
[GitHub] spark issue #19317: [SPARK-22098][CORE] Add new method aggregateByKeyLocally...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19317 Test case:
```scala
test("performance of aggregateByKeyLocally ") {
  val random = new Random(1)
  val pairs = sc.parallelize(0 until 1000, 20)
    .map(p => (random.nextInt(100), p))
    .persist(StorageLevel.MEMORY_ONLY)
  pairs.count()
  val start = System.currentTimeMillis()
  //val jHashMap = pairs.aggregateByKeyLocallyWithJHashMap(new HashSet[Int]())(_ += _, _ ++= _).toArray
  val openHashMap = pairs.aggregateByKeyLocally(new HashSet[Int]())(_ += _, _ ++= _).toArray
  println(System.currentTimeMillis() - start)
}
```
Test result (ms):

| Map | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | Avg |
| -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- |
| JHashMap | 2921 | 2920 | 2843 | 2950 | 2898 | 3316 | 2770 | 2994 | 3016 | 3005 | 2963.3 |
| OpenHashMap | 3029 | 2884 | 3064 | 3023 | 3108 | 3194 | 3003 | 2961 | 3115 | 3023 | 3040.4 |

The performance looks almost the same.
[GitHub] spark issue #19317: [SPARK-22098][CORE] Add new method aggregateByKeyLocally...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19317 OK, let's just keep it. Does this need more tests or further improvements?