HADOOP-6221 RPC Client operations cannot be interrupted (stevel)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0111b57e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0111b57e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0111b57e

Branch: refs/heads/HDFS-EC
Commit: 0111b57e19e78f612107952c41bd95cfbf1ce883
Parents: 9feb6b3
Author: Steve Loughran <ste...@apache.org>
Authored: Mon Jan 26 22:04:45 2015 +0000
Committer: Zhe Zhang <z...@apache.org>
Committed: Thu Jan 29 10:05:23 2015 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   2 +
 .../main/java/org/apache/hadoop/ipc/Client.java |   6 +
 .../main/java/org/apache/hadoop/ipc/RPC.java    |   9 +-
 .../apache/hadoop/net/SocketIOWithTimeout.java  |  12 +-
 .../apache/hadoop/ipc/TestRPCWaitForProxy.java  | 130 +++++++++++++++++++
 5 files changed, 152 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0111b57e/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt 
b/hadoop-common-project/hadoop-common/CHANGES.txt
index e0da851..2806ee2 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -763,6 +763,8 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11499. Check of executorThreadsStarted in
     ValueQueue#submitRefillTask() evades lock acquisition (Ted Yu via jlowe)
 
+    HADOOP-6221 RPC Client operations cannot be interrupted. (stevel)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0111b57e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 45a4660..dfde136 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -849,6 +849,12 @@ public class Client {
         throw ioe;
       }
 
+      // Throw the exception if the thread is interrupted
+      if (Thread.currentThread().isInterrupted()) {
+        LOG.warn("Interrupted while trying for connection");
+        throw ioe;
+      }
+
       try {
         Thread.sleep(action.delayMillis);
       } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0111b57e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index 40f6515..8ada0ff 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -412,11 +412,18 @@ public class RPC {
         throw ioe;
       }
 
+      if (Thread.currentThread().isInterrupted()) {
+        // interrupted during some IO; this may not have been caught
+        throw new InterruptedIOException("Interrupted waiting for the proxy");
+      }
+
       // wait for retry
       try {
         Thread.sleep(1000);
       } catch (InterruptedException ie) {
-        // IGNORE
+        Thread.currentThread().interrupt();
+        throw (IOException) new InterruptedIOException(
+            "Interrupted waiting for the proxy").initCause(ioe);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0111b57e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
index ed12b3c..b50f7e9 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
@@ -338,6 +338,12 @@ abstract class SocketIOWithTimeout {
             return ret;
           }
           
+          if (Thread.currentThread().isInterrupted()) {
+            throw new InterruptedIOException("Interrupted while waiting for "
+                + "IO on channel " + channel + ". " + timeout
+                + " millis timeout left.");
+          }
+
           /* Sometimes select() returns 0 much before timeout for 
            * unknown reasons. So select again if required.
            */
@@ -348,12 +354,6 @@ abstract class SocketIOWithTimeout {
             }
           }
           
-          if (Thread.currentThread().isInterrupted()) {
-            throw new InterruptedIOException("Interruped while waiting for " +
-                                             "IO on channel " + channel +
-                                             ". " + timeout + 
-                                             " millis timeout left.");
-          }
         }
       } finally {
         if (key != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0111b57e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
new file mode 100644
index 0000000..5807998
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
+import org.apache.hadoop.ipc.TestRPC.TestProtocol;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InterruptedIOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedByInterruptException;
+
+/**
+ * Tests RPC.waitForProxy(): connection failure and interruption of the wait.
+ */
+public class TestRPCWaitForProxy extends Assert {
+  private static final String ADDRESS = "0.0.0.0";
+  private static final Logger
+      LOG = LoggerFactory.getLogger(TestRPCWaitForProxy.class);
+
+  private static final Configuration conf = new Configuration();
+
+  /**
+   * Runs a worker configured with zero connection retries and expects the
+   * proxy wait to fail with a {@link ConnectException}.
+   *
+   * @throws Throwable any exception other than that which was expected
+   */
+  @Test(timeout = 10000)
+  public void testWaitForProxy() throws Throwable {
+    RpcThread worker = new RpcThread(0);
+    worker.start();
+    worker.join();
+    Throwable caught = worker.getCaught();
+    assertNotNull("No exception was raised", caught);
+    if (!(caught instanceof ConnectException)) {
+      throw caught;
+    }
+  }
+
+  /**
+   * Starts a worker configured with 100 connection retries, interrupts it
+   * after one second, and checks that the failure it recorded is an interrupt.
+   *
+   * @throws Throwable any exception other than that which was expected
+   */
+  @Test(timeout = 10000)
+  public void testInterruptedWaitForProxy() throws Throwable {
+    RpcThread worker = new RpcThread(100);
+    worker.start();
+    Thread.sleep(1000); // give the worker time to reach its wait
+    assertTrue("worker hasn't started", worker.waitStarted);
+    worker.interrupt();
+    worker.join();
+    Throwable caught = worker.getCaught();
+    assertNotNull("No exception was raised", caught);
+    // The interrupt may surface wrapped in another exception (e.g. as part
+    // of the NetUtils work), so inspect the immediate cause rather than the
+    // outer exception's type; asserting on the wrapper type would be brittle
+    // to improvements in exception diagnostics.
+    Throwable cause = caught.getCause();
+    if (cause == null) {
+      // no inner cause: judge the outer exception itself.
+      cause = caught;
+    }
+    if (!(cause instanceof InterruptedIOException)
+        && !(cause instanceof ClosedByInterruptException)) {
+      throw caught;
+    }
+  }
+
+  /**
+   * Thread that calls RPC.waitForProxy() with the given number of connection
+   * retries and retains any throwable raised in the process.
+   */
+
+  private class RpcThread extends Thread {
+    private Throwable caught; // whatever run() caught; null if nothing was thrown
+    private int connectRetries;
+    private volatile boolean waitStarted = false; // set just before waitForProxy() is called
+
+    private RpcThread(int connectRetries) {
+      this.connectRetries = connectRetries;
+    }
+    @Override
+    public void run() {
+      try {
+        Configuration config = new Configuration(conf);
+        config.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+            connectRetries);
+        config.setInt(
+            IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+            connectRetries);
+        waitStarted = true;
+        TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
+            TestProtocol.versionID,
+            new InetSocketAddress(ADDRESS, 20),
+            config,
+            15000L); // presumably the wait timeout in millis -- TODO confirm
+        proxy.echo("");
+      } catch (Throwable throwable) {
+        caught = throwable;
+      }
+    }
+
+    public Throwable getCaught() {
+      return caught;
+    }
+  }
+}

Reply via email to