Author: tgraves Date: Wed Apr 11 21:14:55 2012 New Revision: 1325011 URL: http://svn.apache.org/viewvc?rev=1325011&view=rev Log: merge -r 1325009:1325010 from trunk. FIXES: MAPREDUCE-4107
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1325011&r1=1325010&r2=1325011&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Apr 11 21:14:55 2012 @@ -134,6 +134,9 @@ Release 2.0.0 - UNRELEASED MAPREDUCE-4108. Fix tests in org.apache.hadoop.util.TestRunJar (Devaraj K via tgraves) + MAPREDUCE-4107. Fix tests in org.apache.hadoop.ipc.TestSocketFactory + (Devaraj K via tgraves) + Release 0.23.3 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java?rev=1325011&r1=1325010&r2=1325011&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java Wed Apr 11 21:14:55 2012 @@ -22,124 +22,149 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; -import junit.framework.TestCase; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobStatus; -import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; import org.apache.hadoop.net.StandardSocketFactory; -import org.junit.Ignore; +import org.junit.Assert; +import org.junit.Test; /** * This class checks that RPCs can use specialized socket factories. */ -@Ignore -public class TestSocketFactory extends TestCase { +public class TestSocketFactory { /** - * Check that we can reach a NameNode or a JobTracker using a specific + * Check that we can reach a NameNode or Resource Manager using a specific * socket factory */ + @Test public void testSocketFactory() throws IOException { // Create a standard mini-cluster Configuration sconf = new Configuration(); - MiniDFSCluster cluster = new MiniDFSCluster(sconf, 1, true, null); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(sconf).numDataNodes(1) + .build(); final int nameNodePort = cluster.getNameNodePort(); // Get a reference to its DFS directly FileSystem fs = cluster.getFileSystem(); - assertTrue(fs instanceof DistributedFileSystem); + Assert.assertTrue(fs instanceof DistributedFileSystem); DistributedFileSystem directDfs = (DistributedFileSystem) fs; - // Get another reference via network using a specific socket factory - Configuration cconf = new Configuration(); - FileSystem.setDefaultUri(cconf, String.format("hdfs://localhost:%s/", - nameNodePort + 10)); - cconf.set("hadoop.rpc.socket.factory.class.default", - "org.apache.hadoop.ipc.DummySocketFactory"); - cconf.set("hadoop.rpc.socket.factory.class.ClientProtocol", - "org.apache.hadoop.ipc.DummySocketFactory"); - cconf.set("hadoop.rpc.socket.factory.class.JobSubmissionProtocol", - "org.apache.hadoop.ipc.DummySocketFactory"); + Configuration cconf = getCustomSocketConfigs(nameNodePort); fs = FileSystem.get(cconf); - assertTrue(fs instanceof DistributedFileSystem); + Assert.assertTrue(fs instanceof DistributedFileSystem); DistributedFileSystem dfs = (DistributedFileSystem) fs; JobClient client = null; - MiniMRCluster mr = null; + MiniMRYarnCluster miniMRYarnCluster = null; try { // This will test RPC to the NameNode only. // could we test Client-DataNode connections? Path filePath = new Path("/dir"); - assertFalse(directDfs.exists(filePath)); - assertFalse(dfs.exists(filePath)); + Assert.assertFalse(directDfs.exists(filePath)); + Assert.assertFalse(dfs.exists(filePath)); directDfs.mkdirs(filePath); - assertTrue(directDfs.exists(filePath)); - assertTrue(dfs.exists(filePath)); + Assert.assertTrue(directDfs.exists(filePath)); + Assert.assertTrue(dfs.exists(filePath)); - // This will test TPC to a JobTracker + // This will test RPC to a Resource Manager fs = FileSystem.get(sconf); - mr = new MiniMRCluster(1, fs.getUri().toString(), 1); - final int jobTrackerPort = mr.getJobTrackerPort(); - + JobConf jobConf = new JobConf(); + FileSystem.setDefaultUri(jobConf, fs.getUri().toString()); + miniMRYarnCluster = initAndStartMiniMRYarnCluster(jobConf); JobConf jconf = new JobConf(cconf); - jconf.set("mapred.job.tracker", String.format("localhost:%d", - jobTrackerPort + 10)); - jconf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME); + jconf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); + String rmAddress = jconf.get("yarn.resourcemanager.address"); + String[] split = rmAddress.split(":"); + jconf.set("yarn.resourcemanager.address", split[0] + ':' + + (Integer.parseInt(split[1]) + 10)); client = new JobClient(jconf); JobStatus[] jobs = client.jobsToComplete(); - assertTrue(jobs.length == 0); + Assert.assertTrue(jobs.length == 0); } finally { - try { - if (client != null) - client.close(); - } catch (Exception ignored) { - // nothing we can do - ignored.printStackTrace(); - } - try { - if (dfs != null) - dfs.close(); - - } catch (Exception ignored) { - // nothing we can do - ignored.printStackTrace(); - } - try { - if (directDfs != null) - directDfs.close(); - - } catch (Exception ignored) { - // nothing we can do - ignored.printStackTrace(); - } - try { - if (cluster != null) - cluster.shutdown(); - - } catch (Exception ignored) { - // nothing we can do - ignored.printStackTrace(); - } - if (mr != null) { - try { - mr.shutdown(); - } catch (Exception ignored) { - ignored.printStackTrace(); - } - } + closeClient(client); + closeDfs(dfs); + closeDfs(directDfs); + stopMiniMRYarnCluster(miniMRYarnCluster); + shutdownDFSCluster(cluster); + } + } + + private MiniMRYarnCluster initAndStartMiniMRYarnCluster(JobConf jobConf) { + MiniMRYarnCluster miniMRYarnCluster; + miniMRYarnCluster = new MiniMRYarnCluster(this.getClass().getName(), 1); + miniMRYarnCluster.init(jobConf); + miniMRYarnCluster.start(); + return miniMRYarnCluster; + } + + private Configuration getCustomSocketConfigs(final int nameNodePort) { + // Get another reference via network using a specific socket factory + Configuration cconf = new Configuration(); + FileSystem.setDefaultUri(cconf, String.format("hdfs://localhost:%s/", + nameNodePort + 10)); + cconf.set("hadoop.rpc.socket.factory.class.default", + "org.apache.hadoop.ipc.DummySocketFactory"); + cconf.set("hadoop.rpc.socket.factory.class.ClientProtocol", + "org.apache.hadoop.ipc.DummySocketFactory"); + cconf.set("hadoop.rpc.socket.factory.class.JobSubmissionProtocol", + "org.apache.hadoop.ipc.DummySocketFactory"); + return cconf; + } + + private void shutdownDFSCluster(MiniDFSCluster cluster) { + try { + if (cluster != null) + cluster.shutdown(); + + } catch (Exception ignored) { + // nothing we can do + ignored.printStackTrace(); + } + } + + private void stopMiniMRYarnCluster(MiniMRYarnCluster miniMRYarnCluster) { + try { + if (miniMRYarnCluster != null) + miniMRYarnCluster.stop(); + + } catch (Exception ignored) { + // nothing we can do + ignored.printStackTrace(); + } + } + + private void closeDfs(DistributedFileSystem dfs) { + try { + if (dfs != null) + dfs.close(); + + } catch (Exception ignored) { + // nothing we can do + ignored.printStackTrace(); + } + } + + private void closeClient(JobClient client) { + try { + if (client != null) + client.close(); + } catch (Exception ignored) { + // nothing we can do + ignored.printStackTrace(); } } } @@ -155,32 +180,27 @@ class DummySocketFactory extends Standar public DummySocketFactory() { } - /* @inheritDoc */ @Override public Socket createSocket() throws IOException { return new Socket() { @Override - public void connect(SocketAddress addr, int timeout) - throws IOException { + public void connect(SocketAddress addr, int timeout) throws IOException { assert (addr instanceof InetSocketAddress); InetSocketAddress iaddr = (InetSocketAddress) addr; SocketAddress newAddr = null; if (iaddr.isUnresolved()) - newAddr = - new InetSocketAddress(iaddr.getHostName(), - iaddr.getPort() - 10); + newAddr = new InetSocketAddress(iaddr.getHostName(), + iaddr.getPort() - 10); else - newAddr = - new InetSocketAddress(iaddr.getAddress(), iaddr.getPort() - 10); - System.out.printf("Test socket: rerouting %s to %s\n", iaddr, - newAddr); + newAddr = new InetSocketAddress(iaddr.getAddress(), + iaddr.getPort() - 10); + System.out.printf("Test socket: rerouting %s to %s\n", iaddr, newAddr); super.connect(newAddr, timeout); } }; } - /* @inheritDoc */ @Override public boolean equals(Object obj) { if (this == obj) @@ -191,11 +211,4 @@ class DummySocketFactory extends Standar return false; return true; } - - /* @inheritDoc */ - @Override - public int hashCode() { - // Dummy hash code (to make find bugs happy) - return 53; - } }