This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new d11320d  RATIS-706. Dead lock in GrpcClientRpc. Contributed by Tsz Wo Nicholas Sze.
d11320d is described below

commit d11320db65ce26b40994959c8a72c8f69556ad81
Author: Lokesh Jain <[email protected]>
AuthorDate: Thu Oct 17 23:56:07 2019 +0530

    RATIS-706. Dead lock in GrpcClientRpc. Contributed by Tsz Wo Nicholas Sze.
---
 .../java/org/apache/ratis/util/PeerProxyMap.java   | 47 ++++++++-----
 .../src/test/java/org/apache/ratis/BaseTest.java   | 16 ++++-
 .../org/apache/ratis/util/TestPeerProxyMap.java    | 80 ++++++++++++++++++++++
 3 files changed, 126 insertions(+), 17 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index 514c37e..ea0c2f1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -28,6 +28,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
 /** A map from peer id to peer and its proxy. */
@@ -35,7 +36,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
   public static final Logger LOG = LoggerFactory.getLogger(PeerProxyMap.class);
 
   /** Peer and its proxy. */
-  private class PeerAndProxy implements Closeable {
+  private class PeerAndProxy {
     private final RaftPeer peer;
     private volatile PROXY proxy = null;
     private final LifeCycle lifeCycle;
@@ -65,19 +66,18 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
       return proxy;
     }
 
+    Optional<PROXY> setNullProxyAndClose() {
+      final PROXY p;
+      synchronized (this) {
+        p = proxy;
+        lifeCycle.checkStateAndClose(() -> proxy = null);
+      }
+      return Optional.ofNullable(p);
+    }
+
     @Override
-    public synchronized void close() {
-      lifeCycle.checkStateAndClose(() -> {
-        if (proxy != null) {
-          try {
-            LOG.debug("{}: Closing proxy for peer {}", name, peer);
-            proxy.close();
-          } catch (IOException e) {
-            LOG.warn("{}: Failed to close proxy for peer {}, proxy class: {}",
-                name, peer, proxy.getClass(), e);
-          }
-        }
-      });
+    public String toString() {
+      return peer.toString();
     }
   }
 
@@ -120,14 +120,18 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
 
   public void resetProxy(RaftPeerId id) {
     LOG.debug("{}: reset proxy for {}", name, id );
+    final PeerAndProxy pp;
+    Optional<PROXY> optional = Optional.empty();
     synchronized (resetLock) {
-      final PeerAndProxy pp = peers.remove(id);
+      pp = peers.remove(id);
       if (pp != null) {
         final RaftPeer peer = pp.getPeer();
-        pp.close();
+        optional = pp.setNullProxyAndClose();
         computeIfAbsent(peer);
       }
     }
+    // close proxy without holding the reset lock
+    optional.ifPresent(proxy -> closeProxy(proxy, pp));
   }
 
   /** @return true if the given throwable is handled; otherwise, the call is an no-op, return false. */
@@ -145,6 +149,17 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
 
   @Override
   public void close() {
-    peers.values().forEach(PeerAndProxy::close);
+    peers.values().parallelStream().forEach(
+        pp -> pp.setNullProxyAndClose().ifPresent(proxy -> closeProxy(proxy, pp)));
+  }
+
+  private void closeProxy(PROXY proxy, PeerAndProxy pp) {
+    try {
+      LOG.debug("{}: Closing proxy for peer {}", name, pp);
+      proxy.close();
+    } catch (IOException e) {
+      LOG.warn("{}: Failed to close proxy for peer {}, proxy class: {}",
+          name, pp, proxy.getClass(), e);
+    }
   }
 }
\ No newline at end of file
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index 6598ba5..5edbf38 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 public abstract class BaseTest {
@@ -56,8 +57,21 @@ public abstract class BaseTest {
     ExitUtils.disableSystemExit();
   }
 
+  private final AtomicReference<Throwable> firstException = new AtomicReference<>();
+
+  public void setFirstException(Throwable e) {
+    if (firstException.compareAndSet(null, e)) {
+      LOG.error("Set firstException", e);
+    }
+  }
+
   @After
-  public void assertNotTerminated() {
+  public void assertNoFailures() {
+    final Throwable e = firstException.get();
+    if (e != null) {
+      throw new IllegalStateException("Failed: first exception was set", e);
+    }
+
     ExitUtils.assertNotTerminated();
   }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestPeerProxyMap.java b/ratis-test/src/test/java/org/apache/ratis/util/TestPeerProxyMap.java
new file mode 100644
index 0000000..e0e1c49
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestPeerProxyMap.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Closeable;
+
+/** Tests for {@link PeerProxyMap}. */
+public class TestPeerProxyMap extends BaseTest {
+  class DummyProxy implements Closeable {
+    private final RaftPeer peer;
+
+    DummyProxy(RaftPeer peer) {
+      this.peer = peer;
+    }
+
+    @Override
+    public void close() {
+      LOG.info("{}: close before lock", this);
+      synchronized(this) {
+        LOG.info("{}: close in lock", this);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return peer.toString();
+    }
+  }
+
+  @Test(timeout = 10_000)
+  public void testCloseDeadLock() throws Exception {
+    final PeerProxyMap<DummyProxy> map = new PeerProxyMap<>("test", DummyProxy::new);
+    final RaftPeerId id = RaftPeerId.valueOf("s0");
+    final RaftPeer peer = new RaftPeer(id);
+    map.computeIfAbsent(peer);
+
+    final DummyProxy proxy = map.getProxy(id);
+
+    final Thread t = new Thread(() -> {
+      // hold proxy lock and then getProxy(..) which requires resetLock
+      synchronized (proxy) {
+        LOG.info("Acquired lock");
+        try {
+          HUNDRED_MILLIS.sleep();
+          LOG.info("Try getProxy");
+          final DummyProxy newProxy = map.getProxy(id);
+          Assert.assertNotSame(proxy, newProxy);
+        } catch (Exception e) {
+          setFirstException(e);
+        }
+        LOG.info("Will release lock");
+      }
+    });
+    t.start();
+
+    map.resetProxy(id); // hold resetLock and then call close() which requires proxy lock
+    t.join();
+  }
+}

Reply via email to