Repository: hadoop Updated Branches: refs/heads/branch-2.8 7ddff4b37 -> 69b195d61
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69b195d6/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto index 722af89..abb3883 100644 --- a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto +++ b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto @@ -32,6 +32,13 @@ service TestProtobufRpcProto { rpc echo(EchoRequestProto) returns (EchoResponseProto); rpc error(EmptyRequestProto) returns (EmptyResponseProto); rpc error2(EmptyRequestProto) returns (EmptyResponseProto); + rpc slowPing(SlowPingRequestProto) returns (EmptyResponseProto); + rpc echo2(EchoRequestProto2) returns (EchoResponseProto2); + rpc add(AddRequestProto) returns (AddResponseProto); + rpc add2(AddRequestProto2) returns (AddResponseProto); + rpc testServerGet(EmptyRequestProto) returns (EmptyResponseProto); + rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto); + rpc sleep(SleepRequestProto) returns (EmptyResponseProto); } service TestProtobufRpc2Proto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/69b195d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNMAuditLogger.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNMAuditLogger.java index 39e6dc5..44ed883 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNMAuditLogger.java @@ -24,15 +24,25 @@ import static org.mockito.Mockito.when; import java.net.InetAddress; import java.net.InetSocketAddress; +import com.google.protobuf.BlockingService; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ClientId; +import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.TestRPC.TestImpl; +import org.apache.hadoop.ipc.TestRpcBase.TestRpcService; import org.apache.hadoop.ipc.TestRPC.TestProtocol; +import org.apache.hadoop.ipc.TestRpcBase; +import org.apache.hadoop.ipc.protobuf.TestProtos; +import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.Keys; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -188,12 +198,19 @@ public class TestNMAuditLogger { * A special extension of {@link TestImpl} RPC server with * {@link TestImpl#ping()} testing the audit logs. */ - private class MyTestRPCServer extends TestImpl { + private class MyTestRPCServer extends TestRpcBase.PBServerImpl { @Override - public void ping() { + public TestProtos.EmptyResponseProto ping( + RpcController unused, TestProtos.EmptyRequestProto request) + throws ServiceException { + // Ensure clientId is received + byte[] clientId = Server.getClientId(); + Assert.assertNotNull(clientId); + Assert.assertEquals(ClientId.BYTE_LENGTH, clientId.length); // test with ip set testSuccessLogFormat(true); testFailureLogFormat(true); + return TestProtos.EmptyResponseProto.newBuilder().build(); } } @@ -203,9 +220,17 @@ public class TestNMAuditLogger { @Test public void testNMAuditLoggerWithIP() throws Exception { Configuration conf = new Configuration(); + RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class); + + // Create server side implementation + MyTestRPCServer serverImpl = new MyTestRPCServer(); + BlockingService service = TestRpcServiceProtos.TestProtobufRpcProto + .newReflectiveBlockingService(serverImpl); + // start the IPC server - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new MyTestRPCServer()).setBindAddress("0.0.0.0") + Server server = new RPC.Builder(conf) + .setProtocol(TestRpcBase.TestRpcService.class) + .setInstance(service).setBindAddress("0.0.0.0") .setPort(0).setNumHandlers(5).setVerbose(true).build(); server.start(); @@ -213,11 +238,14 @@ public class TestNMAuditLogger { InetSocketAddress addr = NetUtils.getConnectAddress(server); // Make a client connection and test the audit log - TestProtocol proxy = (TestProtocol)RPC.getProxy(TestProtocol.class, + TestRpcService proxy = RPC.getProxy(TestRpcService.class, TestProtocol.versionID, addr, conf); // Start the testcase - proxy.ping(); + TestProtos.EmptyRequestProto pingRequest = + TestProtos.EmptyRequestProto.newBuilder().build(); + proxy.ping(null, pingRequest); server.stop(); + RPC.stopProxy(proxy); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/69b195d6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java index 49b23d9..66af3f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java @@ -24,17 +24,27 @@ import static org.mockito.Mockito.when; import java.net.InetAddress; import java.net.InetSocketAddress; +import com.google.protobuf.BlockingService; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.CallerContext; +import org.apache.hadoop.ipc.ClientId; +import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.TestRPC.TestImpl; import org.apache.hadoop.ipc.TestRPC.TestProtocol; +import org.apache.hadoop.ipc.TestRpcBase; +import org.apache.hadoop.ipc.TestRpcBase.TestRpcService; +import org.apache.hadoop.ipc.protobuf.TestProtos; +import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.Keys; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -255,12 +265,19 @@ public class TestRMAuditLogger { * A special extension of {@link TestImpl} RPC server with * {@link TestImpl#ping()} testing the audit logs. */ - private class MyTestRPCServer extends TestImpl { + private class MyTestRPCServer extends TestRpcBase.PBServerImpl { @Override - public void ping() { + public TestProtos.EmptyResponseProto ping( + RpcController unused, TestProtos.EmptyRequestProto request) + throws ServiceException { + // Ensure clientId is received + byte[] clientId = Server.getClientId(); + Assert.assertNotNull(clientId); + Assert.assertEquals(ClientId.BYTE_LENGTH, clientId.length); // test with ip set testSuccessLogFormat(true); testFailureLogFormat(true); + return TestProtos.EmptyResponseProto.newBuilder().build(); } } @@ -270,20 +287,33 @@ public class TestRMAuditLogger { @Test public void testRMAuditLoggerWithIP() throws Exception { Configuration conf = new Configuration(); + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); + + // Create server side implementation + MyTestRPCServer serverImpl = new MyTestRPCServer(); + BlockingService service = TestRpcServiceProtos.TestProtobufRpcProto + .newReflectiveBlockingService(serverImpl); + // start the IPC server - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new MyTestRPCServer()).setBindAddress("0.0.0.0") + Server server = new RPC.Builder(conf) + .setProtocol(TestRpcService.class) + .setInstance(service).setBindAddress("0.0.0.0") .setPort(0).setNumHandlers(5).setVerbose(true).build(); + server.start(); InetSocketAddress addr = NetUtils.getConnectAddress(server); // Make a client connection and test the audit log - TestProtocol proxy = (TestProtocol)RPC.getProxy(TestProtocol.class, - TestProtocol.versionID, addr, conf); + TestRpcService proxy = RPC.getProxy(TestRpcService.class, + TestProtocol.versionID, addr, conf); // Start the testcase - proxy.ping(); + TestProtos.EmptyRequestProto pingRequest = + TestProtos.EmptyRequestProto.newBuilder().build(); + proxy.ping(null, pingRequest); server.stop(); + RPC.stopProxy(proxy); } }