Repository: hbase
Updated Branches:
  refs/heads/branch-1 e40ed0e83 -> 5362ac031


HBASE-14771 RpcServer#getRemoteAddress always returns null (Abhishek Kumar)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5362ac03
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5362ac03
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5362ac03

Branch: refs/heads/branch-1
Commit: 5362ac03116c9b44f0e82a3d69f194f2fc467e51
Parents: e40ed0e
Author: tedyu <yuzhih...@gmail.com>
Authored: Tue Nov 17 18:49:05 2015 -0800
Committer: tedyu <yuzhih...@gmail.com>
Committed: Tue Nov 17 18:49:05 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |  2 +-
 .../hadoop/hbase/ipc/AbstractTestIPC.java       | 81 ++++++++++++++++++++
 2 files changed, 82 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5362ac03/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 30e5e86..1384653 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -1882,7 +1882,7 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
           ? new TraceInfo(header.getTraceInfo().getTraceId(), 
header.getTraceInfo().getParentId())
           : null;
       Call call = new Call(id, this.service, md, header, param, cellScanner, 
this, responder,
-              totalRequestSize, traceInfo, RpcServer.getRemoteIp());
+              totalRequestSize, traceInfo, this.addr);
       scheduler.dispatch(new CallRunner(RpcServer.this, call));
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5362ac03/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index dffd8e9..5df1edc 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
@@ -37,23 +38,28 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
 import 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
 import 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.protobuf.BlockingRpcChannel;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
@@ -282,4 +288,79 @@ public abstract class AbstractTestIPC {
       verify(scheduler).stop();
     }
   }
+
+  /**
+   * Instance of RpcServer that echoes client hostAddress back to client
+   */
+  static class TestRpcServer1 extends RpcServer {
+
+    private static BlockingInterface SERVICE1 =
+        new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
+          @Override
+          public EmptyResponseProto ping(RpcController unused, 
EmptyRequestProto request)
+              throws ServiceException {
+            return EmptyResponseProto.newBuilder().build();
+          }
+
+          @Override
+          public EchoResponseProto echo(RpcController unused, EchoRequestProto 
request)
+              throws ServiceException {
+            final InetAddress remoteAddr = TestRpcServer1.getRemoteAddress();
+            final String message = remoteAddr == null ? "NULL" : 
remoteAddr.getHostAddress();
+            return EchoResponseProto.newBuilder().setMessage(message).build();
+          }
+
+          @Override
+          public EmptyResponseProto error(RpcController unused, 
EmptyRequestProto request)
+              throws ServiceException {
+            throw new ServiceException("error", new IOException("error"));
+          }
+        };
+
+    TestRpcServer1() throws IOException {
+      this(new FifoRpcScheduler(CONF, 1));
+    }
+
+    TestRpcServer1(RpcScheduler scheduler) throws IOException {
+      super(null, "testRemoteAddressInCallObject", Lists
+          .newArrayList(new 
BlockingServiceAndInterface(TestRpcServiceProtos.TestProtobufRpcProto
+              .newReflectiveBlockingService(SERVICE1), null)),
+          new InetSocketAddress("localhost", 0), CONF, scheduler);
+    }
+  }
+
+  /**
+   * Tests that the RpcServer creates & dispatches CallRunner object to 
scheduler with non-null
+   * remoteAddress set to its Call Object
+   * @throws ServiceException
+   */
+  @Test
+  public void testRpcServerForNotNullRemoteAddressInCallObject() throws 
IOException,
+      ServiceException {
+    final RpcScheduler scheduler = new FifoRpcScheduler(CONF, 1);
+    final TestRpcServer1 rpcServer = new TestRpcServer1(scheduler);
+    final InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
+    final AbstractRpcClient client =
+        new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT, localAddr, 
null);
+    try {
+      rpcServer.start();
+      final InetSocketAddress isa = rpcServer.getListenerAddress();
+      if (isa == null) {
+        throw new IOException("Listener channel is closed");
+      }
+      final BlockingRpcChannel channel =
+          client.createBlockingRpcChannel(
+            ServerName.valueOf(isa.getHostName(), isa.getPort(), 
System.currentTimeMillis()),
+            User.getCurrent(), 0);
+      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
+          TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
+      final EchoRequestProto echoRequest =
+          EchoRequestProto.newBuilder().setMessage("GetRemoteAddress").build();
+      final EchoResponseProto echoResponse = stub.echo(null, echoRequest);
+      Assert.assertEquals(localAddr.getAddress().getHostAddress(), 
echoResponse.getMessage());
+    } finally {
+      client.close();
+      rpcServer.stop();
+    }
+  }
 }

Reply via email to