Repository: spark
Updated Branches:
  refs/heads/branch-1.4 f8468c451 -> 82be68f10


http://git-wip-us.apache.org/repos/asf/spark/blob/82be68f1/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
----------------------------------------------------------------------
diff --git 
a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
 
b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
new file mode 100644
index 0000000..d02f4f0
--- /dev/null
+++ 
b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import com.google.common.io.CharStreams;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class ExternalShuffleBlockResolverSuite {
+  static String sortBlock0 = "Hello!";
+  static String sortBlock1 = "World!";
+
+  static String hashBlock0 = "Elementary";
+  static String hashBlock1 = "Tabular";
+
+  static TestShuffleDataContext dataContext;
+
+  static TransportConf conf = new TransportConf(new 
SystemPropertyConfigProvider());
+
+  @BeforeClass
+  public static void beforeAll() throws IOException {
+    dataContext = new TestShuffleDataContext(2, 5);
+
+    dataContext.create();
+    // Write some sort and hash data.
+    dataContext.insertSortShuffleData(0, 0,
+      new byte[][] { sortBlock0.getBytes(), sortBlock1.getBytes() } );
+    dataContext.insertHashShuffleData(1, 0,
+      new byte[][] { hashBlock0.getBytes(), hashBlock1.getBytes() } );
+  }
+
+  @AfterClass
+  public static void afterAll() {
+    dataContext.cleanup();
+  }
+
+  @Test
+  public void testBadRequests() {
+    ExternalShuffleBlockResolver resolver = new 
ExternalShuffleBlockResolver(conf);
+    // Unregistered executor
+    try {
+      resolver.getBlockData("app0", "exec1", "shuffle_1_1_0");
+      fail("Should have failed");
+    } catch (RuntimeException e) {
+      assertTrue("Bad error message: " + e, e.getMessage().contains("not 
registered"));
+    }
+
+    // Invalid shuffle manager
+    resolver.registerExecutor("app0", "exec2", 
dataContext.createExecutorInfo("foobar"));
+    try {
+      resolver.getBlockData("app0", "exec2", "shuffle_1_1_0");
+      fail("Should have failed");
+    } catch (UnsupportedOperationException e) {
+      // pass
+    }
+
+    // Nonexistent shuffle block
+    resolver.registerExecutor("app0", "exec3",
+      
dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
+    try {
+      resolver.getBlockData("app0", "exec3", "shuffle_1_1_0");
+      fail("Should have failed");
+    } catch (Exception e) {
+      // pass
+    }
+  }
+
+  @Test
+  public void testSortShuffleBlocks() throws IOException {
+    ExternalShuffleBlockResolver resolver = new 
ExternalShuffleBlockResolver(conf);
+    resolver.registerExecutor("app0", "exec0",
+      
dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
+
+    InputStream block0Stream =
+      resolver.getBlockData("app0", "exec0", 
"shuffle_0_0_0").createInputStream();
+    String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
+    block0Stream.close();
+    assertEquals(sortBlock0, block0);
+
+    InputStream block1Stream =
+      resolver.getBlockData("app0", "exec0", 
"shuffle_0_0_1").createInputStream();
+    String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
+    block1Stream.close();
+    assertEquals(sortBlock1, block1);
+  }
+
+  @Test
+  public void testHashShuffleBlocks() throws IOException {
+    ExternalShuffleBlockResolver resolver = new 
ExternalShuffleBlockResolver(conf);
+    resolver.registerExecutor("app0", "exec0",
+      
dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
+
+    InputStream block0Stream =
+      resolver.getBlockData("app0", "exec0", 
"shuffle_1_0_0").createInputStream();
+    String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
+    block0Stream.close();
+    assertEquals(hashBlock0, block0);
+
+    InputStream block1Stream =
+      resolver.getBlockData("app0", "exec0", 
"shuffle_1_0_1").createInputStream();
+    String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
+    block1Stream.close();
+    assertEquals(hashBlock1, block1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/82be68f1/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
----------------------------------------------------------------------
diff --git 
a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
 
b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index 254e3a7..d9d9c1b 100644
--- 
a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ 
b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -41,14 +41,15 @@ public class ExternalShuffleCleanupSuite {
   public void noCleanupAndCleanup() throws IOException {
     TestShuffleDataContext dataContext = createSomeData();
 
-    ExternalShuffleBlockManager manager = new 
ExternalShuffleBlockManager(conf, sameThreadExecutor);
-    manager.registerExecutor("app", "exec0", 
dataContext.createExecutorInfo("shuffleMgr"));
-    manager.applicationRemoved("app", false /* cleanup */);
+    ExternalShuffleBlockResolver resolver =
+      new ExternalShuffleBlockResolver(conf, sameThreadExecutor);
+    resolver.registerExecutor("app", "exec0", 
dataContext.createExecutorInfo("shuffleMgr"));
+    resolver.applicationRemoved("app", false /* cleanup */);
 
     assertStillThere(dataContext);
 
-    manager.registerExecutor("app", "exec1", 
dataContext.createExecutorInfo("shuffleMgr"));
-    manager.applicationRemoved("app", true /* cleanup */);
+    resolver.registerExecutor("app", "exec1", 
dataContext.createExecutorInfo("shuffleMgr"));
+    resolver.applicationRemoved("app", true /* cleanup */);
 
     assertCleanedUp(dataContext);
   }
@@ -64,7 +65,7 @@ public class ExternalShuffleCleanupSuite {
       @Override public void execute(Runnable runnable) { 
cleanupCalled.set(true); }
     };
 
-    ExternalShuffleBlockManager manager = new 
ExternalShuffleBlockManager(conf, noThreadExecutor);
+    ExternalShuffleBlockResolver manager = new 
ExternalShuffleBlockResolver(conf, noThreadExecutor);
 
     manager.registerExecutor("app", "exec0", 
dataContext.createExecutorInfo("shuffleMgr"));
     manager.applicationRemoved("app", true);
@@ -81,11 +82,12 @@ public class ExternalShuffleCleanupSuite {
     TestShuffleDataContext dataContext0 = createSomeData();
     TestShuffleDataContext dataContext1 = createSomeData();
 
-    ExternalShuffleBlockManager manager = new 
ExternalShuffleBlockManager(conf, sameThreadExecutor);
+    ExternalShuffleBlockResolver resolver =
+      new ExternalShuffleBlockResolver(conf, sameThreadExecutor);
 
-    manager.registerExecutor("app", "exec0", 
dataContext0.createExecutorInfo("shuffleMgr"));
-    manager.registerExecutor("app", "exec1", 
dataContext1.createExecutorInfo("shuffleMgr"));
-    manager.applicationRemoved("app", true);
+    resolver.registerExecutor("app", "exec0", 
dataContext0.createExecutorInfo("shuffleMgr"));
+    resolver.registerExecutor("app", "exec1", 
dataContext1.createExecutorInfo("shuffleMgr"));
+    resolver.applicationRemoved("app", true);
 
     assertCleanedUp(dataContext0);
     assertCleanedUp(dataContext1);
@@ -96,25 +98,26 @@ public class ExternalShuffleCleanupSuite {
     TestShuffleDataContext dataContext0 = createSomeData();
     TestShuffleDataContext dataContext1 = createSomeData();
 
-    ExternalShuffleBlockManager manager = new 
ExternalShuffleBlockManager(conf, sameThreadExecutor);
+    ExternalShuffleBlockResolver resolver =
+      new ExternalShuffleBlockResolver(conf, sameThreadExecutor);
 
-    manager.registerExecutor("app-0", "exec0", 
dataContext0.createExecutorInfo("shuffleMgr"));
-    manager.registerExecutor("app-1", "exec0", 
dataContext1.createExecutorInfo("shuffleMgr"));
+    resolver.registerExecutor("app-0", "exec0", 
dataContext0.createExecutorInfo("shuffleMgr"));
+    resolver.registerExecutor("app-1", "exec0", 
dataContext1.createExecutorInfo("shuffleMgr"));
 
-    manager.applicationRemoved("app-nonexistent", true);
+    resolver.applicationRemoved("app-nonexistent", true);
     assertStillThere(dataContext0);
     assertStillThere(dataContext1);
 
-    manager.applicationRemoved("app-0", true);
+    resolver.applicationRemoved("app-0", true);
     assertCleanedUp(dataContext0);
     assertStillThere(dataContext1);
 
-    manager.applicationRemoved("app-1", true);
+    resolver.applicationRemoved("app-1", true);
     assertCleanedUp(dataContext0);
     assertCleanedUp(dataContext1);
 
     // Make sure it's not an error to cleanup multiple times
-    manager.applicationRemoved("app-1", true);
+    resolver.applicationRemoved("app-1", true);
     assertCleanedUp(dataContext0);
     assertCleanedUp(dataContext1);
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/82be68f1/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
----------------------------------------------------------------------
diff --git 
a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
 
b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 7663911..3fdde05 100644
--- 
a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ 
b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -29,7 +29,7 @@ import 
org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
 
 /**
  * Manages some sort- and hash-based shuffle data, including the creation
- * and cleanup of directories that can be read by the {@link 
ExternalShuffleBlockManager}.
+ * and cleanup of directories that can be read by the {@link 
ExternalShuffleBlockResolver}.
  */
 public class TestShuffleDataContext {
   public final String[] localDirs;
@@ -61,9 +61,9 @@ public class TestShuffleDataContext {
     String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0";
 
     OutputStream dataStream = new FileOutputStream(
-      ExternalShuffleBlockManager.getFile(localDirs, subDirsPerLocalDir, 
blockId + ".data"));
+      ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, 
blockId + ".data"));
     DataOutputStream indexStream = new DataOutputStream(new FileOutputStream(
-      ExternalShuffleBlockManager.getFile(localDirs, subDirsPerLocalDir, 
blockId + ".index")));
+      ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, 
blockId + ".index")));
 
     long offset = 0;
     indexStream.writeLong(offset);
@@ -82,7 +82,7 @@ public class TestShuffleDataContext {
     for (int i = 0; i < blocks.length; i ++) {
       String blockId = "shuffle_" + shuffleId + "_" + mapId + "_" + i;
       Files.write(blocks[i],
-        ExternalShuffleBlockManager.getFile(localDirs, subDirsPerLocalDir, 
blockId));
+        ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, 
blockId));
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to