Author: tucu
Date: Tue Mar 20 18:37:01 2012
New Revision: 1303077

URL: http://svn.apache.org/viewvc?rev=1303077&view=rev
Log: MAPREDUCE-1740. NPE in getMatchingLevelForNodes when node locations are variable depth (ahmed via tucu)
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobInProgress.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1303077&r1=1303076&r2=1303077&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Tue Mar 20 18:37:01 2012
@@ -98,6 +98,9 @@ Release 1.1.0 - unreleased
     HADOOP-5836. Bug in S3N handling of directory markers using an object
     with a trailing "/" causes jobs to fail. (Jagane Sundar, Ian Nowland via suresh)

+    MAPREDUCE-1740. NPE in getMatchingLevelForNodes when node locations are
+    variable depth (ahmed via tucu)
+
   IMPROVEMENTS

     MAPREDUCE-3597. [Rumen] Provide a way to access other info of history file

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1303077&r1=1303076&r2=1303077&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue Mar 20 18:37:01 2012
@@ -1689,16 +1689,37 @@ public class JobInProgress {

   // Returns the (cache) level at which the nodes match
   private int getMatchingLevelForNodes(Node n1, Node n2) {
+    return getMatchingLevelForNodes(n1, n2, this.maxLevel);
+  }
+
+  static int getMatchingLevelForNodes(Node n1, Node n2, int maxLevel) {
     int count = 0;
+
+    // In the case that the two nodes are at different levels in the
+    // node hierarchy, walk upwards on the deeper one until the
+    // levels are equal. Each of these steps counts as "distance",
+    // since it presumably crosses another rack.
+    int level1 = n1.getLevel(), level2 = n2.getLevel();
+    while (n1 != null && level1 > level2) {
+      n1 = n1.getParent();
+      level1--;
+      count++;
+    }
+    while (n2 != null && level2 > level1) {
+      n2 = n2.getParent();
+      level2--;
+      count++;
+    }
+
     do {
-      if (n1.equals(n2)) {
-        return count;
+      if (n1.equals(n2) || count >= maxLevel) {
+        return Math.min(count, maxLevel);
       }
       ++count;
       n1 = n1.getParent();
       n2 = n2.getParent();
     } while (n1 != null);
-    return this.maxLevel;
+    return maxLevel;
   }

   /**

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobInProgress.java?rev=1303077&r1=1303076&r2=1303077&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobInProgress.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobInProgress.java Tue Mar 20 18:37:01 2012
@@ -10,7 +10,6 @@ import java.util.HashSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.RandomWriter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -20,17 +19,34 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.UtilsForTests;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.StaticMapping;

-import junit.framework.TestCase;
+import static org.junit.Assert.*;
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;

-public class TestJobInProgress extends TestCase {
+public class TestJobInProgress {
   static final Log LOG = LogFactory.getLog(TestJobInProgress.class);

-  private MiniMRCluster mrCluster;
+  private static MiniMRCluster mrCluster;
+
+  private static MiniDFSCluster dfsCluster;
+  private static JobTracker jt;
+
+  static final String trackers[] = new String[] {
+      "tracker_tracker1.r1.com:1000", "tracker_tracker2.r1.com:1000",
+      "tracker_tracker3.r2.com:1000", "tracker_tracker4.r3.com:1000" };
+
+  static final String[] hosts = new String[] { "tracker1.r1.com",
+      "tracker2.r1.com", "tracker3.r2.com", "tracker4.r3.com" };
+
+  static final String[] racks = new String[] { "/r1", "/r1", "/r2", "/r3" };

-  private MiniDFSCluster dfsCluster;
-  JobTracker jt;
   private static Path TEST_DIR =
       new Path(System.getProperty("test.build.data","/tmp"), "jip-testing");
   private static int numSlaves = 4;
@@ -72,22 +88,33 @@ public class TestJobInProgress extends T
     }
   }

-  @Override
-  protected void setUp() throws Exception {
-    // TODO Auto-generated method stub
-    super.setUp();
+  @BeforeClass
+  public static void setUp() throws Exception {
     Configuration conf = new Configuration();
+    conf.set("mapreduce.jobtracker.address", "localhost:0");
+    conf.set("mapreduce.jobtracker.http.address", "0.0.0.0:0");
+    conf.setClass("topology.node.switch.mapping.impl", StaticMapping.class,
+        DNSToSwitchMapping.class);
     dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
     mrCluster = new MiniMRCluster(numSlaves, dfsCluster.getFileSystem()
         .getUri().toString(), 1);
    jt = mrCluster.getJobTrackerRunner().getJobTracker();
+    // Set up the topology information
+    for (int i = 0; i < hosts.length; i++) {
+      StaticMapping.addNodeToRack(hosts[i], racks[i]);
+    }
+    for (String s : trackers) {
+      FakeObjectUtilities.establishFirstContact(jt, s);
+    }
   }

+  @Test
   public void testPendingMapTaskCount() throws Exception {
     launchTask(FailMapTaskJob.class, IdentityReducer.class);
     checkTaskCounts();
   }

+  @Test
   public void testPendingReduceTaskCount() throws Exception {
     launchTask(IdentityMapper.class, FailReduceTaskJob.class);
     checkTaskCounts();
@@ -113,7 +140,7 @@ public class TestJobInProgress extends T
     job.set(UtilsForTests.getTaskSignalParameter(true), mapSignalFile.toString());
     job.set(UtilsForTests.getTaskSignalParameter(false), redSignalFile.toString());

-    // Disable slow-start for reduces since this maps don't complete
+    // Disable slow-start for reduces since these maps don't complete
     // in these test-cases...
     job.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);
@@ -174,6 +201,7 @@ public class TestJobInProgress extends T
     }
   }

+  @Test
   public void testRunningTaskCount() throws Exception {
     // test with spec = false and locality=true
     testRunningTaskCount(false, true);
@@ -188,11 +216,40 @@ public class TestJobInProgress extends T
     testRunningTaskCount(true, false);
   }

-  @Override
-  protected void tearDown() throws Exception {
+  @Test
+  public void testLocality() throws Exception {
+    NetworkTopology nt = new NetworkTopology();
+
+    Node r1n1 = new NodeBase("/default/rack1/node1");
+    nt.add(r1n1);
+    Node r1n2 = new NodeBase("/default/rack1/node2");
+    nt.add(r1n2);
+
+    Node r2n3 = new NodeBase("/default/rack2/node3");
+    nt.add(r2n3);
+
+    Node r2n4 = new NodeBase("/default/rack2/s1/node4");
+    nt.add(r2n4);
+
+    LOG.debug("r1n1 parent: " + r1n1.getParent() + "\n" +
+              "r1n2 parent: " + r1n2.getParent() + "\n" +
+              "r2n3 parent: " + r2n3.getParent() + "\n" +
+              "r2n4 parent: " + r2n4.getParent());
+
+    // Same host
+    assertEquals(0, JobInProgress.getMatchingLevelForNodes(r1n1, r1n1, 3));
+    // Same rack
+    assertEquals(1, JobInProgress.getMatchingLevelForNodes(r1n1, r1n2, 3));
+    // Different rack
+    assertEquals(2, JobInProgress.getMatchingLevelForNodes(r1n1, r2n3, 3));
+    // Different rack at different depth
+    assertEquals(3, JobInProgress.getMatchingLevelForNodes(r1n1, r2n4, 3));
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
     mrCluster.shutdown();
     dfsCluster.shutdown();
-    super.tearDown();
   }
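For reference, the patched algorithm can be exercised outside the JobTracker. The sketch below is a minimal, self-contained Java translation of the new getMatchingLevelForNodes; the SimpleNode class is hypothetical scaffolding standing in for org.apache.hadoop.net.Node, and the main method replays the same topology the new testLocality case builds.

import java.util.Objects;

/**
 * Standalone sketch of the patched matching-level computation.
 * SimpleNode is a hypothetical stand-in for org.apache.hadoop.net.Node;
 * matchingLevel mirrors the static getMatchingLevelForNodes above.
 */
public class MatchingLevelSketch {

  /** Minimal tree node: a name, a parent, and a depth (root = level 0). */
  static final class SimpleNode {
    final String name;
    final SimpleNode parent;
    final int level;

    SimpleNode(String name, SimpleNode parent) {
      this.name = name;
      this.parent = parent;
      this.level = (parent == null) ? 0 : parent.level + 1;
    }
  }

  static int matchingLevel(SimpleNode n1, SimpleNode n2, int maxLevel) {
    int count = 0;
    // Step 1 (the part the patch adds): equalize depths by walking the
    // deeper node upward, counting each hop as distance.
    int level1 = n1.level, level2 = n2.level;
    while (n1 != null && level1 > level2) {
      n1 = n1.parent;
      level1--;
      count++;
    }
    while (n2 != null && level2 > level1) {
      n2 = n2.parent;
      level2--;
      count++;
    }
    // Step 2: climb both nodes in lockstep until a common ancestor is
    // found, capping the result at maxLevel.
    do {
      if (Objects.equals(n1, n2) || count >= maxLevel) {
        return Math.min(count, maxLevel);
      }
      ++count;
      n1 = (n1 == null) ? null : n1.parent;
      n2 = (n2 == null) ? null : n2.parent;
    } while (n1 != null);
    return maxLevel;
  }

  public static void main(String[] args) {
    // Same topology as testLocality: /default/rack1/{node1,node2},
    // /default/rack2/node3, /default/rack2/s1/node4
    SimpleNode root  = new SimpleNode("default", null);
    SimpleNode rack1 = new SimpleNode("rack1", root);
    SimpleNode rack2 = new SimpleNode("rack2", root);
    SimpleNode s1    = new SimpleNode("s1", rack2);
    SimpleNode node1 = new SimpleNode("node1", rack1);
    SimpleNode node2 = new SimpleNode("node2", rack1);
    SimpleNode node3 = new SimpleNode("node3", rack2);
    SimpleNode node4 = new SimpleNode("node4", s1);

    System.out.println(matchingLevel(node1, node1, 3)); // 0: same host
    System.out.println(matchingLevel(node1, node2, 3)); // 1: same rack
    System.out.println(matchingLevel(node1, node3, 3)); // 2: different rack
    System.out.println(matchingLevel(node1, node4, 3)); // 3: variable depth, capped at maxLevel
  }
}

Equalizing the depths before the lockstep climb is the substance of the fix: with variable-depth locations, the original loop advanced both nodes in lockstep and could run one side out of ancestors while the other still had some left, which appears to be the failure mode the log message names.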