Repository: hadoop
Updated Branches:
  refs/heads/branch-2 4e342f603 -> 5902c0658
  refs/heads/branch-2.9 809faede9 -> 6ed97eba2


HADOOP-10219. ipc.Client.setupIOstreams() needs to check for 
ClientCache.stopClient requested shutdowns.
Contributed by Kihwal Lee and Lukas Majercak.

(cherry picked from commit 9e96ac666d783376a8cdea9c3cc84098c5bdcb56)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5902c065
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5902c065
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5902c065

Branch: refs/heads/branch-2
Commit: 5902c0658eb7ad7092c2bdeb10c2296015c84c08
Parents: 4e342f6
Author: Steve Loughran <ste...@apache.org>
Authored: Tue Sep 4 16:46:12 2018 +0100
Committer: Inigo Goiri <inigo...@apache.org>
Committed: Tue Sep 4 11:58:56 2018 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/ipc/Client.java | 14 ++++++
 .../java/org/apache/hadoop/ipc/TestIPC.java     | 45 ++++++++++++++++++++
 2 files changed, 59 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5902c065/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 533b6ca..2636adb 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -70,6 +70,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
 import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
@@ -440,6 +441,8 @@ public class Client implements AutoCloseable {
     
     private final Object sendRpcRequestLock = new Object();
 
+    private AtomicReference<Thread> connectingThread = new AtomicReference<>();
+
     public Connection(ConnectionId remoteId, int serviceClass) throws 
IOException {
       this.remoteId = remoteId;
       this.server = remoteId.getAddress();
@@ -777,6 +780,7 @@ public class Client implements AutoCloseable {
         }
       }
       try {
+        connectingThread.set(Thread.currentThread());
         if (LOG.isDebugEnabled()) {
           LOG.debug("Connecting to "+server);
         }
@@ -862,6 +866,8 @@ public class Client implements AutoCloseable {
           markClosed(new IOException("Couldn't set up IO streams: " + t, t));
         }
         close();
+      } finally {
+        connectingThread.set(null);
       }
     }
     
@@ -1215,6 +1221,13 @@ public class Client implements AutoCloseable {
         notifyAll();
       }
     }
+
+    private void interruptConnectingThread() {
+      Thread connThread = connectingThread.get();
+      if (connThread != null) {
+        connThread.interrupt();
+      }
+    }
     
     /** Close the connection. */
     private synchronized void close() {
@@ -1317,6 +1330,7 @@ public class Client implements AutoCloseable {
     // wake up all connections
     for (Connection conn : connections.values()) {
       conn.interrupt();
+      conn.interruptConnectingThread();
     }
     
     // wait until all connections are closed

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5902c065/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index a6c57fe..95e76f7 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -1398,6 +1399,50 @@ public class TestIPC {
     assertEquals(Client.getTimeout(config), -1);
   }
 
+  @Test(timeout=60000)
+  public void testSetupConnectionShouldNotBlockShutdown() throws Exception {
+    // Start server
+    SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
+    Server server = new TestServer(1, true);
+    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+
+    // Track how many times we retried to set up the connection
+    final AtomicInteger createSocketCalled = new AtomicInteger();
+
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocationOnMock) throws Throwable 
{
+        createSocketCalled.addAndGet(1);
+        Thread.sleep(MIN_SLEEP_TIME * 5);
+        throw new ConnectTimeoutException("fake");
+      }
+    }).when(mockFactory).createSocket();
+    final Client client = new Client(LongWritable.class, conf, mockFactory);
+
+    final AtomicBoolean callStarted = new AtomicBoolean(false);
+
+    // Call a random function asynchronously so that we can call stop()
+    new Thread(new Runnable() {
+      public void run() {
+        try {
+          callStarted.set(true);
+          call(client, RANDOM.nextLong(), addr, conf);
+        } catch (IOException ignored) {}
+      }
+    }).start();
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return callStarted.get() && createSocketCalled.get() == 1;
+      }
+    }, 50, 60000);
+
+    // stop() should stop the client immediately without any more retries
+    client.stop();
+    assertEquals(1, createSocketCalled.get());
+  }
+
   private void assertRetriesOnSocketTimeouts(Configuration conf,
       int maxTimeoutRetries) throws IOException {
     SocketFactory mockFactory = Mockito.mock(SocketFactory.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to