Repository: giraph Updated Branches: refs/heads/trunk 47da75182 -> 5b0cd0e0a
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java index b9fc508..8d8e86d 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java +++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java @@ -32,7 +32,6 @@ import java.io.IOException; import java.io.InputStreamReader; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -57,9 +56,9 @@ import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexOutputFormat; import org.apache.giraph.job.GiraphJob; import org.apache.giraph.job.HadoopUtils; +import org.apache.giraph.master.input.LocalityAwareInputSplitsMasterOrganizer; import org.apache.giraph.utils.NoOpComputation; -import org.apache.giraph.worker.InputSplitPathOrganizer; -import org.apache.giraph.zk.ZooKeeperExt; +import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; @@ -69,7 +68,6 @@ import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.zookeeper.KeeperException; @@ -316,38 +314,39 @@ public class * @throws InterruptedException */ @Test - public void testInputSplitPathOrganizer() + public void testInputSplitLocality() throws IOException, KeeperException, InterruptedException { - final List<String> testList = new ArrayList<String>(); - Collections.addAll(testList, "remote2", "local", "remote1"); - final String localHost = "node.LOCAL.com"; - final String testListName = "test_list_parent_znode"; - // build output just as we do to store hostlists in ZNODES - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(baos); - String last = "node.test4.com\tnode.test5.com\tnode.test6.com"; - Text.writeString(dos, last); - byte[] remote1 = baos.toByteArray(); - baos = new ByteArrayOutputStream(); - dos = new DataOutputStream(baos); - String middle = "node.test1.com\tnode.test2.com\tnode.test3.com"; - Text.writeString(dos, middle); - byte[] remote2 = baos.toByteArray(); - baos = new ByteArrayOutputStream(); - dos = new DataOutputStream(baos); - String first = "node.testx.com\tnode.LOCAL.com\tnode.testy.com"; - Text.writeString(dos, first); - byte[] local = baos.toByteArray(); - ZooKeeperExt zk = mock(ZooKeeperExt.class); - when(zk.getChildrenExt(testListName, false, false, true)). - thenReturn(testList); - when(zk.getData("remote1", false, null)).thenReturn(remote1); - when(zk.getData("remote2", false, null)).thenReturn(remote2); - when(zk.getData("local", false, null)).thenReturn(local); - InputSplitPathOrganizer lis = - new InputSplitPathOrganizer(zk, testListName, localHost, true); - final List<String> resultList = Lists.newArrayList(lis.getPathList()); - assertEquals("local", resultList.get(0)); + List<byte[]> serializedSplits = new ArrayList<>(); + serializedSplits.add(new byte[]{1}); + serializedSplits.add(new byte[]{2}); + serializedSplits.add(new byte[]{3}); + + WorkerInfo workerInfo = mock(WorkerInfo.class); + when(workerInfo.getTaskId()).thenReturn(5); + when(workerInfo.getHostname()).thenReturn("node.LOCAL.com"); + + List<InputSplit> splits = new ArrayList<>(); + InputSplit split1 = mock(InputSplit.class); + when(split1.getLocations()).thenReturn(new String[]{ + "node.test1.com", "node.test2.com", "node.test3.com"}); + splits.add(split1); + InputSplit split2 = mock(InputSplit.class); + when(split2.getLocations()).thenReturn(new String[]{ + "node.testx.com", "node.LOCAL.com", "node.testy.com"}); + splits.add(split2); + InputSplit split3 = mock(InputSplit.class); + when(split3.getLocations()).thenReturn(new String[]{ + "node.test4.com", "node.test5.com", "node.test6.com"}); + splits.add(split3); + + LocalityAwareInputSplitsMasterOrganizer inputSplitOrganizer = + new LocalityAwareInputSplitsMasterOrganizer( + serializedSplits, + splits, + Lists.newArrayList(workerInfo)); + + assertEquals(2, + inputSplitOrganizer.getSerializedSplitFor(workerInfo.getTaskId())[0]); } /**
