Repository: hadoop Updated Branches: refs/heads/branch-2 4e342f603 -> 5902c0658 refs/heads/branch-2.9 809faede9 -> 6ed97eba2
HADOOP-10219. ipc.Client.setupIOstreams() needs to check for ClientCache.stopClient requested shutdowns. Contributed by Kihwal Lee and Lukas Majercak. (cherry picked from commit 9e96ac666d783376a8cdea9c3cc84098c5bdcb56) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5902c065 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5902c065 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5902c065 Branch: refs/heads/branch-2 Commit: 5902c0658eb7ad7092c2bdeb10c2296015c84c08 Parents: 4e342f6 Author: Steve Loughran <ste...@apache.org> Authored: Tue Sep 4 16:46:12 2018 +0100 Committer: Inigo Goiri <inigo...@apache.org> Committed: Tue Sep 4 11:58:56 2018 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/hadoop/ipc/Client.java | 14 ++++++ .../java/org/apache/hadoop/ipc/TestIPC.java | 45 ++++++++++++++++++++ 2 files changed, 59 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5902c065/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 533b6ca..2636adb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -70,6 +70,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID; import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID; @@ -440,6 +441,8 @@ public class Client implements AutoCloseable { private final Object sendRpcRequestLock = new Object(); + private AtomicReference<Thread> connectingThread = new AtomicReference<>(); + public Connection(ConnectionId remoteId, int serviceClass) throws IOException { this.remoteId = remoteId; this.server = remoteId.getAddress(); @@ -777,6 +780,7 @@ public class Client implements AutoCloseable { } } try { + connectingThread.set(Thread.currentThread()); if (LOG.isDebugEnabled()) { LOG.debug("Connecting to "+server); } @@ -862,6 +866,8 @@ public class Client implements AutoCloseable { markClosed(new IOException("Couldn't set up IO streams: " + t, t)); } close(); + } finally { + connectingThread.set(null); } } @@ -1215,6 +1221,13 @@ public class Client implements AutoCloseable { notifyAll(); } } + + private void interruptConnectingThread() { + Thread connThread = connectingThread.get(); + if (connThread != null) { + connThread.interrupt(); + } + } /** Close the connection. */ private synchronized void close() { @@ -1317,6 +1330,7 @@ public class Client implements AutoCloseable { // wake up all connections for (Connection conn : connections.values()) { conn.interrupt(); + conn.interruptConnectingThread(); } // wait until all connections are closed http://git-wip-us.apache.org/repos/asf/hadoop/blob/5902c065/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index a6c57fe..95e76f7 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -1398,6 +1399,50 @@ public class TestIPC { assertEquals(Client.getTimeout(config), -1); } + @Test(timeout=60000) + public void testSetupConnectionShouldNotBlockShutdown() throws Exception { + // Start server + SocketFactory mockFactory = Mockito.mock(SocketFactory.class); + Server server = new TestServer(1, true); + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + + // Track how many times we retried to set up the connection + final AtomicInteger createSocketCalled = new AtomicInteger(); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + createSocketCalled.addAndGet(1); + Thread.sleep(MIN_SLEEP_TIME * 5); + throw new ConnectTimeoutException("fake"); + } + }).when(mockFactory).createSocket(); + final Client client = new Client(LongWritable.class, conf, mockFactory); + + final AtomicBoolean callStarted = new AtomicBoolean(false); + + // Call a random function asynchronously so that we can call stop() + new Thread(new Runnable() { + public void run() { + try { + callStarted.set(true); + call(client, RANDOM.nextLong(), addr, conf); + } catch (IOException ignored) {} + } + }).start(); + + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return callStarted.get() && createSocketCalled.get() == 1; + } + }, 50, 60000); + + // stop() should stop the client immediately without any more retries + client.stop(); + assertEquals(1, createSocketCalled.get()); + } + private void assertRetriesOnSocketTimeouts(Configuration conf, int maxTimeoutRetries) throws IOException { SocketFactory mockFactory = Mockito.mock(SocketFactory.class); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org