Repository: spark
Updated Branches:
  refs/heads/branch-1.2 df8242c9b -> e9d009dc3


[SPARK-4307] Initialize FileDescriptor lazily in FileRegion.

Netty's DefaultFileRegion requires a FileDescriptor in its constructor, which
means we need an open file handle as soon as the region is created. In very
large workloads, this can lead to too many open files because of the way these
file descriptors are cleaned up. This pull request adds a new LazyFileRegion
that initializes the FileDescriptor only when data is sent for the first time.
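Conceptually, the change defers the file open. The sketch below is illustrative
only (names are simplified; the real implementation is the LazyFileRegion added
in the diff further down): the FileChannel is created on the first transferTo
call rather than in the constructor, so no file descriptor is held while the
region sits in Netty's outbound queue.

  // Illustrative sketch of the lazy-open pattern; see LazyFileRegion.java below.
  import java.io.File;
  import java.io.FileInputStream;
  import java.io.IOException;
  import java.nio.channels.FileChannel;
  import java.nio.channels.WritableByteChannel;

  final class LazyRegionSketch {
    private final File file;
    private final long position;
    private final long count;
    private FileChannel channel;  // opened lazily, not in the constructor

    LazyRegionSketch(File file, long position, long count) {
      this.file = file;
      this.position = position;
      this.count = count;
    }

    long transferTo(WritableByteChannel target, long offset) throws IOException {
      if (channel == null) {
        // Only here does this region consume a file descriptor.
        channel = new FileInputStream(file).getChannel();
      }
      return channel.transferTo(position + offset, count - offset, target);
    }
  }

The behavior is gated by the new spark.shuffle.io.lazyFD setting (default true)
that this patch adds to TransportConf.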

Author: Reynold Xin <r...@databricks.com>
Author: Reynold Xin <r...@apache.org>

Closes #3172 from rxin/lazyFD and squashes the following commits:

0bdcdc6 [Reynold Xin] Added reference to Netty's DefaultFileRegion
d4564ae [Reynold Xin] Added SparkConf to the ctor argument of IndexShuffleBlockManager.
6ed369e [Reynold Xin] Code review feedback.
04cddc8 [Reynold Xin] [SPARK-4307] Initialize FileDescriptor lazily in FileRegion.

(cherry picked from commit ef29a9a9aa85468869eb67ca67b66c65f508d0ee)
Signed-off-by: Aaron Davidson <aa...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e9d009dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e9d009dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e9d009dc

Branch: refs/heads/branch-1.2
Commit: e9d009dc348bc06198ed2c9e03f1ba870401e6df
Parents: df8242c
Author: Reynold Xin <r...@databricks.com>
Authored: Tue Nov 11 00:25:31 2014 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Tue Nov 11 00:25:49 2014 -0800

----------------------------------------------------------------------
 .../worker/StandaloneWorkerShuffleService.scala |   2 +-
 .../spark/shuffle/FileShuffleBlockManager.scala |   8 +-
 .../shuffle/IndexShuffleBlockManager.scala      |   8 +-
 .../spark/shuffle/sort/SortShuffleManager.scala |   2 +-
 .../spark/ExternalShuffleServiceSuite.scala     |   2 +-
 .../buffer/FileSegmentManagedBuffer.java        |  23 ++--
 .../spark/network/buffer/LazyFileRegion.java    | 111 +++++++++++++++++++
 .../spark/network/util/TransportConf.java       |  17 +++
 .../network/ChunkFetchIntegrationSuite.java     |   9 +-
 .../shuffle/ExternalShuffleBlockHandler.java    |   5 +-
 .../shuffle/ExternalShuffleBlockManager.java    |  13 ++-
 .../ExternalShuffleBlockManagerSuite.java       |  10 +-
 .../shuffle/ExternalShuffleCleanupSuite.java    |  13 ++-
 .../ExternalShuffleIntegrationSuite.java        |   2 +-
 .../shuffle/ExternalShuffleSecuritySuite.java   |   2 +-
 .../spark/network/yarn/YarnShuffleService.java  |   4 +-
 16 files changed, 191 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
index 88118e2..d044e1d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
@@ -40,7 +40,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
   private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
 
   private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
-  private val blockHandler = new ExternalShuffleBlockHandler()
+  private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
   private val transportContext: TransportContext = {
     val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
     new TransportContext(transportConf, handler)

http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
index f03e8e4..7de2f9c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -27,6 +27,7 @@ import scala.collection.JavaConversions._
 import org.apache.spark.{Logging, SparkConf, SparkEnv}
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
+import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup
 import org.apache.spark.storage._
@@ -68,6 +69,8 @@ private[spark]
 class FileShuffleBlockManager(conf: SparkConf)
   extends ShuffleBlockManager with Logging {
 
+  private val transportConf = SparkTransportConf.fromSparkConf(conf)
+
   private lazy val blockManager = SparkEnv.get.blockManager
 
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -182,13 +185,14 @@ class FileShuffleBlockManager(conf: SparkConf)
         val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId, blockId.reduceId)
         if (segmentOpt.isDefined) {
           val segment = segmentOpt.get
-          return new FileSegmentManagedBuffer(segment.file, segment.offset, segment.length)
+          return new FileSegmentManagedBuffer(
+            transportConf, segment.file, segment.offset, segment.length)
         }
       }
       throw new IllegalStateException("Failed to find shuffle block: " + blockId)
     } else {
       val file = blockManager.diskBlockManager.getFile(blockId)
-      new FileSegmentManagedBuffer(file, 0, file.length)
+      new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
index a48f0c9..b292587 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -22,8 +22,9 @@ import java.nio.ByteBuffer
 
 import com.google.common.io.ByteStreams
 
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
+import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.storage._
 
 /**
@@ -38,10 +39,12 @@ import org.apache.spark.storage._
 // Note: Changes to the format in this file should be kept in sync with
 // org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
 private[spark]
-class IndexShuffleBlockManager extends ShuffleBlockManager {
+class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
 
   private lazy val blockManager = SparkEnv.get.blockManager
 
+  private val transportConf = SparkTransportConf.fromSparkConf(conf)
+
   /**
    * Mapping to a single shuffleBlockId with reduce ID 0.
    * */
@@ -109,6 +112,7 @@ class IndexShuffleBlockManager extends ShuffleBlockManager {
       val offset = in.readLong()
       val nextOffset = in.readLong()
       new FileSegmentManagedBuffer(
+        transportConf,
         getDataFile(blockId.shuffleId, blockId.mapId),
         offset,
         nextOffset - offset)

http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index b727438..bda30a5 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -25,7 +25,7 @@ import org.apache.spark.shuffle.hash.HashShuffleReader
 
 private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {
 
-  private val indexShuffleBlockManager = new IndexShuffleBlockManager()
+  private val indexShuffleBlockManager = new IndexShuffleBlockManager(conf)
   private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 6608ed1..9623d66 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -39,7 +39,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
 
   override def beforeAll() {
     val transportConf = SparkTransportConf.fromSparkConf(conf)
-    rpcHandler = new ExternalShuffleBlockHandler()
+    rpcHandler = new ExternalShuffleBlockHandler(transportConf)
     val transportContext = new TransportContext(transportConf, rpcHandler)
     server = transportContext.createServer()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
index 5fa1527..844eff4 100644
--- a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -31,24 +31,19 @@ import io.netty.channel.DefaultFileRegion;
 
 import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.LimitedInputStream;
+import org.apache.spark.network.util.TransportConf;
 
 /**
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-
-  /**
-   * Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889).
-   * Avoid unless there's a good reason not to.
-   */
-  // TODO: Make this configurable
-  private static final long MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;
-
+  private final TransportConf conf;
   private final File file;
   private final long offset;
   private final long length;
 
-  public FileSegmentManagedBuffer(File file, long offset, long length) {
+  public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
+    this.conf = conf;
     this.file = file;
     this.offset = offset;
     this.length = length;
@@ -65,7 +60,7 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
     try {
       channel = new RandomAccessFile(file, "r").getChannel();
       // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
-      if (length < MIN_MEMORY_MAP_BYTES) {
+      if (length < conf.memoryMapBytes()) {
         ByteBuffer buf = ByteBuffer.allocate((int) length);
         channel.position(offset);
         while (buf.remaining() != 0) {
@@ -134,8 +129,12 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
 
   @Override
   public Object convertToNetty() throws IOException {
-    FileChannel fileChannel = new FileInputStream(file).getChannel();
-    return new DefaultFileRegion(fileChannel, offset, length);
+    if (conf.lazyFileDescriptor()) {
+      return new LazyFileRegion(file, offset, length);
+    } else {
+      FileChannel fileChannel = new FileInputStream(file).getChannel();
+      return new DefaultFileRegion(fileChannel, offset, length);
+    }
   }
 
   public File getFile() { return file; }

http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java b/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
new file mode 100644
index 0000000..81bc8ec
--- /dev/null
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.FileInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+import com.google.common.base.Objects;
+import io.netty.channel.FileRegion;
+import io.netty.util.AbstractReferenceCounted;
+
+import org.apache.spark.network.util.JavaUtils;
+
+/**
+ * A FileRegion implementation that only creates the file descriptor when the region is being
+ * transferred. This cannot be used with Epoll because there is no native support for it.
+ *
+ * This is mostly copied from DefaultFileRegion implementation in Netty. In the future, we
+ * should push this into Netty so the native Epoll transport can support this feature.
+ */
+public final class LazyFileRegion extends AbstractReferenceCounted implements FileRegion {
+
+  private final File file;
+  private final long position;
+  private final long count;
+
+  private FileChannel channel;
+
+  private long numBytesTransferred = 0L;
+
+  /**
+   * @param file file to transfer.
+   * @param position start position for the transfer.
+   * @param count number of bytes to transfer starting from position.
+   */
+  public LazyFileRegion(File file, long position, long count) {
+    this.file = file;
+    this.position = position;
+    this.count = count;
+  }
+
+  @Override
+  protected void deallocate() {
+    JavaUtils.closeQuietly(channel);
+  }
+
+  @Override
+  public long position() {
+    return position;
+  }
+
+  @Override
+  public long transfered() {
+    return numBytesTransferred;
+  }
+
+  @Override
+  public long count() {
+    return count;
+  }
+
+  @Override
+  public long transferTo(WritableByteChannel target, long position) throws IOException {
+    if (channel == null) {
+      channel = new FileInputStream(file).getChannel();
+    }
+
+    long count = this.count - position;
+    if (count < 0 || position < 0) {
+      throw new IllegalArgumentException(
+          "position out of range: " + position + " (expected: 0 - " + (count - 1) + ')');
+    }
+
+    if (count == 0) {
+      return 0L;
+    }
+
+    long written = channel.transferTo(this.position + position, count, target);
+    if (written > 0) {
+      numBytesTransferred += written;
+    }
+    return written;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("file", file)
+        .add("position", position)
+        .add("count", count)
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 787a8f0..621427d 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -75,4 +75,21 @@ public class TransportConf {
    * Only relevant if maxIORetries > 0.
    */
   public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); }
+
+  /**
+   * Minimum size of a block that we should start using memory map rather than reading in through
+   * normal IO operations. This prevents Spark from memory mapping very small blocks. In general,
+   * memory mapping has high overhead for blocks close to or below the page size of the OS.
+   */
+  public int memoryMapBytes() {
+    return conf.getInt("spark.storage.memoryMapThreshold", 2 * 1024 * 1024);
+  }
+
+  /**
+   * Whether to initialize shuffle FileDescriptor lazily or not. If true, file descriptors are
+   * created only when data is going to be transferred. This can reduce the number of open files.
+   */
+  public boolean lazyFileDescriptor() {
+    return conf.getBoolean("spark.shuffle.io.lazyFD", true);
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
index c415883..dfb7740 100644
--- a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
@@ -63,6 +63,8 @@ public class ChunkFetchIntegrationSuite {
   static ManagedBuffer bufferChunk;
   static ManagedBuffer fileChunk;
 
+  private TransportConf transportConf;
+
   @BeforeClass
   public static void setUp() throws Exception {
     int bufSize = 100000;
@@ -80,9 +82,10 @@ public class ChunkFetchIntegrationSuite {
     new Random().nextBytes(fileContent);
     fp.write(fileContent);
     fp.close();
-    fileChunk = new FileSegmentManagedBuffer(testFile, 10, testFile.length() - 25);
 
-    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+    final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+    fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
+
     streamManager = new StreamManager() {
       @Override
       public ManagedBuffer getChunk(long streamId, int chunkIndex) {
@@ -90,7 +93,7 @@ public class ChunkFetchIntegrationSuite {
         if (chunkIndex == BUFFER_CHUNK_INDEX) {
           return new NioManagedBuffer(buf);
         } else if (chunkIndex == FILE_CHUNK_INDEX) {
-          return new FileSegmentManagedBuffer(testFile, 10, testFile.length() - 25);
+          return new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
         } else {
          throw new IllegalArgumentException("Invalid chunk index: " + chunkIndex);
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index a6db4b2..46ca970 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import org.apache.spark.network.util.TransportConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,8 +49,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
   private final ExternalShuffleBlockManager blockManager;
   private final OneForOneStreamManager streamManager;
 
-  public ExternalShuffleBlockHandler() {
-    this(new OneForOneStreamManager(), new ExternalShuffleBlockManager());
+  public ExternalShuffleBlockHandler(TransportConf conf) {
+    this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf));
   }
 
   /** Enables mocking out the StreamManager and BlockManager. */

http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
index ffb7faa..dfe0ba0 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
@@ -37,6 +37,7 @@ import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
 import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.TransportConf;
 
 /**
  * Manages converting shuffle BlockIds into physical segments of local files, from a process outside
@@ -56,14 +57,17 @@ public class ExternalShuffleBlockManager {
   // Single-threaded Java executor used to perform expensive recursive directory deletion.
   private final Executor directoryCleaner;
 
-  public ExternalShuffleBlockManager() {
+  private final TransportConf conf;
+
+  public ExternalShuffleBlockManager(TransportConf conf) {
     // TODO: Give this thread a name.
-    this(Executors.newSingleThreadExecutor());
+    this(conf, Executors.newSingleThreadExecutor());
   }
 
   // Allows tests to have more control over when directories are cleaned up.
   @VisibleForTesting
-  ExternalShuffleBlockManager(Executor directoryCleaner) {
+  ExternalShuffleBlockManager(TransportConf conf, Executor directoryCleaner) {
+    this.conf = conf;
     this.executors = Maps.newConcurrentMap();
     this.directoryCleaner = directoryCleaner;
   }
@@ -167,7 +171,7 @@ public class ExternalShuffleBlockManager {
   // TODO: Support consolidated hash shuffle files
   private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
     File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
-    return new FileSegmentManagedBuffer(shuffleFile, 0, shuffleFile.length());
+    return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length());
   }
 
   /**
@@ -187,6 +191,7 @@ public class ExternalShuffleBlockManager {
       long offset = in.readLong();
       long nextOffset = in.readLong();
       return new FileSegmentManagedBuffer(
+        conf,
         getFile(executor.localDirs, executor.subDirsPerLocalDir,
           "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
         offset,

http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
index da54797..dad6428 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
@@ -22,6 +22,8 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 
 import com.google.common.io.CharStreams;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -37,6 +39,8 @@ public class ExternalShuffleBlockManagerSuite {
 
   static TestShuffleDataContext dataContext;
 
+  static TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+
   @BeforeClass
   public static void beforeAll() throws IOException {
     dataContext = new TestShuffleDataContext(2, 5);
@@ -56,7 +60,7 @@ public class ExternalShuffleBlockManagerSuite {
 
   @Test
   public void testBadRequests() {
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
     // Unregistered executor
     try {
       manager.getBlockData("app0", "exec1", "shuffle_1_1_0");
@@ -87,7 +91,7 @@ public class ExternalShuffleBlockManagerSuite {
 
   @Test
   public void testSortShuffleBlocks() throws IOException {
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
     manager.registerExecutor("app0", "exec0",
       dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
 
@@ -106,7 +110,7 @@ public class ExternalShuffleBlockManagerSuite {
 
   @Test
   public void testHashShuffleBlocks() throws IOException {
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
     manager.registerExecutor("app0", "exec0",
       dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index c8ece3b..254e3a7 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -25,20 +25,23 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.Test;
-
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+
 public class ExternalShuffleCleanupSuite {
 
   // Same-thread Executor used to ensure cleanup happens synchronously in test thread.
   Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
+  TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
 
   @Test
   public void noCleanupAndCleanup() throws IOException {
     TestShuffleDataContext dataContext = createSomeData();
 
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);
     manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
     manager.applicationRemoved("app", false /* cleanup */);
 
@@ -61,7 +64,7 @@ public class ExternalShuffleCleanupSuite {
       @Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
     };
 
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(noThreadExecutor);
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, noThreadExecutor);

     manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
     manager.applicationRemoved("app", true);
@@ -78,7 +81,7 @@ public class ExternalShuffleCleanupSuite {
     TestShuffleDataContext dataContext0 = createSomeData();
     TestShuffleDataContext dataContext1 = createSomeData();
 
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);

     manager.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
     manager.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
@@ -93,7 +96,7 @@ public class ExternalShuffleCleanupSuite {
     TestShuffleDataContext dataContext0 = createSomeData();
     TestShuffleDataContext dataContext1 = createSomeData();
 
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);

     manager.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
     manager.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));

http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 687bde5..02c10bc 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -92,7 +92,7 @@ public class ExternalShuffleIntegrationSuite {
     dataContext1.insertHashShuffleData(1, 0, exec1Blocks);
 
     conf = new TransportConf(new SystemPropertyConfigProvider());
-    handler = new ExternalShuffleBlockHandler();
+    handler = new ExternalShuffleBlockHandler(conf);
     TransportContext transportContext = new TransportContext(conf, handler);
     server = transportContext.createServer();
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index 8afceab..759a129 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -42,7 +42,7 @@ public class ExternalShuffleSecuritySuite {
 
   @Before
   public void beforeEach() {
-    RpcHandler handler = new SaslRpcHandler(new ExternalShuffleBlockHandler(),
+    RpcHandler handler = new SaslRpcHandler(new ExternalShuffleBlockHandler(conf),
       new TestSecretKeyHolder("my-app-id", "secret"));
     TransportContext context = new TransportContext(conf, handler);
     this.server = context.createServer();

http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
----------------------------------------------------------------------
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index bb0b8f7..a34aabe 100644
--- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -95,10 +95,11 @@ public class YarnShuffleService extends AuxiliaryService {
    */
   @Override
   protected void serviceInit(Configuration conf) {
+    TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
     // If authentication is enabled, set up the shuffle server to use a
     // special RPC handler that filters out unauthenticated fetch requests
     boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
-    RpcHandler rpcHandler = new ExternalShuffleBlockHandler();
+    RpcHandler rpcHandler = new ExternalShuffleBlockHandler(transportConf);
     if (authEnabled) {
       secretManager = new ShuffleSecretManager();
       rpcHandler = new SaslRpcHandler(rpcHandler, secretManager);
@@ -106,7 +107,6 @@ public class YarnShuffleService extends AuxiliaryService {
 
     int port = conf.getInt(
       SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
-    TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
     TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
     shuffleServer = transportContext.createServer(port);
     String authEnabledString = authEnabled ? "enabled" : "not enabled";

