HADOOP-13546. Override equals and hashCode of the default retry policy to avoid 
connection leakage. Contributed by Xiaobing Zhou.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/08d8e0ba
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/08d8e0ba
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/08d8e0ba

Branch: refs/heads/HADOOP-12756
Commit: 08d8e0ba259f01465a83d8db09466dfd46b7ec81
Parents: db6d243
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Sep 13 11:12:52 2016 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Sep 13 11:12:52 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/io/retry/RetryPolicies.java   |  14 ++
 .../org/apache/hadoop/io/retry/RetryUtils.java  | 118 ++++++++-----
 .../io/retry/TestConnectionRetryPolicy.java     | 154 +++++++++++++++++
 .../hadoop/ipc/TestReuseRpcConnections.java     | 166 +++++++++++++++++++
 .../java/org/apache/hadoop/ipc/TestRpcBase.java |  23 ++-
 5 files changed, 431 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/08d8e0ba/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
index c0a14b7..0c523a5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
@@ -183,6 +183,20 @@ public class RetryPolicies {
       return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "try once " +
           "and fail.");
     }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      } else {
+        return obj != null && obj.getClass() == this.getClass();
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return this.getClass().hashCode();
+    }
   }
 
   static class RetryForever implements RetryPolicy {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08d8e0ba/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java
index a5a7624..15a9b54 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicies.MultipleLinearRandomRetry;
 import org.apache.hadoop.ipc.RemoteException;
 
 import com.google.protobuf.ServiceException;
@@ -79,48 +80,85 @@ public class RetryUtils {
       //no retry
       return RetryPolicies.TRY_ONCE_THEN_FAIL;
     } else {
-      return new RetryPolicy() {
-        @Override
-        public RetryAction shouldRetry(Exception e, int retries, int failovers,
-            boolean isMethodIdempotent) throws Exception {
-          if (e instanceof ServiceException) {
-            //unwrap ServiceException
-            final Throwable cause = e.getCause();
-            if (cause != null && cause instanceof Exception) {
-              e = (Exception)cause;
-            }
-          }
-
-          //see (1) and (2) in the javadoc of this method.
-          final RetryPolicy p;
-          if (e instanceof RetriableException
-              || RetryPolicies.getWrappedRetriableException(e) != null) {
-            // RetriableException or RetriableException wrapped
-            p = multipleLinearRandomRetry;
-          } else if (e instanceof RemoteException) {
-            final RemoteException re = (RemoteException)e;
-            p = remoteExceptionToRetry.equals(re.getClassName())?
-                multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
-          } else if (e instanceof IOException || e instanceof ServiceException) {
-            p = multipleLinearRandomRetry;
-          } else { //non-IOException
-            p = RetryPolicies.TRY_ONCE_THEN_FAIL;
-          }
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("RETRY " + retries + ") policy="
-                + p.getClass().getSimpleName() + ", exception=" + e);
-          }
-          return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
-        }
+      return new WrapperRetryPolicy(
+          (MultipleLinearRandomRetry) multipleLinearRandomRetry,
+          remoteExceptionToRetry);
+    }
+  }
+
+  private static final class WrapperRetryPolicy implements RetryPolicy {
+    private MultipleLinearRandomRetry multipleLinearRandomRetry;
+    private String remoteExceptionToRetry;
 
-        @Override
-        public String toString() {
-          return "RetryPolicy[" + multipleLinearRandomRetry + ", "
-              + RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName()
-              + "]";
+    private WrapperRetryPolicy(
+        final MultipleLinearRandomRetry multipleLinearRandomRetry,
+        final String remoteExceptionToRetry) {
+      this.multipleLinearRandomRetry = multipleLinearRandomRetry;
+      this.remoteExceptionToRetry = remoteExceptionToRetry;
+    }
+
+    @Override
+    public RetryAction shouldRetry(Exception e, int retries, int failovers,
+        boolean isMethodIdempotent) throws Exception {
+      if (e instanceof ServiceException) {
+        //unwrap ServiceException
+        final Throwable cause = e.getCause();
+        if (cause != null && cause instanceof Exception) {
+          e = (Exception)cause;
         }
-      };
+      }
+
+      //see (1) and (2) in the javadoc of this method.
+      final RetryPolicy p;
+      if (e instanceof RetriableException
+          || RetryPolicies.getWrappedRetriableException(e) != null) {
+        // RetriableException or RetriableException wrapped
+        p = multipleLinearRandomRetry;
+      } else if (e instanceof RemoteException) {
+        final RemoteException re = (RemoteException)e;
+        p = re.getClassName().equals(remoteExceptionToRetry)
+            ? multipleLinearRandomRetry : RetryPolicies.TRY_ONCE_THEN_FAIL;
+      } else if (e instanceof IOException || e instanceof ServiceException) {
+        p = multipleLinearRandomRetry;
+      } else { //non-IOException
+        p = RetryPolicies.TRY_ONCE_THEN_FAIL;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RETRY " + retries + ") policy="
+            + p.getClass().getSimpleName() + ", exception=" + e);
+      }
+      return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
+    }
+
+    /**
+     * remoteExceptionToRetry is ignored as part of equals since it does not
+     * affect connection failure handling.
+     */
+    @Override
+    public boolean equals(final Object obj) {
+      if (obj == this) {
+        return true;
+      } else {
+        return (obj instanceof WrapperRetryPolicy)
+            && this.multipleLinearRandomRetry
+                .equals(((WrapperRetryPolicy) obj).multipleLinearRandomRetry);
+      }
+    }
+
+    /**
+     * Similarly, remoteExceptionToRetry is ignored as part of hashCode since it
+     * does not affect connection failure handling.
+     */
+    @Override
+    public int hashCode() {
+      return multipleLinearRandomRetry.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return "RetryPolicy[" + multipleLinearRandomRetry + ", "
+          + RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName() + "]";
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08d8e0ba/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestConnectionRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestConnectionRetryPolicy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestConnectionRetryPolicy.java
new file mode 100644
index 0000000..05a309d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestConnectionRetryPolicy.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.retry;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.ipc.RpcNoSuchMethodException;
+import org.junit.Test;
+
+/**
+ * This class mainly tests behaviors of various retry policies in connection
+ * level.
+ */
+public class TestConnectionRetryPolicy {
+  private static RetryPolicy getDefaultRetryPolicy(
+      final boolean defaultRetryPolicyEnabled,
+      final String defaultRetryPolicySpec,
+      final String remoteExceptionToRetry) {
+    return getDefaultRetryPolicy(
+        new Configuration(),
+        defaultRetryPolicyEnabled,
+        defaultRetryPolicySpec,
+        remoteExceptionToRetry);
+  }
+
+  private static RetryPolicy getDefaultRetryPolicy(
+      final boolean defaultRetryPolicyEnabled,
+      final String defaultRetryPolicySpec) {
+    return getDefaultRetryPolicy(
+        new Configuration(),
+        defaultRetryPolicyEnabled,
+        defaultRetryPolicySpec,
+        "");
+  }
+
+  public static RetryPolicy getDefaultRetryPolicy(
+      final Configuration conf,
+      final boolean defaultRetryPolicyEnabled,
+      final String defaultRetryPolicySpec,
+      final String remoteExceptionToRetry) {
+    return RetryUtils.getDefaultRetryPolicy(
+        conf,
+        "org.apache.hadoop.io.retry.TestConnectionRetryPolicy.No.Such.Key",
+        defaultRetryPolicyEnabled,
+        "org.apache.hadoop.io.retry.TestConnectionRetryPolicy.No.Such.Key",
+        defaultRetryPolicySpec,
+        remoteExceptionToRetry); // forward the caller's value, not ""
+  }
+
+  @Test(timeout = 60000)
+  public void testDefaultRetryPolicyEquivalence() {
+    RetryPolicy rp1 = null;
+    RetryPolicy rp2 = null;
+    RetryPolicy rp3 = null;
+
+    /* test the same setting */
+    rp1 = getDefaultRetryPolicy(true, "10000,2");
+    rp2 = getDefaultRetryPolicy(true, "10000,2");
+    rp3 = getDefaultRetryPolicy(true, "10000,2");
+    verifyRetryPolicyEquivalence(new RetryPolicy[] {rp1, rp2, rp3});
+
+    /* different remoteExceptionToRetry must still compare as equal */
+    rp1 = getDefaultRetryPolicy(
+        true,
+        "10000,2",
+        new RemoteException(
+            PathIOException.class.getName(),
+            "path IO exception").getClassName());
+    rp2 = getDefaultRetryPolicy(
+        true,
+        "10000,2",
+        new RemoteException(
+            RpcNoSuchMethodException.class.getName(),
+            "no such method exception").getClassName());
+    rp3 = getDefaultRetryPolicy(
+        true,
+        "10000,2",
+        new RemoteException(
+            RetriableException.class.getName(),
+            "retriable exception").getClassName());
+    verifyRetryPolicyEquivalence(new RetryPolicy[] {rp1, rp2, rp3});
+
+    /* test enabled and different specifications */
+    rp1 = getDefaultRetryPolicy(true, "20000,3");
+    rp2 = getDefaultRetryPolicy(true, "30000,4");
+    assertNotEquals("should not be equal", rp1, rp2);
+    assertNotEquals(
+        "should not have the same hash code",
+        rp1.hashCode(),
+        rp2.hashCode());
+
+    /* test disabled and the same specifications */
+    rp1 = getDefaultRetryPolicy(false, "40000,5");
+    rp2 = getDefaultRetryPolicy(false, "40000,5");
+    assertEquals("should be equal", rp1, rp2);
+    assertEquals(
+        "should have the same hash code",
+        rp1.hashCode(), rp2.hashCode());
+
+    /* test the disabled and different specifications */
+    rp1 = getDefaultRetryPolicy(false, "50000,6");
+    rp2 = getDefaultRetryPolicy(false, "60000,7");
+    assertEquals("should be equal", rp1, rp2);
+    assertEquals(
+        "should have the same hash code",
+        rp1.hashCode(), rp2.hashCode());
+  }
+
+  public static RetryPolicy newTryOnceThenFail() {
+    return new RetryPolicies.TryOnceThenFail();
+  }
+
+  @Test(timeout = 60000)
+  public void testTryOnceThenFailEquivalence() throws Exception {
+    final RetryPolicy rp1 = newTryOnceThenFail();
+    final RetryPolicy rp2 = newTryOnceThenFail();
+    final RetryPolicy rp3 = newTryOnceThenFail();
+    verifyRetryPolicyEquivalence(new RetryPolicy[] {rp1, rp2, rp3});
+  }
+
+  private void verifyRetryPolicyEquivalence(RetryPolicy[] polices) {
+    for (int i = 0; i < polices.length; i++) {
+      for (int j = 0; j < polices.length; j++) {
+        if (i != j) {
+          assertEquals("should be equal", polices[i], polices[j]);
+          assertEquals(
+              "should have the same hash code",
+              polices[i].hashCode(),
+              polices[j].hashCode());
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08d8e0ba/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestReuseRpcConnections.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestReuseRpcConnections.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestReuseRpcConnections.java
new file mode 100644
index 0000000..2729dc3
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestReuseRpcConnections.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+import static org.junit.Assert.*;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.TestConnectionRetryPolicy;
+import org.apache.hadoop.ipc.Client.ConnectionId;
+import org.apache.hadoop.ipc.TestRpcBase.TestRpcService;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This class mainly tests behaviors of reusing RPC connections for various
+ * retry policies.
+ */
+public class TestReuseRpcConnections extends TestRpcBase {
+  @Before
+  public void setup() {
+    setupConf();
+  }
+
+  private static RetryPolicy getDefaultRetryPolicy(
+      final boolean defaultRetryPolicyEnabled,
+      final String defaultRetryPolicySpec) {
+    return TestConnectionRetryPolicy.getDefaultRetryPolicy(
+        conf,
+        defaultRetryPolicyEnabled,
+        defaultRetryPolicySpec,
+        "");
+  }
+
+  private static RetryPolicy getDefaultRetryPolicy(
+      final boolean defaultRetryPolicyEnabled,
+      final String defaultRetryPolicySpec,
+      final String remoteExceptionToRetry) {
+    return TestConnectionRetryPolicy.getDefaultRetryPolicy(
+        conf,
+        defaultRetryPolicyEnabled,
+        defaultRetryPolicySpec,
+        remoteExceptionToRetry);
+  }
+
+  @Test(timeout = 60000)
+  public void testDefaultRetryPolicyReuseConnections() throws Exception {
+    RetryPolicy rp1 = null;
+    RetryPolicy rp2 = null;
+    RetryPolicy rp3 = null;
+
+    /* test the same setting */
+    rp1 = getDefaultRetryPolicy(true, "10000,2");
+    rp2 = getDefaultRetryPolicy(true, "10000,2");
+    verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
+
+    /* test enabled and different specifications */
+    rp1 = getDefaultRetryPolicy(true, "20000,3");
+    rp2 = getDefaultRetryPolicy(true, "20000,3");
+    rp3 = getDefaultRetryPolicy(true, "30000,4");
+    verifyRetryPolicyReuseConnections(rp1, rp2, rp3);
+
+    /* test disabled and the same specifications */
+    rp1 = getDefaultRetryPolicy(false, "40000,5");
+    rp2 = getDefaultRetryPolicy(false, "40000,5");
+    verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
+
+    /* test disabled and different specifications */
+    rp1 = getDefaultRetryPolicy(false, "50000,6");
+    rp2 = getDefaultRetryPolicy(false, "60000,7");
+    verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
+
+    /* test different remoteExceptionToRetry */
+    rp1 = getDefaultRetryPolicy(
+        true,
+        "70000,8",
+        new RemoteException(
+            RpcNoSuchMethodException.class.getName(),
+            "no such method exception").getClassName());
+    rp2 = getDefaultRetryPolicy(
+        true,
+        "70000,8",
+        new RemoteException(
+            PathIOException.class.getName(),
+            "path IO exception").getClassName());
+    verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
+  }
+
+  @Test(timeout = 60000)
+  public void testRetryPolicyTryOnceThenFail() throws Exception {
+    final RetryPolicy rp1 = TestConnectionRetryPolicy.newTryOnceThenFail();
+    final RetryPolicy rp2 = TestConnectionRetryPolicy.newTryOnceThenFail();
+    verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
+  }
+
+  private void verifyRetryPolicyReuseConnections(
+      final RetryPolicy retryPolicy1,
+      final RetryPolicy retryPolicy2,
+      final RetryPolicy anotherRetryPolicy) throws Exception {
+    final Server server = setupTestServer(conf, 2);
+    final Configuration newConf = new Configuration(conf);
+    newConf.set(
+        CommonConfigurationKeysPublic
+          .HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY,
+        "");
+    Client client = null;
+    TestRpcService proxy1 = null;
+    TestRpcService proxy2 = null;
+    TestRpcService proxy3 = null;
+
+    try {
+      proxy1 = getClient(addr, newConf, retryPolicy1);
+      proxy1.ping(null, newEmptyRequest());
+      client = ProtobufRpcEngine.getClient(newConf);
+      final Set<ConnectionId> conns = client.getConnectionIds();
+      assertEquals("number of connections in cache is wrong", 1, conns.size());
+
+      /*
+       * another equivalent retry policy, reuse connection
+       */
+      proxy2 = getClient(addr, newConf, retryPolicy2);
+      proxy2.ping(null, newEmptyRequest());
+      assertEquals("number of connections in cache is wrong", 1, conns.size());
+
+      /*
+       * different retry policy, create a new connection
+       */
+      proxy3 = getClient(addr, newConf, anotherRetryPolicy);
+      proxy3.ping(null, newEmptyRequest());
+      assertEquals("number of connections in cache is wrong", 2, conns.size());
+    } finally {
+      server.stop();
+      // this is dirty, but clear out connection cache for next run
+      if (client != null) {
+        client.getConnectionIds().clear();
+      }
+      if (proxy1 != null) {
+        RPC.stopProxy(proxy1);
+      }
+      if (proxy2 != null) {
+        RPC.stopProxy(proxy2);
+      }
+      if (proxy3 != null) {
+        RPC.stopProxy(proxy3);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08d8e0ba/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
index bc604a4..e991405 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
@@ -30,18 +30,15 @@ import org.apache.hadoop.security.token.SecretManager;
 import org.junit.Assert;
 
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.protobuf.TestProtos;
 import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenInfo;
 import org.apache.hadoop.security.token.TokenSelector;
-import org.junit.Assert;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -132,6 +129,24 @@ public class TestRpcBase {
     }
   }
 
+  protected static TestRpcService getClient(InetSocketAddress serverAddr,
+      Configuration clientConf, final RetryPolicy connectionRetryPolicy)
+      throws ServiceException {
+    try {
+      return RPC.getProtocolProxy(
+          TestRpcService.class,
+          0,
+          serverAddr,
+          UserGroupInformation.getCurrentUser(),
+          clientConf,
+          NetUtils.getDefaultSocketFactory(clientConf),
+          RPC.getRpcTimeout(clientConf),
+          connectionRetryPolicy, null).getProxy();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
   protected static void stop(Server server, TestRpcService proxy) {
     if (proxy != null) {
       try {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to