Repository: spark
Updated Branches:
  refs/heads/master 7d16776d2 -> 574ef6c98


[SPARK-21527][CORE] Use buffer limit in order to use JAVA NIO Util's buffercache

## What changes were proposed in this pull request?

Right now, ChunkedByteBuffer#writeFully do not slice bytes first.We observe 
code in java nio Util#getTemporaryDirectBuffer below:

        BufferCache cache = bufferCache.get();
        ByteBuffer buf = cache.get(size);
        if (buf != null) {
            return buf;
        } else {
            // No suitable buffer in the cache so we need to allocate a new
            // one. To avoid the cache growing then we remove the first
            // buffer from the cache and free it.
            if (!cache.isEmpty()) {
                buf = cache.removeFirst();
                free(buf);
            }
            return ByteBuffer.allocateDirect(size);
        }

If we slice first with a fixed size, we can use buffer cache and only need to 
allocate at the first write call.
Since we allocate new buffer, we can not control the free time of this 
buffer.This once cause memory issue in our production cluster.
In this patch, i supply a new api which will slice with fixed size for buffer 
writing.

## How was this patch tested?

Unit test and test in production.

Author: zhoukang <zhoukang199...@gmail.com>
Author: zhoukang <zhouk...@xiaomi.com>

Closes #18730 from caneGuy/zhoukang/improve-chunkwrite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/574ef6c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/574ef6c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/574ef6c9

Branch: refs/heads/master
Commit: 574ef6c987c636210828e96d2f797d8f10aff05e
Parents: 7d16776
Author: zhoukang <zhoukang199...@gmail.com>
Authored: Fri Aug 25 22:59:31 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Aug 25 22:59:31 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/internal/config/package.scala |  9 +++++++++
 .../org/apache/spark/util/io/ChunkedByteBuffer.scala     | 11 ++++++++++-
 2 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/574ef6c9/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 9495cd2..0457a66 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -293,6 +293,15 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val BUFFER_WRITE_CHUNK_SIZE =
+    ConfigBuilder("spark.buffer.write.chunkSize")
+      .internal()
+      .doc("The chunk size during writing out the bytes of ChunkedByteBuffer.")
+      .bytesConf(ByteUnit.BYTE)
+      .checkValue(_ <= Int.MaxValue, "The chunk size during writing out the 
bytes of" +
+        " ChunkedByteBuffer should not larger than Int.MaxValue.")
+      .createWithDefault(64 * 1024 * 1024)
+
   private[spark] val CHECKPOINT_COMPRESS =
     ConfigBuilder("spark.checkpoint.compress")
       .doc("Whether to compress RDD checkpoints. Generally a good idea. 
Compression will use " +

http://git-wip-us.apache.org/repos/asf/spark/blob/574ef6c9/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala 
b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index f48bfd5..c28570f 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -24,6 +24,8 @@ import java.nio.channels.WritableByteChannel
 import com.google.common.primitives.UnsignedBytes
 import io.netty.buffer.{ByteBuf, Unpooled}
 
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.config
 import org.apache.spark.network.util.ByteArrayWritableChannel
 import org.apache.spark.storage.StorageUtils
 
@@ -40,6 +42,11 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
   require(chunks != null, "chunks must not be null")
   require(chunks.forall(_.position() == 0), "chunks' positions must be 0")
 
+  // Chunk size in bytes
+  private val bufferWriteChunkSize =
+    Option(SparkEnv.get).map(_.conf.get(config.BUFFER_WRITE_CHUNK_SIZE))
+      .getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get).toInt
+
   private[this] var disposed: Boolean = false
 
   /**
@@ -56,7 +63,9 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
    */
   def writeFully(channel: WritableByteChannel): Unit = {
     for (bytes <- getChunks()) {
-      while (bytes.remaining > 0) {
+      while (bytes.remaining() > 0) {
+        val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
+        bytes.limit(bytes.position + ioSize)
         channel.write(bytes)
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to