HADOOP-6221 RPC Client operations cannot be interrupted (stevel)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0111b57e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0111b57e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0111b57e Branch: refs/heads/HDFS-EC Commit: 0111b57e19e78f612107952c41bd95cfbf1ce883 Parents: 9feb6b3 Author: Steve Loughran <ste...@apache.org> Authored: Mon Jan 26 22:04:45 2015 +0000 Committer: Zhe Zhang <z...@apache.org> Committed: Thu Jan 29 10:05:23 2015 -0800 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 2 + .../main/java/org/apache/hadoop/ipc/Client.java | 6 + .../main/java/org/apache/hadoop/ipc/RPC.java | 9 +- .../apache/hadoop/net/SocketIOWithTimeout.java | 12 +- .../apache/hadoop/ipc/TestRPCWaitForProxy.java | 130 +++++++++++++++++++ 5 files changed, 152 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0111b57e/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index e0da851..2806ee2 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -763,6 +763,8 @@ Release 2.7.0 - UNRELEASED HADOOP-11499. Check of executorThreadsStarted in ValueQueue#submitRefillTask() evades lock acquisition (Ted Yu via jlowe) + HADOOP-6221 RPC Client operations cannot be interrupted. (stevel) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/0111b57e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 45a4660..dfde136 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -849,6 +849,12 @@ public class Client { throw ioe; } + // Throw the exception if the thread is interrupted + if (Thread.currentThread().isInterrupted()) { + LOG.warn("Interrupted while trying for connection"); + throw ioe; + } + try { Thread.sleep(action.delayMillis); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/0111b57e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 40f6515..8ada0ff 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -412,11 +412,18 @@ public class RPC { throw ioe; } + if (Thread.currentThread().isInterrupted()) { + // interrupted during some IO; this may not have been caught + throw new InterruptedIOException("Interrupted waiting for the proxy"); + } + // wait for retry try { Thread.sleep(1000); } catch (InterruptedException ie) { - // IGNORE + Thread.currentThread().interrupt(); + throw (IOException) new InterruptedIOException( + "Interrupted waiting for the proxy").initCause(ioe); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0111b57e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java index ed12b3c..b50f7e9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java @@ -338,6 +338,12 @@ abstract class SocketIOWithTimeout { return ret; } + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedIOException("Interrupted while waiting for " + + "IO on channel " + channel + ". " + timeout + + " millis timeout left."); + } + /* Sometimes select() returns 0 much before timeout for * unknown reasons. So select again if required. */ @@ -348,12 +354,6 @@ abstract class SocketIOWithTimeout { } } - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedIOException("Interruped while waiting for " + - "IO on channel " + channel + - ". " + timeout + - " millis timeout left."); - } } } finally { if (key != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/0111b57e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java new file mode 100644 index 0000000..5807998 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + +import org.apache.hadoop.conf.Configuration; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*; +import org.apache.hadoop.ipc.TestRPC.TestProtocol; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.nio.channels.ClosedByInterruptException; + +/** + * tests that the proxy can be interrupted + */ +public class TestRPCWaitForProxy extends Assert { + private static final String ADDRESS = "0.0.0.0"; + private static final Logger + LOG = LoggerFactory.getLogger(TestRPCWaitForProxy.class); + + private static final Configuration conf = new Configuration(); + + /** + * This tests that the time-bounded wait for a proxy operation works, and + * times out. + * + * @throws Throwable any exception other than that which was expected + */ + @Test(timeout = 10000) + public void testWaitForProxy() throws Throwable { + RpcThread worker = new RpcThread(0); + worker.start(); + worker.join(); + Throwable caught = worker.getCaught(); + assertNotNull("No exception was raised", caught); + if (!(caught instanceof ConnectException)) { + throw caught; + } + } + + /** + * This test sets off a blocking thread and then interrupts it, before + * checking that the thread was interrupted + * + * @throws Throwable any exception other than that which was expected + */ + @Test(timeout = 10000) + public void testInterruptedWaitForProxy() throws Throwable { + RpcThread worker = new RpcThread(100); + worker.start(); + Thread.sleep(1000); + assertTrue("worker hasn't started", worker.waitStarted); + worker.interrupt(); + worker.join(); + Throwable caught = worker.getCaught(); + assertNotNull("No exception was raised", caught); + // looking for the root cause here, which can be wrapped + // as part of the NetUtils work. Having this test look + // a the type of exception there would be brittle to improvements + // in exception diagnostics. + Throwable cause = caught.getCause(); + if (cause == null) { + // no inner cause, use outer exception as root cause. + cause = caught; + } + if (!(cause instanceof InterruptedIOException) + && !(cause instanceof ClosedByInterruptException)) { + throw caught; + } + } + + /** + * This thread waits for a proxy for the specified timeout, and retains any + * throwable that was raised in the process + */ + + private class RpcThread extends Thread { + private Throwable caught; + private int connectRetries; + private volatile boolean waitStarted = false; + + private RpcThread(int connectRetries) { + this.connectRetries = connectRetries; + } + @Override + public void run() { + try { + Configuration config = new Configuration(conf); + config.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, + connectRetries); + config.setInt( + IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, + connectRetries); + waitStarted = true; + TestProtocol proxy = RPC.waitForProxy(TestProtocol.class, + TestProtocol.versionID, + new InetSocketAddress(ADDRESS, 20), + config, + 15000L); + proxy.echo(""); + } catch (Throwable throwable) { + caught = throwable; + } + } + + public Throwable getCaught() { + return caught; + } + } +}