This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 5b2d6684e69c153e019845cf21d72a26e70b0160
Author: xuzq <15040255...@163.com>
AuthorDate: Sat Jul 16 05:18:46 2022 +0800

    HADOOP-13144. Enhancing IPC client throughput via multiple connections per user (#4542)
---
 .../main/java/org/apache/hadoop/ipc/Client.java    |  8 +-
 .../org/apache/hadoop/ipc/ProtobufRpcEngine.java   | 10 +++
 .../org/apache/hadoop/ipc/ProtobufRpcEngine2.java  | 10 +++
 .../src/main/java/org/apache/hadoop/ipc/RPC.java   | 23 ++++++
 .../main/java/org/apache/hadoop/ipc/RpcEngine.java | 16 ++++
 .../org/apache/hadoop/ipc/WritableRpcEngine.java   | 21 +++++
 .../test/java/org/apache/hadoop/ipc/TestRPC.java   | 58 ++++++++++++++
 .../java/org/apache/hadoop/ipc/TestRpcBase.java    | 89 ++++++++++++++++++++--
 8 files changed, 228 insertions(+), 7 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 4f9a456d399..d21c8073cf1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -1719,7 +1719,7 @@ public class Client implements AutoCloseable {
     private String saslQop; // here for testing
     private final Configuration conf; // used to get the expected kerberos principal name
     
-    ConnectionId(InetSocketAddress address, Class<?> protocol, 
+    public ConnectionId(InetSocketAddress address, Class<?> protocol,
                  UserGroupInformation ticket, int rpcTimeout,
                  RetryPolicy connectionRetryPolicy, Configuration conf) {
       this.protocol = protocol;
@@ -1784,7 +1784,7 @@ public class Client implements AutoCloseable {
       return ticket;
     }
     
-    private int getRpcTimeout() {
+    int getRpcTimeout() {
       return rpcTimeout;
     }
     
@@ -1818,6 +1818,10 @@ public class Client implements AutoCloseable {
     int getPingInterval() {
       return pingInterval;
     }
+
+    RetryPolicy getRetryPolicy() {
+      return connectionRetryPolicy;
+    }
     
     @VisibleForTesting
     String getSaslQop() {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index d1501b65bb0..dce6631bb1d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -77,6 +77,16 @@ public class ProtobufRpcEngine implements RpcEngine {
     return ASYNC_RETURN_MESSAGE.get();
   }
 
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+      ConnectionId connId, Configuration conf, SocketFactory factory)
+      throws IOException {
+    final Invoker invoker = new Invoker(protocol, connId, conf, factory);
+    return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
+        protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
+  }
+
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
       InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
       SocketFactory factory, int rpcTimeout) throws IOException {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java
index 60cce08a3e2..ea2dbba467e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java
@@ -100,6 +100,16 @@ public class ProtobufRpcEngine2 implements RpcEngine {
       rpcTimeout, connectionRetryPolicy, null, null);
   }
 
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+      ConnectionId connId, Configuration conf, SocketFactory factory)
+      throws IOException {
+    final Invoker invoker = new Invoker(protocol, connId, conf, factory);
+    return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
+        protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index 818305b3169..7f35b13aec9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -541,6 +541,29 @@ public class RPC {
     return getProtocolProxy(protocol, clientVersion, addr, ticket, conf,
         factory, getRpcTimeout(conf), null);
   }
+
+  /**
+   * Get a protocol proxy that contains a proxy connection to a remote server
+   * and a set of methods that are supported by the server.
+   *
+   * @param <T> Generics Type T
+   * @param protocol protocol class
+   * @param clientVersion client's version
+   * @param connId client connection identifier
+   * @param conf configuration
+   * @param factory socket factory
+   * @return the protocol proxy
+   * @throws IOException if the far end throws a RemoteException
+   */
+  public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
+      long clientVersion, ConnectionId connId, Configuration conf,
+      SocketFactory factory) throws IOException {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      SaslRpcServer.init(conf);
+    }
+    return getProtocolEngine(protocol, conf).getProxy(
+        protocol, clientVersion, connId, conf, factory);
+  }
   
   /**
    * Construct a client-side proxy that implements the named protocol,
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
index afc9d035b09..1f0ff2d99d3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
@@ -57,6 +57,22 @@ public interface RpcEngine {
                   SocketFactory factory, int rpcTimeout,
                   RetryPolicy connectionRetryPolicy) throws IOException;
 
+  /**
+   * Construct a client-side proxy object with a ConnectionId.
+   *
+   * @param <T> Generics Type T.
+   * @param protocol input protocol.
+   * @param clientVersion input clientVersion.
+   * @param connId input ConnectionId.
+   * @param conf input Configuration.
+   * @param factory input factory.
+   * @throws IOException raised on errors performing I/O.
+   * @return ProtocolProxy.
+   */
+  <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+      Client.ConnectionId connId, Configuration conf, SocketFactory factory)
+      throws IOException;
+
   /**
    * Construct a client-side proxy object.
    *
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
index 21181f860d9..3e4ee707d46 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
@@ -306,6 +306,27 @@ public class WritableRpcEngine implements RpcEngine {
       rpcTimeout, connectionRetryPolicy, null, null);
   }
 
+  /**
+   * Construct a client-side proxy object with a ConnectionId.
+   *
+   * @param <T> Generics Type T.
+   * @param protocol input protocol.
+   * @param clientVersion input clientVersion.
+   * @param connId input ConnectionId.
+   * @param conf input Configuration.
+   * @param factory input factory.
+   * @throws IOException raised on errors performing I/O.
+   * @return ProtocolProxy.
+   */
+  @Override
+  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+      Client.ConnectionId connId, Configuration conf, SocketFactory factory)
+      throws IOException {
+    return getProxy(protocol, clientVersion, connId.getAddress(),
+        connId.getTicket(), conf, factory, connId.getRpcTimeout(),
+        connId.getRetryPolicy(), null, null);
+  }
+
   /**
    * Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index cdfd380e1f3..a184ea173e5 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.ipc;
 
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.retry.RetryUtils;
+import org.apache.hadoop.ipc.metrics.RpcMetrics;
+
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.thirdparty.protobuf.ServiceException;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -288,6 +292,13 @@ public class TestRPC extends TestRpcBase {
           rpcTimeout, connectionRetryPolicy, null, null);
     }
 
+    @Override
+    public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+        ConnectionId connId, Configuration conf, SocketFactory factory)
+        throws IOException {
+      throw new UnsupportedOperationException("This proxy is not supported");
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public <T> ProtocolProxy<T> getProxy(
@@ -389,6 +400,53 @@ public class TestRPC extends TestRpcBase {
     }
   }
 
+  @Test
+  public void testConnectionWithSocketFactory() throws IOException, ServiceException {
+    TestRpcService firstProxy = null;
+    TestRpcService secondProxy = null;
+
+    Configuration newConf = new Configuration(conf);
+    newConf.set(CommonConfigurationKeysPublic.
+        HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, "");
+
+    RetryPolicy retryPolicy = RetryUtils.getDefaultRetryPolicy(
+        newConf, "Test.No.Such.Key",
+        true,
+        "Test.No.Such.Key", "10000,6",
+        null);
+
+    // create a server with two handlers
+    Server server = setupTestServer(newConf, 2);
+    try {
+      // create the first client
+      firstProxy = getClient(addr, newConf);
+      // create the second client
+      secondProxy = getClient(addr, newConf);
+
+      firstProxy.ping(null, newEmptyRequest());
+      secondProxy.ping(null, newEmptyRequest());
+
+      Client client = ProtobufRpcEngine2.getClient(newConf);
+      assertEquals(1, client.getConnectionIds().size());
+
+      stop(null, firstProxy, secondProxy);
+      ProtobufRpcEngine2.clearClientCache();
+
+      // create the first client with index 1
+      firstProxy = getMultipleClientWithIndex(addr, newConf, retryPolicy, 1);
+      // create the second client with index 2
+      secondProxy = getMultipleClientWithIndex(addr, newConf, retryPolicy, 2);
+      firstProxy.ping(null, newEmptyRequest());
+      secondProxy.ping(null, newEmptyRequest());
+
+      Client client2 = ProtobufRpcEngine2.getClient(newConf);
+      assertEquals(2, client2.getConnectionIds().size());
+    } finally {
+      System.out.println("Down slow rpc testing");
+      stop(server, firstProxy, secondProxy);
+    }
+  }
+
   @Test
   public void testSlowRpc() throws IOException, ServiceException {
     Server server;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
index e9019e3d24e..7635b16dac0 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ipc;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.thirdparty.protobuf.BlockingService;
 import org.apache.hadoop.thirdparty.protobuf.RpcController;
 import org.apache.hadoop.thirdparty.protobuf.ServiceException;
@@ -26,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.protobuf.TestProtos;
 import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
 import org.apache.hadoop.net.NetUtils;
@@ -154,11 +157,53 @@ public class TestRpcBase {
     }
   }
 
-  protected static void stop(Server server, TestRpcService proxy) {
-    if (proxy != null) {
-      try {
-        RPC.stopProxy(proxy);
-      } catch (Exception ignored) {}
+  /**
+   * Try to obtain a proxy of TestRpcService with an index.
+   * @param serverAddr input server address
+   * @param clientConf input client configuration
+   * @param retryPolicy input retryPolicy
+   * @param index input index
+   * @return one proxy of TestRpcService
+   */
+  protected static TestRpcService getMultipleClientWithIndex(InetSocketAddress serverAddr,
+      Configuration clientConf, RetryPolicy retryPolicy, int index)
+      throws ServiceException, IOException {
+    MockConnectionId connectionId = new MockConnectionId(serverAddr,
+        TestRpcService.class, UserGroupInformation.getCurrentUser(),
+        RPC.getRpcTimeout(clientConf), retryPolicy, clientConf, index);
+    return getClient(connectionId, clientConf);
+  }
+
+  /**
+   * Obtain a TestRpcService Proxy by a connectionId.
+   * @param connId input connectionId
+   * @param clientConf  input configuration
+   * @return a TestRpcService Proxy
+   * @throws ServiceException a ServiceException
+   */
+  protected static TestRpcService getClient(ConnectionId connId,
+      Configuration clientConf) throws ServiceException {
+    try {
+      return RPC.getProtocolProxy(
+          TestRpcService.class,
+          0,
+          connId,
+          clientConf,
+          NetUtils.getDefaultSocketFactory(clientConf)).getProxy();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  protected static void stop(Server server, TestRpcService... proxies) {
+    if (proxies != null) {
+      for (TestRpcService proxy : proxies) {
+        if (proxy != null) {
+          try {
+            RPC.stopProxy(proxy);
+          } catch (Exception ignored) {}
+        }
+      }
     }
 
     if (server != null) {
@@ -189,6 +234,40 @@ public class TestRpcBase {
     return count;
   }
 
+  public static class MockConnectionId extends ConnectionId {
+    private static final int PRIME = 16777619;
+    private final int index;
+
+    public MockConnectionId(InetSocketAddress address, Class<?> protocol,
+        UserGroupInformation ticket, int rpcTimeout, RetryPolicy connectionRetryPolicy,
+        Configuration conf, int index) {
+      super(address, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf);
+      this.index = index;
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder()
+          .append(PRIME * super.hashCode())
+          .append(this.index)
+          .toHashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!super.equals(obj)) {
+        return false;
+      }
+      if (obj instanceof MockConnectionId) {
+        MockConnectionId other = (MockConnectionId)obj;
+        return new EqualsBuilder()
+            .append(this.index, other.index)
+            .isEquals();
+      }
+      return false;
+    }
+  }
+
   public static class TestTokenIdentifier extends TokenIdentifier {
     private Text tokenid;
     private Text realUser;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to