Repository: spark
Updated Branches:
  refs/heads/branch-2.2 3bfad9d42 -> 40fd0ce7f


[SPARK-21342] Fix DownloadCallback to work well with RetryingBlockFetcher.

When `RetryingBlockFetcher` retries fetching blocks, two `DownloadCallback`s
could download the same content to the same target file, which could cause
`ShuffleBlockFetcherIterator` to read a partial result.

This PR proposes to create and delete the temp files in `OneForOneBlockFetcher`, so that each `DownloadCallback` writes to its own file.
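
To make the failure mode concrete, here is a toy Java sketch (illustrative
code only, not from this patch) of two fetch attempts racing on one shared
target file:

  import java.io.File;
  import java.io.FileOutputStream;
  import java.io.IOException;

  // Toy illustration: an original fetch and its retry both stream into the
  // SAME target file, so their writes can interleave and a reader may see a
  // mix of the two attempts, i.e. a partial/corrupt block.
  public class SharedTargetFileRace {

    private static Runnable writeAll(File target, byte value) {
      return () -> {
        try (FileOutputStream out = new FileOutputStream(target)) {
          for (int i = 0; i < 100_000; i++) {
            out.write(value);
          }
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      };
    }

    public static void main(String[] args) throws Exception {
      File target = File.createTempFile("shuffle", ".data"); // shared target
      Thread original = new Thread(writeAll(target, (byte) 'A')); // first attempt
      Thread retry = new Thread(writeAll(target, (byte) 'B'));    // retried attempt
      original.start();
      retry.start();
      original.join();
      retry.join();
      // The file may now hold an arbitrary mix of 'A's and 'B's rather than
      // one consistent copy.
      System.out.println("length: " + target.length());
      target.delete();
    }
  }

With this change, each `DownloadCallback` instead obtains a fresh file from
`TempShuffleFileManager.createTempShuffleFile()`, so a retried attempt can
never write into the file of an attempt that is still running.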

Author: jinxing <jinxing6...@126.com>
Author: Shixiong Zhu <zsxw...@gmail.com>

Closes #18565 from jinxing64/SPARK-21342.

(cherry picked from commit 6a06c4b03c4dd86241fb9d11b4360371488f0e53)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40fd0ce7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40fd0ce7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40fd0ce7

Branch: refs/heads/branch-2.2
Commit: 40fd0ce7f2c2facb96fc5d613bc7b6e4b573d9f7
Parents: 3bfad9d
Author: jinxing <jinxing6...@126.com>
Authored: Mon Jul 10 21:06:58 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Jul 10 21:10:02 2017 +0800

----------------------------------------------------------------------
 .../network/shuffle/ExternalShuffleClient.java  |  7 ++--
 .../network/shuffle/OneForOneBlockFetcher.java  | 34 +++++++++++-------
 .../spark/network/shuffle/ShuffleClient.java    | 13 +++++--
 .../network/shuffle/TempShuffleFileManager.java | 36 ++++++++++++++++++++
 .../network/sasl/SaslIntegrationSuite.java      |  2 +-
 .../shuffle/OneForOneBlockFetcherSuite.java     |  2 +-
 .../spark/network/BlockTransferService.scala    |  8 ++---
 .../netty/NettyBlockTransferService.scala       |  9 +++--
 .../storage/ShuffleBlockFetcherIterator.scala   | 28 ++++++++++-----
 .../spark/storage/BlockManagerSuite.scala       |  5 ++-
 .../ShuffleBlockFetcherIteratorSuite.scala      | 10 +++---
 11 files changed, 108 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/40fd0ce7/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 269fa72..39af9d5 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.network.shuffle;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -88,15 +87,15 @@ public class ExternalShuffleClient extends ShuffleClient {
       String execId,
       String[] blockIds,
       BlockFetchingListener listener,
-      File[] shuffleFiles) {
+      TempShuffleFileManager tempShuffleFileManager) {
     checkInit();
     logger.debug("External shuffle fetch from {}:{} (executor id {})", host, 
port, execId);
     try {
       RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
           (blockIds1, listener1) -> {
             TransportClient client = clientFactory.createClient(host, port);
-            new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1, conf,
-              shuffleFiles).start();
+            new OneForOneBlockFetcher(client, appId, execId,
+              blockIds1, listener1, conf, tempShuffleFileManager).start();
           };
 
       int maxRetries = conf.maxIORetries();

http://git-wip-us.apache.org/repos/asf/spark/blob/40fd0ce7/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
index d46ce2e..2f160d1 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -57,30 +57,36 @@ public class OneForOneBlockFetcher {
   private final String[] blockIds;
   private final BlockFetchingListener listener;
   private final ChunkReceivedCallback chunkCallback;
-  private TransportConf transportConf = null;
-  private File[] shuffleFiles = null;
+  private final TransportConf transportConf;
+  private final TempShuffleFileManager tempShuffleFileManager;
 
   private StreamHandle streamHandle = null;
 
   public OneForOneBlockFetcher(
+    TransportClient client,
+    String appId,
+    String execId,
+    String[] blockIds,
+    BlockFetchingListener listener,
+    TransportConf transportConf) {
+    this(client, appId, execId, blockIds, listener, transportConf, null);
+  }
+
+  public OneForOneBlockFetcher(
       TransportClient client,
       String appId,
       String execId,
       String[] blockIds,
       BlockFetchingListener listener,
       TransportConf transportConf,
-      File[] shuffleFiles) {
+      TempShuffleFileManager tempShuffleFileManager) {
     this.client = client;
     this.openMessage = new OpenBlocks(appId, execId, blockIds);
     this.blockIds = blockIds;
     this.listener = listener;
     this.chunkCallback = new ChunkCallback();
     this.transportConf = transportConf;
-    if (shuffleFiles != null) {
-      this.shuffleFiles = shuffleFiles;
-      assert this.shuffleFiles.length == blockIds.length:
-        "Number of shuffle files should equal to blocks";
-    }
+    this.tempShuffleFileManager = tempShuffleFileManager;
   }
 
   /** Callback invoked on receipt of each chunk. We equate a single chunk to a single block. */
@@ -119,9 +125,9 @@ public class OneForOneBlockFetcher {
           // Immediately request all chunks -- we expect that the total size of the request is
           // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
           for (int i = 0; i < streamHandle.numChunks; i++) {
-            if (shuffleFiles != null) {
+            if (tempShuffleFileManager != null) {
               client.stream(OneForOneStreamManager.genStreamChunkId(streamHandle.streamId, i),
-                new DownloadCallback(shuffleFiles[i], i));
+                new DownloadCallback(i));
             } else {
               client.fetchChunk(streamHandle.streamId, i, chunkCallback);
             }
@@ -157,8 +163,8 @@ public class OneForOneBlockFetcher {
     private File targetFile = null;
     private int chunkIndex;
 
-    DownloadCallback(File targetFile, int chunkIndex) throws IOException {
-      this.targetFile = targetFile;
+    DownloadCallback(int chunkIndex) throws IOException {
+      this.targetFile = tempShuffleFileManager.createTempShuffleFile();
       this.channel = Channels.newChannel(new FileOutputStream(targetFile));
       this.chunkIndex = chunkIndex;
     }
@@ -174,6 +180,9 @@ public class OneForOneBlockFetcher {
       ManagedBuffer buffer = new FileSegmentManagedBuffer(transportConf, targetFile, 0,
         targetFile.length());
       listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
+      if (!tempShuffleFileManager.registerTempShuffleFileToClean(targetFile)) {
+        targetFile.delete();
+      }
     }
 
     @Override
@@ -182,6 +191,7 @@ public class OneForOneBlockFetcher {
       // On receipt of a failure, fail every block from chunkIndex onwards.
       String[] remainingBlockIds = Arrays.copyOfRange(blockIds, chunkIndex, blockIds.length);
       failRemainingBlocks(remainingBlockIds, cause);
+      targetFile.delete();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/40fd0ce7/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
index 978ff5a..9e77bee 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
@@ -18,7 +18,6 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.Closeable;
-import java.io.File;
 
 /** Provides an interface for reading shuffle files, either from an Executor or external service. */
 public abstract class ShuffleClient implements Closeable {
@@ -35,6 +34,16 @@ public abstract class ShuffleClient implements Closeable {
   * Note that this API takes a sequence so the implementation can batch requests, and does not
   * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
   * the data of a block is fetched, rather than waiting for all blocks to be fetched.
+   *
+   * @param host the host of the remote node.
+   * @param port the port of the remote node.
+   * @param execId the executor id.
+   * @param blockIds block ids to fetch.
+   * @param listener the listener to receive block fetching status.
+   * @param tempShuffleFileManager TempShuffleFileManager to create and clean temp shuffle files.
+   *                               If it's not <code>null</code>, the remote blocks will be streamed
+   *                               into temp shuffle files to reduce the memory usage, otherwise,
+   *                               they will be kept in memory.
    */
   public abstract void fetchBlocks(
       String host,
@@ -42,5 +51,5 @@ public abstract class ShuffleClient implements Closeable {
       String execId,
       String[] blockIds,
       BlockFetchingListener listener,
-      File[] shuffleFiles);
+      TempShuffleFileManager tempShuffleFileManager);
 }
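
For reference, a hedged usage sketch of the two modes the new Javadoc
describes; the class and parameter names below are illustrative, not part of
this patch:

  import org.apache.spark.network.shuffle.BlockFetchingListener;
  import org.apache.spark.network.shuffle.ShuffleClient;
  import org.apache.spark.network.shuffle.TempShuffleFileManager;

  // Sketch: pass a TempShuffleFileManager to stream remote blocks into temp
  // files on disk, or null to keep fetched blocks in memory.
  public class FetchBlocksUsage {
    static void fetch(ShuffleClient client, String host, int port, String execId,
        String[] blockIds, BlockFetchingListener listener,
        TempShuffleFileManager tempFileManager, boolean streamToDisk) {
      client.fetchBlocks(host, port, execId, blockIds, listener,
          streamToDisk ? tempFileManager : null);
    }
  }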

http://git-wip-us.apache.org/repos/asf/spark/blob/40fd0ce7/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempShuffleFileManager.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempShuffleFileManager.java
new file mode 100644
index 0000000..84a5ed6
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempShuffleFileManager.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.File;
+
+/**
+ * A manager to create temp shuffle block files to reduce the memory usage and also clean temp
+ * files when they won't be used any more.
+ */
+public interface TempShuffleFileManager {
+
+  /** Create a temp shuffle block file. */
+  File createTempShuffleFile();
+
+  /**
+   * Register a temp shuffle file to clean up when it won't be used any more. Return whether the
+   * file is registered successfully. If `false`, the caller should clean up the file by itself.
+   */
+  boolean registerTempShuffleFileToClean(File file);
+}
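
To illustrate the contract, a hypothetical minimal implementation (not part
of this patch; it mirrors the synchronized register-or-refuse pattern that
`ShuffleBlockFetcherIterator` adopts later in this diff):

  import java.io.File;
  import java.io.IOException;
  import java.io.UncheckedIOException;
  import java.util.HashSet;
  import java.util.Set;

  // Sketch: creates files in the JVM temp dir and deletes registered files
  // on close(). Once registration returns false, the caller is responsible
  // for deleting the file itself, per the interface contract above.
  public class SimpleTempShuffleFileManager implements TempShuffleFileManager {
    private final Set<File> filesToClean = new HashSet<>();
    private boolean closed = false;

    @Override
    public File createTempShuffleFile() {
      try {
        return File.createTempFile("temp_shuffle_", ".tmp");
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }

    @Override
    public synchronized boolean registerTempShuffleFileToClean(File file) {
      if (closed) {
        return false; // refuse: caller must delete the file itself
      }
      filesToClean.add(file);
      return true;
    }

    public synchronized void close() {
      closed = true;
      for (File f : filesToClean) {
        f.delete();
      }
      filesToClean.clear();
    }
  }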

http://git-wip-us.apache.org/repos/asf/spark/blob/40fd0ce7/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index 0c054fc..65cd550 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -204,7 +204,7 @@ public class SaslIntegrationSuite {
 
       String[] blockIds = { "shuffle_2_3_4", "shuffle_6_7_8" };
       OneForOneBlockFetcher fetcher =
-          new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener, conf, null);
+          new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener, conf);
       fetcher.start();
       blockFetchLatch.await();
       checkSecurityException(exception.get());

http://git-wip-us.apache.org/repos/asf/spark/blob/40fd0ce7/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
index 61d8221..dc947a6 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
@@ -131,7 +131,7 @@ public class OneForOneBlockFetcherSuite {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
     String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
     OneForOneBlockFetcher fetcher =
-      new OneForOneBlockFetcher(client, "app-id", "exec-id", blockIds, listener, conf, null);
+      new OneForOneBlockFetcher(client, "app-id", "exec-id", blockIds, listener, conf);
 
     // Respond to the "OpenBlocks" message with an appropriate ShuffleStreamHandle with streamId 123
     doAnswer(invocationOnMock -> {

http://git-wip-us.apache.org/repos/asf/spark/blob/40fd0ce7/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index 6860214..fe5fd2d 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.network
 
-import java.io.{Closeable, File}
+import java.io.Closeable
 import java.nio.ByteBuffer
 
 import scala.concurrent.{Future, Promise}
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempShuffleFileManager}
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.util.ThreadUtils
 
@@ -68,7 +68,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
       execId: String,
       blockIds: Array[String],
       listener: BlockFetchingListener,
-      shuffleFiles: Array[File]): Unit
+      tempShuffleFileManager: TempShuffleFileManager): Unit
 
   /**
    * Upload a single block to a remote node, available only after [[init]] is 
invoked.
@@ -101,7 +101,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
           ret.flip()
           result.success(new NioManagedBuffer(ret))
         }
-      }, shuffleFiles = null)
+      }, tempShuffleFileManager = null)
     ThreadUtils.awaitResult(result.future, Duration.Inf)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/40fd0ce7/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index b13a9c6..30ff938 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.network.netty
 
-import java.io.File
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConverters._
@@ -30,7 +29,7 @@ import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory}
 import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
 import org.apache.spark.network.server._
-import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, RetryingBlockFetcher}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, RetryingBlockFetcher, TempShuffleFileManager}
 import org.apache.spark.network.shuffle.protocol.UploadBlock
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.JavaSerializer
@@ -90,14 +89,14 @@ private[spark] class NettyBlockTransferService(
       execId: String,
       blockIds: Array[String],
       listener: BlockFetchingListener,
-      shuffleFiles: Array[File]): Unit = {
+      tempShuffleFileManager: TempShuffleFileManager): Unit = {
     logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
     try {
       val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
         override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
           val client = clientFactory.createClient(host, port)
-          new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener,
-            transportConf, shuffleFiles).start()
+          new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,
+            transportConf, tempShuffleFileManager).start()
         }
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/40fd0ce7/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index bded3a1..7e2bcf7 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempShuffleFileManager}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.Utils
 import org.apache.spark.util.io.ChunkedByteBufferOutputStream
@@ -66,7 +66,7 @@ final class ShuffleBlockFetcherIterator(
     maxReqsInFlight: Int,
     maxReqSizeShuffleToMem: Long,
     detectCorrupt: Boolean)
-  extends Iterator[(BlockId, InputStream)] with Logging {
+  extends Iterator[(BlockId, InputStream)] with TempShuffleFileManager with Logging {
 
   import ShuffleBlockFetcherIterator._
 
@@ -135,7 +135,8 @@ final class ShuffleBlockFetcherIterator(
   * A set to store the files used for shuffling remote huge blocks. Files in this set will be
   * deleted when cleanup. This is a layer of defensiveness against disk file leaks.
    */
-  val shuffleFilesSet = mutable.HashSet[File]()
+  @GuardedBy("this")
+  private[this] val shuffleFilesSet = mutable.HashSet[File]()
 
   initialize()
 
@@ -149,6 +150,19 @@ final class ShuffleBlockFetcherIterator(
     currentResult = null
   }
 
+  override def createTempShuffleFile(): File = {
+    blockManager.diskBlockManager.createTempLocalBlock()._2
+  }
+
+  override def registerTempShuffleFileToClean(file: File): Boolean = synchronized {
+    if (isZombie) {
+      false
+    } else {
+      shuffleFilesSet += file
+      true
+    }
+  }
+
   /**
   * Mark the iterator as zombie, and release all buffers that haven't been deserialized yet.
    */
@@ -173,7 +187,7 @@ final class ShuffleBlockFetcherIterator(
     }
     shuffleFilesSet.foreach { file =>
       if (!file.delete()) {
-        logInfo("Failed to cleanup shuffle fetch temp file " + 
file.getAbsolutePath());
+        logWarning("Failed to cleanup shuffle fetch temp file " + 
file.getAbsolutePath())
       }
     }
   }
@@ -218,12 +232,8 @@ final class ShuffleBlockFetcherIterator(
     // already encrypted and compressed over the wire(w.r.t. the related configs), we can just fetch
     // the data and write it to file directly.
     if (req.size > maxReqSizeShuffleToMem) {
-      val shuffleFiles = blockIds.map { _ =>
-        blockManager.diskBlockManager.createTempLocalBlock()._2
-      }.toArray
-      shuffleFilesSet ++= shuffleFiles
       shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
-        blockFetchingListener, shuffleFiles)
+        blockFetchingListener, this)
     } else {
       shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
         blockFetchingListener, null)

http://git-wip-us.apache.org/repos/asf/spark/blob/40fd0ce7/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9d7a869..24308a2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.storage
 
-import java.io.File
 import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
@@ -41,7 +40,7 @@ import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.network.{BlockDataManager, BlockTransferService}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.netty.NettyBlockTransferService
-import org.apache.spark.network.shuffle.BlockFetchingListener
+import org.apache.spark.network.shuffle.{BlockFetchingListener, TempShuffleFileManager}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite}
@@ -1267,7 +1266,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         execId: String,
         blockIds: Array[String],
         listener: BlockFetchingListener,
-        shuffleFiles: Array[File]): Unit = {
+        tempShuffleFileManager: TempShuffleFileManager): Unit = {
       listener.onBlockFetchSuccess("mockBlockId", new 
NioManagedBuffer(ByteBuffer.allocate(1)))
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/40fd0ce7/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 559b3fa..6a70ced 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -33,7 +33,7 @@ import org.scalatest.PrivateMethodTester
 import org.apache.spark.{SparkFunSuite, TaskContext}
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.shuffle.BlockFetchingListener
+import org.apache.spark.network.shuffle.{BlockFetchingListener, TempShuffleFileManager}
 import org.apache.spark.network.util.LimitedInputStream
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.Utils
@@ -432,12 +432,12 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     val remoteBlocks = Map[BlockId, ManagedBuffer](
       ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer())
     val transfer = mock(classOf[BlockTransferService])
-    var shuffleFiles: Array[File] = null
+    var tempShuffleFileManager: TempShuffleFileManager = null
     when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
       .thenAnswer(new Answer[Unit] {
         override def answer(invocation: InvocationOnMock): Unit = {
           val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
-          shuffleFiles = invocation.getArguments()(5).asInstanceOf[Array[File]]
+          tempShuffleFileManager = invocation.getArguments()(5).asInstanceOf[TempShuffleFileManager]
           Future {
             listener.onBlockFetchSuccess(
              ShuffleBlockId(0, 0, 0).toString, remoteBlocks(ShuffleBlockId(0, 0, 0)))
@@ -466,13 +466,13 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     fetchShuffleBlock(blocksByAddress1)
     // `maxReqSizeShuffleToMem` is 200, which is greater than the block size 100, so don't fetch
     // shuffle block to disk.
-    assert(shuffleFiles === null)
+    assert(tempShuffleFileManager == null)
 
     val blocksByAddress2 = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
       (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 300L)).toSeq))
     fetchShuffleBlock(blocksByAddress2)
     // `maxReqSizeShuffleToMem` is 200, which is smaller than the block size 300, so fetch
     // shuffle block to disk.
-    assert(shuffleFiles != null)
+    assert(tempShuffleFileManager != null)
   }
 }

