Repository: giraph
Updated Branches:
  refs/heads/trunk 47da75182 -> 5b0cd0e0a


http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index b9fc508..8d8e86d 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -32,7 +32,6 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -57,9 +56,9 @@ import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexOutputFormat;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.job.HadoopUtils;
+import org.apache.giraph.master.input.LocalityAwareInputSplitsMasterOrganizer;
 import org.apache.giraph.utils.NoOpComputation;
-import org.apache.giraph.worker.InputSplitPathOrganizer;
-import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -69,7 +68,6 @@ import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.zookeeper.KeeperException;
@@ -316,38 +314,39 @@ public class
    * @throws InterruptedException
    */
   @Test
-  public void testInputSplitPathOrganizer()
+  public void testInputSplitLocality()
     throws IOException, KeeperException, InterruptedException {
-    final List<String> testList = new ArrayList<String>();
-    Collections.addAll(testList, "remote2", "local", "remote1");
-    final String localHost = "node.LOCAL.com";
-    final String testListName = "test_list_parent_znode";
-    // build output just as we do to store hostlists in ZNODES
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream dos = new DataOutputStream(baos);
-    String last = "node.test4.com\tnode.test5.com\tnode.test6.com";
-    Text.writeString(dos, last);
-    byte[] remote1 = baos.toByteArray();
-    baos = new ByteArrayOutputStream();
-    dos = new DataOutputStream(baos);
-    String middle = "node.test1.com\tnode.test2.com\tnode.test3.com";
-    Text.writeString(dos, middle);
-    byte[] remote2 = baos.toByteArray();
-    baos = new ByteArrayOutputStream();
-    dos = new DataOutputStream(baos);
-    String first = "node.testx.com\tnode.LOCAL.com\tnode.testy.com";
-    Text.writeString(dos, first);
-    byte[] local = baos.toByteArray();
-    ZooKeeperExt zk = mock(ZooKeeperExt.class);
-    when(zk.getChildrenExt(testListName, false, false, true)).
-        thenReturn(testList);
-    when(zk.getData("remote1", false, null)).thenReturn(remote1);
-    when(zk.getData("remote2", false, null)).thenReturn(remote2);
-    when(zk.getData("local", false, null)).thenReturn(local);
-    InputSplitPathOrganizer lis =
-      new InputSplitPathOrganizer(zk, testListName, localHost, true);
-    final List<String> resultList = Lists.newArrayList(lis.getPathList());
-    assertEquals("local", resultList.get(0));
+    List<byte[]> serializedSplits = new ArrayList<>();
+    serializedSplits.add(new byte[]{1});
+    serializedSplits.add(new byte[]{2});
+    serializedSplits.add(new byte[]{3});
+
+    WorkerInfo workerInfo = mock(WorkerInfo.class);
+    when(workerInfo.getTaskId()).thenReturn(5);
+    when(workerInfo.getHostname()).thenReturn("node.LOCAL.com");
+
+    List<InputSplit> splits = new ArrayList<>();
+    InputSplit split1 = mock(InputSplit.class);
+    when(split1.getLocations()).thenReturn(new String[]{
+        "node.test1.com", "node.test2.com", "node.test3.com"});
+    splits.add(split1);
+    InputSplit split2 = mock(InputSplit.class);
+    when(split2.getLocations()).thenReturn(new String[]{
+        "node.testx.com", "node.LOCAL.com", "node.testy.com"});
+    splits.add(split2);
+    InputSplit split3 = mock(InputSplit.class);
+    when(split3.getLocations()).thenReturn(new String[]{
+        "node.test4.com", "node.test5.com", "node.test6.com"});
+    splits.add(split3);
+
+    LocalityAwareInputSplitsMasterOrganizer inputSplitOrganizer =
+        new LocalityAwareInputSplitsMasterOrganizer(
+            serializedSplits,
+            splits,
+            Lists.newArrayList(workerInfo));
+
+    assertEquals(2,
+        inputSplitOrganizer.getSerializedSplitFor(workerInfo.getTaskId())[0]);
   }
 
   /**

Reply via email to