Repository: spark
Updated Branches:
  refs/heads/branch-2.3 9ac9f36c4 -> 575fea120


[CORE] Updates to remote cache reads

Covered by tests in DistributedSuite
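
For readers following the diff: this change replaces the old TempFileManager
with a DownloadFile / DownloadFileWritableChannel / DownloadFileManager
abstraction, so blocks fetched to disk can be transparently re-encrypted when
I/O encryption is enabled. The lifecycle the new interfaces enforce looks
roughly like this (a minimal sketch against the new API, mirroring what
OneForOneBlockFetcher.DownloadCallback does below; error handling omitted):

    import java.nio.ByteBuffer
    import org.apache.spark.network.buffer.ManagedBuffer
    import org.apache.spark.network.shuffle.{DownloadFile, DownloadFileManager}
    import org.apache.spark.network.util.TransportConf

    object FetchToDiskSketch {
      def fetchToDisk(
          manager: DownloadFileManager,
          conf: TransportConf,
          data: ByteBuffer): ManagedBuffer = {
        val file: DownloadFile = manager.createTempFile(conf)
        val channel = file.openForWriting()
        while (data.hasRemaining) {
          channel.write(data)
        }
        // closeAndRead() finalizes the write and hands back a readable view
        // of the data; implementations may decrypt (or re-encrypt) underneath.
        val buffer = channel.closeAndRead()
        // If registration fails (e.g. the consumer is already shut down),
        // the caller must delete the file itself.
        if (!manager.registerTempFileToClean(file)) {
          file.delete()
        }
        buffer
      }
    }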


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/575fea12
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/575fea12
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/575fea12

Branch: refs/heads/branch-2.3
Commit: 575fea120e25249716e3f680396580c5f9e26b5b
Parents: 6d742d1
Author: Imran Rashid <iras...@cloudera.com>
Authored: Wed Aug 22 16:38:28 2018 -0500
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Thu Sep 13 09:19:56 2018 -0500

----------------------------------------------------------------------
 .../spark/network/buffer/ManagedBuffer.java     |  5 +-
 .../spark/network/shuffle/DownloadFile.java     | 47 ++++++++++
 .../network/shuffle/DownloadFileManager.java    | 36 ++++++++
 .../shuffle/DownloadFileWritableChannel.java    | 31 +++++++
 .../network/shuffle/ExternalShuffleClient.java  |  4 +-
 .../network/shuffle/OneForOneBlockFetcher.java  | 28 +++---
 .../spark/network/shuffle/ShuffleClient.java    |  4 +-
 .../network/shuffle/SimpleDownloadFile.java     | 91 ++++++++++++++++++++
 .../spark/network/shuffle/TempFileManager.java  | 36 --------
 .../spark/network/BlockTransferService.scala    |  6 +-
 .../netty/NettyBlockTransferService.scala       |  4 +-
 .../org/apache/spark/storage/BlockManager.scala | 78 ++++++++++++++---
 .../org/apache/spark/storage/DiskStore.scala    | 16 ++++
 .../storage/ShuffleBlockFetcherIterator.scala   | 21 +++--
 .../spark/storage/BlockManagerSuite.scala       |  8 +-
 .../ShuffleBlockFetcherIteratorSuite.scala      |  6 +-
 16 files changed, 329 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
index 1861f8d..2d573f5 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
@@ -36,7 +36,10 @@ import java.nio.ByteBuffer;
  */
 public abstract class ManagedBuffer {
 
-  /** Number of bytes of the data. */
+  /**
+   * Number of bytes of the data. If this buffer will decrypt for all of the views into the data,
+   * this is the size of the decrypted data.
+   */
   public abstract long size();
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFile.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFile.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFile.java
new file mode 100644
index 0000000..76c2d02
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFile.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+
+/**
+ * A handle on the file used when fetching remote data to disk.  Used to ensure the lifecycle of
+ * writing the data, reading it back, and then cleaning it up is followed.  Specific implementations
+ * may also handle encryption.  The data can be read only via DownloadFileWritableChannel,
+ * which ensures data is not read until after the writer is closed.
+ */
+public interface DownloadFile {
+  /**
+   * Delete the file.
+   *
+   * @return  <code>true</code> if and only if the file or directory is
+   *          successfully deleted; <code>false</code> otherwise
+   */
+  public boolean delete();
+
+  /**
+   * A channel for writing data to the file.  This special channel allows access to the data for
+   * reading, after the channel is closed, via {@link DownloadFileWritableChannel#closeAndRead()}.
+   */
+  public DownloadFileWritableChannel openForWriting() throws IOException;
+
+  /**
+   * The path of the file, intended only for debug purposes.
+   */
+  public String path();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileManager.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileManager.java
new file mode 100644
index 0000000..c335a17
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileManager.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * A manager to create temp block files used when fetching remote data to reduce the memory usage.
+ * It will clean files when they won't be used any more.
+ */
+public interface DownloadFileManager {
+
+  /** Create a temp block file. */
+  DownloadFile createTempFile(TransportConf transportConf);
+
+  /**
+   * Register a temp file to clean up when it won't be used any more. Return whether the
+   * file is registered successfully. If `false`, the caller should clean up the file by itself.
+   */
+  boolean registerTempFileToClean(DownloadFile file);
+}
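
The two real implementations of this interface are updated further down in
this diff (RemoteBlockDownloadFileManager in BlockManager.scala, and
ShuffleBlockFetcherIterator). Purely to illustrate the contract -- the class
below is hypothetical, not part of the commit -- a minimal manager could look
like this:

    import java.io.File
    import scala.collection.mutable
    import org.apache.spark.network.shuffle.{DownloadFile, DownloadFileManager, SimpleDownloadFile}
    import org.apache.spark.network.util.TransportConf

    // Deletes every registered file when cleanup() is called; once closed,
    // registration returns false and callers must delete their own files.
    class EagerDownloadFileManager(tempDir: File) extends DownloadFileManager {
      private val files = mutable.HashSet[DownloadFile]()
      private var closed = false

      override def createTempFile(transportConf: TransportConf): DownloadFile =
        new SimpleDownloadFile(File.createTempFile("fetch", ".tmp", tempDir), transportConf)

      override def registerTempFileToClean(file: DownloadFile): Boolean = synchronized {
        !closed && files.add(file)
      }

      def cleanup(): Unit = synchronized {
        closed = true
        files.foreach(_.delete())
        files.clear()
      }
    }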

http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileWritableChannel.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileWritableChannel.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileWritableChannel.java
new file mode 100644
index 0000000..acf194c
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileWritableChannel.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+
+import java.io.OutputStream;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A channel for writing data which is fetched to disk, which allows access to the written data only
+ * after the writer has been closed.  Used with DownloadFile and DownloadFileManager.
+ */
+public interface DownloadFileWritableChannel extends WritableByteChannel {
+  public ManagedBuffer closeAndRead();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 7ed0b6e..9a2cf0f 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -91,7 +91,7 @@ public class ExternalShuffleClient extends ShuffleClient {
       String execId,
       String[] blockIds,
       BlockFetchingListener listener,
-      TempFileManager tempFileManager) {
+      DownloadFileManager downloadFileManager) {
     checkInit();
     logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
     try {
@@ -99,7 +99,7 @@ public class ExternalShuffleClient extends ShuffleClient {
           (blockIds1, listener1) -> {
             TransportClient client = clientFactory.createClient(host, port);
             new OneForOneBlockFetcher(client, appId, execId,
-              blockIds1, listener1, conf, tempFileManager).start();
+              blockIds1, listener1, conf, downloadFileManager).start();
           };
 
       int maxRetries = conf.maxIORetries();

http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
index 0bc5718..3058702 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -17,18 +17,13 @@
 
 package org.apache.spark.network.shuffle;
 
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
 import java.util.Arrays;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.client.ChunkReceivedCallback;
 import org.apache.spark.network.client.RpcResponseCallback;
@@ -58,7 +53,7 @@ public class OneForOneBlockFetcher {
   private final BlockFetchingListener listener;
   private final ChunkReceivedCallback chunkCallback;
   private final TransportConf transportConf;
-  private final TempFileManager tempFileManager;
+  private final DownloadFileManager downloadFileManager;
 
   private StreamHandle streamHandle = null;
 
@@ -79,14 +74,14 @@ public class OneForOneBlockFetcher {
       String[] blockIds,
       BlockFetchingListener listener,
       TransportConf transportConf,
-      TempFileManager tempFileManager) {
+      DownloadFileManager downloadFileManager) {
     this.client = client;
     this.openMessage = new OpenBlocks(appId, execId, blockIds);
     this.blockIds = blockIds;
     this.listener = listener;
     this.chunkCallback = new ChunkCallback();
     this.transportConf = transportConf;
-    this.tempFileManager = tempFileManager;
+    this.downloadFileManager = downloadFileManager;
   }
 
   /** Callback invoked on receipt of each chunk. We equate a single chunk to a single block. */
@@ -125,7 +120,7 @@ public class OneForOneBlockFetcher {
           // Immediately request all chunks -- we expect that the total size of the request is
           // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
           for (int i = 0; i < streamHandle.numChunks; i++) {
-            if (tempFileManager != null) {
+            if (downloadFileManager != null) {
               client.stream(OneForOneStreamManager.genStreamChunkId(streamHandle.streamId, i),
                 new DownloadCallback(i));
             } else {
@@ -159,13 +154,13 @@ public class OneForOneBlockFetcher {
 
   private class DownloadCallback implements StreamCallback {
 
-    private WritableByteChannel channel = null;
-    private File targetFile = null;
+    private DownloadFileWritableChannel channel = null;
+    private DownloadFile targetFile = null;
     private int chunkIndex;
 
     DownloadCallback(int chunkIndex) throws IOException {
-      this.targetFile = tempFileManager.createTempFile();
-      this.channel = Channels.newChannel(new FileOutputStream(targetFile));
+      this.targetFile = downloadFileManager.createTempFile(transportConf);
+      this.channel = targetFile.openForWriting();
       this.chunkIndex = chunkIndex;
     }
 
@@ -178,11 +173,8 @@ public class OneForOneBlockFetcher {
 
     @Override
     public void onComplete(String streamId) throws IOException {
-      channel.close();
-      ManagedBuffer buffer = new FileSegmentManagedBuffer(transportConf, targetFile, 0,
-        targetFile.length());
-      listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
-      if (!tempFileManager.registerTempFileToClean(targetFile)) {
+      listener.onBlockFetchSuccess(blockIds[chunkIndex], channel.closeAndRead());
+      if (!downloadFileManager.registerTempFileToClean(targetFile)) {
         targetFile.delete();
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
index 18b04fe..62b99c4 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
@@ -43,7 +43,7 @@ public abstract class ShuffleClient implements Closeable {
    * @param execId the executor id.
    * @param blockIds block ids to fetch.
    * @param listener the listener to receive block fetching status.
-   * @param tempFileManager TempFileManager to create and clean temp files.
+   * @param downloadFileManager DownloadFileManager to create and clean temp files.
    *                        If it's not <code>null</code>, the remote blocks will be streamed
    *                        into temp shuffle files to reduce the memory usage, otherwise,
    *                        they will be kept in memory.
@@ -54,7 +54,7 @@ public abstract class ShuffleClient implements Closeable {
       String execId,
       String[] blockIds,
       BlockFetchingListener listener,
-      TempFileManager tempFileManager);
+      DownloadFileManager downloadFileManager);
 
   /**
   * Get the shuffle MetricsSet from ShuffleClient, this will be used in MetricsSystem to

http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/SimpleDownloadFile.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/SimpleDownloadFile.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/SimpleDownloadFile.java
new file mode 100644
index 0000000..670612f
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/SimpleDownloadFile.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.shuffle;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * A DownloadFile that does not take any encryption settings into account for reading and
+ * writing data.
+ *
+ * This does *not* mean the data in the file is un-encrypted -- it could be that the data is
+ * already encrypted when it's written, and a subsequent layer is responsible for decrypting.
+ */
+public class SimpleDownloadFile implements DownloadFile {
+
+  private final File file;
+  private final TransportConf transportConf;
+
+  public SimpleDownloadFile(File file, TransportConf transportConf) {
+    this.file = file;
+    this.transportConf = transportConf;
+  }
+
+  @Override
+  public boolean delete() {
+    return file.delete();
+  }
+
+  @Override
+  public DownloadFileWritableChannel openForWriting() throws IOException {
+    return new SimpleDownloadWritableChannel();
+  }
+
+  @Override
+  public String path() {
+    return file.getAbsolutePath();
+  }
+
+  private class SimpleDownloadWritableChannel implements DownloadFileWritableChannel {
+
+    private final WritableByteChannel channel;
+
+    SimpleDownloadWritableChannel() throws FileNotFoundException {
+      channel = Channels.newChannel(new FileOutputStream(file));
+    }
+
+    @Override
+    public ManagedBuffer closeAndRead() {
+      return new FileSegmentManagedBuffer(transportConf, file, 0, file.length());
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      return channel.write(src);
+    }
+
+    @Override
+    public boolean isOpen() {
+      return channel.isOpen();
+    }
+
+    @Override
+    public void close() throws IOException {
+      channel.close();
+    }
+  }
+}
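
A quick round-trip through this class, for illustration only (a sketch; in
real code the file comes from DiskBlockManager and the TransportConf from
SparkTransportConf):

    import java.io.File
    import java.nio.ByteBuffer
    import org.apache.spark.network.shuffle.SimpleDownloadFile
    import org.apache.spark.network.util.TransportConf

    object SimpleDownloadFileSketch {
      def roundTrip(tmp: File, transportConf: TransportConf): Unit = {
        val download = new SimpleDownloadFile(tmp, transportConf)
        val channel = download.openForWriting()
        channel.write(ByteBuffer.wrap("hello".getBytes("UTF-8")))
        // Per the DownloadFileWritableChannel contract, the data becomes
        // readable once writing is finished.
        val buffer = channel.closeAndRead()
        assert(buffer.size() == 5)
        val bytes = new Array[Byte](buffer.size().toInt)
        buffer.nioByteBuffer().get(bytes)
        assert(new String(bytes, "UTF-8") == "hello")
        download.delete()
      }
    }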

http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java
deleted file mode 100644
index 552364d..0000000
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.io.File;
-
-/**
- * A manager to create temp block files to reduce the memory usage and also clean temp
- * files when they won't be used any more.
- */
-public interface TempFileManager {
-
-  /** Create a temp block file. */
-  File createTempFile();
-
-  /**
-   * Register a temp file to clean up when it won't be used any more. Return whether the
-   * file is registered successfully. If `false`, the caller should clean up the file by itself.
-   */
-  boolean registerTempFileToClean(File file);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index 1d8a266..eef8c31 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ShuffleClient}
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.util.ThreadUtils
 
@@ -68,7 +68,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
       execId: String,
       blockIds: Array[String],
       listener: BlockFetchingListener,
-      tempFileManager: TempFileManager): Unit
+      tempFileManager: DownloadFileManager): Unit
 
   /**
   * Upload a single block to a remote node, available only after [[init]] is invoked.
@@ -92,7 +92,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
       port: Int,
       execId: String,
       blockId: String,
-      tempFileManager: TempFileManager): ManagedBuffer = {
+      tempFileManager: DownloadFileManager): ManagedBuffer = {
     // A monitor for the thread to wait on.
     val result = Promise[ManagedBuffer]()
     fetchBlocks(host, port, execId, Array(blockId),

http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index b7d8c35..1c48ad6 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -32,7 +32,7 @@ import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory}
 import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
 import org.apache.spark.network.server._
-import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, RetryingBlockFetcher, TempFileManager}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, OneForOneBlockFetcher, RetryingBlockFetcher}
 import org.apache.spark.network.shuffle.protocol.UploadBlock
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.JavaSerializer
@@ -105,7 +105,7 @@ private[spark] class NettyBlockTransferService(
       execId: String,
       blockIds: Array[String],
       listener: BlockFetchingListener,
-      tempFileManager: TempFileManager): Unit = {
+      tempFileManager: DownloadFileManager): Unit = {
     logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
     try {
       val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {

http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index df1a4be..027c31f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -33,6 +33,7 @@ import scala.util.Random
 import scala.util.control.NonFatal
 
 import com.codahale.metrics.{MetricRegistry, MetricSet}
+import com.google.common.io.CountingOutputStream
 
 import org.apache.spark._
 import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
@@ -42,8 +43,9 @@ import org.apache.spark.metrics.source.Source
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.network.shuffle.{ExternalShuffleClient, TempFileManager}
+import org.apache.spark.network.shuffle._
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
+import org.apache.spark.network.util.TransportConf
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.ShuffleManager
@@ -206,11 +208,11 @@ private[spark] class BlockManager(
 
   private var blockReplicationPolicy: BlockReplicationPolicy = _
 
-  // A TempFileManager used to track all the files of remote blocks which above the
+  // A DownloadFileManager used to track all the files of remote blocks which are above the
   // specified memory threshold. Files will be deleted automatically based on weak reference.
   // Exposed for test
   private[storage] val remoteBlockTempFileManager =
-    new BlockManager.RemoteBlockTempFileManager(this)
+    new BlockManager.RemoteBlockDownloadFileManager(this)
   private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
 
   /**
@@ -1582,23 +1584,28 @@ private[spark] object BlockManager {
     metricRegistry.registerAll(metricSet)
   }
 
-  class RemoteBlockTempFileManager(blockManager: BlockManager)
-      extends TempFileManager with Logging {
+  class RemoteBlockDownloadFileManager(blockManager: BlockManager)
+      extends DownloadFileManager with Logging {
+    // lazy because SparkEnv is set after this
+    lazy val encryptionKey = SparkEnv.get.securityManager.getIOEncryptionKey()
 
-    private class ReferenceWithCleanup(file: File, referenceQueue: JReferenceQueue[File])
-        extends WeakReference[File](file, referenceQueue) {
-      private val filePath = file.getAbsolutePath
+    private class ReferenceWithCleanup(
+        file: DownloadFile,
+        referenceQueue: JReferenceQueue[DownloadFile]
+        ) extends WeakReference[DownloadFile](file, referenceQueue) {
+
+      val filePath = file.path()
 
       def cleanUp(): Unit = {
         logDebug(s"Clean up file $filePath")
 
-        if (!new File(filePath).delete()) {
+        if (!file.delete()) {
           logDebug(s"Fail to delete file $filePath")
         }
       }
     }
 
-    private val referenceQueue = new JReferenceQueue[File]
+    private val referenceQueue = new JReferenceQueue[DownloadFile]
     private val referenceBuffer = Collections.newSetFromMap[ReferenceWithCleanup](
       new ConcurrentHashMap)
 
@@ -1610,11 +1617,21 @@ private[spark] object BlockManager {
     cleaningThread.setName("RemoteBlock-temp-file-clean-thread")
     cleaningThread.start()
 
-    override def createTempFile(): File = {
-      blockManager.diskBlockManager.createTempLocalBlock()._2
+    override def createTempFile(transportConf: TransportConf): DownloadFile = {
+      val file = blockManager.diskBlockManager.createTempLocalBlock()._2
+      encryptionKey match {
+        case Some(key) =>
+          // encryption is enabled, so when we read the decrypted data off the network, we need to
+          // encrypt it when writing to disk.  Note that the data may have been encrypted when it
+          // was cached on disk on the remote side, but it was already decrypted by now (see
+          // EncryptedBlockData).
+          new EncryptedDownloadFile(file, key)
+        case None =>
+          new SimpleDownloadFile(file, transportConf)
+      }
     }
 
-    override def registerTempFileToClean(file: File): Boolean = {
+    override def registerTempFileToClean(file: DownloadFile): Boolean = {
       referenceBuffer.add(new ReferenceWithCleanup(file, referenceQueue))
     }
 
@@ -1642,4 +1659,39 @@ private[spark] object BlockManager {
       }
     }
   }
+
+  /**
+   * A DownloadFile that encrypts data when it is written, and decrypts when it's read.
+   */
+  private class EncryptedDownloadFile(
+      file: File,
+      key: Array[Byte]) extends DownloadFile {
+
+    private val env = SparkEnv.get
+
+    override def delete(): Boolean = file.delete()
+
+    override def openForWriting(): DownloadFileWritableChannel = {
+      new EncryptedDownloadWritableChannel()
+    }
+
+    override def path(): String = file.getAbsolutePath
+
+    private class EncryptedDownloadWritableChannel extends DownloadFileWritableChannel {
+      private val countingOutput: CountingWritableChannel = new CountingWritableChannel(
+        Channels.newChannel(env.serializerManager.wrapForEncryption(new FileOutputStream(file))))
+
+      override def closeAndRead(): ManagedBuffer = {
+        countingOutput.close()
+        val size = countingOutput.getCount
+        new EncryptedManagedBuffer(new EncryptedBlockData(file, size, env.conf, key))
+      }
+
+      override def write(src: ByteBuffer): Int = countingOutput.write(src)
+
+      override def isOpen: Boolean = countingOutput.isOpen()
+
+      override def close(): Unit = countingOutput.close()
+    }
+  }
 }
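
A subtlety worth noting: EncryptedDownloadWritableChannel counts bytes above
the cipher, which is what makes the revised ManagedBuffer.size() contract
("the size of the decrypted data") hold for EncryptedManagedBuffer. The same
counting idea, shown as a standalone sketch with Guava's CountingOutputStream
(which this diff imports); the byte-doubling "cipher" below is only a
stand-in for serializerManager.wrapForEncryption:

    import java.io.{ByteArrayOutputStream, FilterOutputStream, OutputStream}
    import com.google.common.io.CountingOutputStream

    object DecryptedSizeSketch {
      // Stand-in for the real encryption wrapper: any stream that changes
      // the number of bytes actually persisted.
      private def fakeEncrypt(out: OutputStream): OutputStream =
        new FilterOutputStream(out) {
          override def write(b: Int): Unit = { out.write(b ^ 0x5A); out.write(0) }
        }

      def main(args: Array[String]): Unit = {
        val sink = new ByteArrayOutputStream()
        // The counter sits above the cipher, so getCount is the logical
        // (decrypted) size -- the value size() must report.
        val counting = new CountingOutputStream(fakeEncrypt(sink))
        counting.write(Array[Byte](1, 2, 3))
        counting.close()
        assert(counting.getCount == 3) // decrypted size
        assert(sink.size == 6)         // bytes on disk differ
      }
    }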

http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 39249d4..e61cfe8 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -30,6 +30,7 @@ import io.netty.channel.DefaultFileRegion
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
+import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils}
 import org.apache.spark.security.CryptoStreamUtils
 import org.apache.spark.util.Utils
@@ -261,7 +262,22 @@ private class EncryptedBlockData(
         throw e
     }
   }
+}
+
+private class EncryptedManagedBuffer(val blockData: EncryptedBlockData) extends ManagedBuffer {
+
+  // This is the size of the decrypted data
+  override def size(): Long = blockData.size
+
+  override def nioByteBuffer(): ByteBuffer = blockData.toByteBuffer()
+
+  override def convertToNetty(): AnyRef = blockData.toNetty()
+
+  override def createInputStream(): InputStream = blockData.toInputStream()
+
+  override def retain(): ManagedBuffer = this
 
+  override def release(): ManagedBuffer = this
 }
 
 private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize: Long)

http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index dd9df74..94def4d 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.{File, InputStream, IOException}
+import java.io.{InputStream, IOException}
 import java.nio.ByteBuffer
 import java.util.concurrent.LinkedBlockingQueue
 import javax.annotation.concurrent.GuardedBy
@@ -28,7 +28,8 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager}
+import org.apache.spark.network.shuffle._
+import org.apache.spark.network.util.TransportConf
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.Utils
 import org.apache.spark.util.io.ChunkedByteBufferOutputStream
@@ -69,7 +70,7 @@ final class ShuffleBlockFetcherIterator(
     maxBlocksInFlightPerAddress: Int,
     maxReqSizeShuffleToMem: Long,
     detectCorrupt: Boolean)
-  extends Iterator[(BlockId, InputStream)] with TempFileManager with Logging {
+  extends Iterator[(BlockId, InputStream)] with DownloadFileManager with Logging {
 
   import ShuffleBlockFetcherIterator._
 
@@ -148,7 +149,7 @@ final class ShuffleBlockFetcherIterator(
   * deleted when cleanup. This is a layer of defensiveness against disk file leaks.
    */
   @GuardedBy("this")
-  private[this] val shuffleFilesSet = mutable.HashSet[File]()
+  private[this] val shuffleFilesSet = mutable.HashSet[DownloadFile]()
 
   initialize()
 
@@ -162,11 +163,15 @@ final class ShuffleBlockFetcherIterator(
     currentResult = null
   }
 
-  override def createTempFile(): File = {
-    blockManager.diskBlockManager.createTempLocalBlock()._2
+  override def createTempFile(transportConf: TransportConf): DownloadFile = {
+    // we never need to do any encryption or decryption here, regardless of configs, because that
+    // is handled at another layer in the code.  When encryption is enabled, shuffle data is written
+    // to disk encrypted in the first place, and sent over the network still encrypted.
+    new SimpleDownloadFile(
+      blockManager.diskBlockManager.createTempLocalBlock()._2, transportConf)
   }
 
-  override def registerTempFileToClean(file: File): Boolean = synchronized {
+  override def registerTempFileToClean(file: DownloadFile): Boolean = synchronized {
     if (isZombie) {
       false
     } else {
@@ -202,7 +207,7 @@ final class ShuffleBlockFetcherIterator(
     }
     shuffleFilesSet.foreach { file =>
       if (!file.delete()) {
-        logWarning("Failed to cleanup shuffle fetch temp file " + file.getAbsolutePath())
+        logWarning("Failed to cleanup shuffle fetch temp file " + file.path())
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 629eed4..4d2168f 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
 import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf}
 import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager}
 import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
@@ -1403,7 +1403,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
     var numCalls = 0
-    var tempFileManager: TempFileManager = null
+    var tempFileManager: DownloadFileManager = null
 
     override def init(blockDataManager: BlockDataManager): Unit = {}
 
@@ -1413,7 +1413,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         execId: String,
         blockIds: Array[String],
         listener: BlockFetchingListener,
-        tempFileManager: TempFileManager): Unit = {
+        tempFileManager: DownloadFileManager): Unit = {
       listener.onBlockFetchSuccess("mockBlockId", new NioManagedBuffer(ByteBuffer.allocate(1)))
     }
 
@@ -1440,7 +1440,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         port: Int,
         execId: String,
         blockId: String,
-        tempFileManager: TempFileManager): ManagedBuffer = {
+        tempFileManager: DownloadFileManager): ManagedBuffer = {
       numCalls += 1
       this.tempFileManager = tempFileManager
       if (numCalls <= maxFailures) {

http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 692ae3b..24244f9 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -33,7 +33,7 @@ import org.scalatest.PrivateMethodTester
 import org.apache.spark.{SparkFunSuite, TaskContext}
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, TempFileManager}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager}
 import org.apache.spark.network.util.LimitedInputStream
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.Utils
@@ -482,12 +482,12 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     val remoteBlocks = Map[BlockId, ManagedBuffer](
       ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer())
     val transfer = mock(classOf[BlockTransferService])
-    var tempFileManager: TempFileManager = null
+    var tempFileManager: DownloadFileManager = null
     when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
       .thenAnswer(new Answer[Unit] {
         override def answer(invocation: InvocationOnMock): Unit = {
          val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
-          tempFileManager = invocation.getArguments()(5).asInstanceOf[TempFileManager]
+          tempFileManager = invocation.getArguments()(5).asInstanceOf[DownloadFileManager]
           Future {
             listener.onBlockFetchSuccess(
              ShuffleBlockId(0, 0, 0).toString, remoteBlocks(ShuffleBlockId(0, 0, 0)))

