Repository: incubator-ratis
Updated Branches:
  refs/heads/master 7a5c3ea12 -> 96c470d88


RATIS-116. In PendingRequests, the requests are never removed from the map.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/96c470d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/96c470d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/96c470d8

Branch: refs/heads/master
Commit: 96c470d884423f0997ac06073b53a541be856d11
Parents: 7a5c3ea
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Wed Oct 11 11:05:31 2017 +0800
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Wed Oct 11 11:05:31 2017 +0800

----------------------------------------------------------------------
 .../org/apache/ratis/client/RaftClient.java     |  2 +-
 .../ratis/server/impl/PendingRequest.java       | 10 ++--
 .../ratis/server/impl/PendingRequests.java      | 57 +++++++++++++++-----
 3 files changed, 51 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96c470d8/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java 
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index c682350..f1fbeb0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -82,7 +82,7 @@ public interface RaftClient extends Closeable {
         retryInterval = RaftClientConfigKeys.Rpc.timeout(properties);
       }
       return ClientImplUtils.newRaftClient(clientId,
-          Objects.requireNonNull(group, "The 'servers' field is not 
initialized."),
+          Objects.requireNonNull(group, "The 'group' field is not 
initialized."),
           leaderId,
           Objects.requireNonNull(clientRpc, "The 'clientRpc' field is not 
initialized."),
           retryInterval);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96c470d8/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
index f1909d4..95731c5 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -17,10 +17,7 @@
  */
 package org.apache.ratis.server.impl;
 
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.*;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.Preconditions;
 
@@ -73,6 +70,11 @@ public class PendingRequest implements 
Comparable<PendingRequest> {
     future.complete(r);
   }
 
+  TransactionContext setNotLeaderException(NotLeaderException nle) {
+    setReply(new RaftClientReply(getRequest(), nle));
+    return getEntry();
+  }
+
   void setSuccessReply(Message message) {
     setReply(new RaftClientReply(getRequest(), message));
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96c470d8/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index b7b8a9e..ab8ad2c 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -31,13 +31,52 @@ import java.util.stream.Collectors;
 class PendingRequests {
   private static final Logger LOG = RaftServerImpl.LOG;
 
+  private static class RequestMap {
+    private final Object name;
+    private final ConcurrentMap<Long, PendingRequest> map = new 
ConcurrentHashMap<>();
+
+    RequestMap(Object name) {
+      this.name = name;
+    }
+
+    void put(long index, PendingRequest p) {
+      LOG.debug("{}: PendingRequests.put {} -> {}", name, index, p);
+      final PendingRequest previous = map.put(index, p);
+      Preconditions.assertTrue(previous == null);
+    }
+
+    PendingRequest get(long index) {
+      final PendingRequest r = map.get(index);
+      LOG.debug("{}: PendingRequests.get {} returns {}", name, index, r);
+      return r;
+    }
+
+    PendingRequest remove(long index) {
+      final PendingRequest r = map.remove(index);
+      LOG.debug("{}: PendingRequests.remove{} returns {}", name, index, r);
+      return r;
+    }
+
+    Collection<TransactionContext> setNotLeaderException(NotLeaderException 
nle) {
+      LOG.debug("{}: PendingRequests.setNotLeaderException", name);
+      try {
+        return map.values().stream()
+            .map(p -> p.setNotLeaderException(nle))
+            .collect(Collectors.toList());
+      } finally {
+        map.clear();
+      }
+    }
+  }
+
   private PendingRequest pendingSetConf;
   private final RaftServerImpl server;
-  private final ConcurrentMap<Long, PendingRequest> pendingRequests = new 
ConcurrentHashMap<>();
+  private final RequestMap pendingRequests;
   private PendingRequest last = null;
 
   PendingRequests(RaftServerImpl server) {
     this.server = server;
+    this.pendingRequests = new RequestMap(server.getId());
   }
 
   PendingRequest addPendingRequest(long index, RaftClientRequest request,
@@ -92,7 +131,7 @@ class PendingRequests {
   }
 
   void replyPendingRequest(long index, RaftClientReply reply) {
-    final PendingRequest pending = pendingRequests.get(index);
+    final PendingRequest pending = pendingRequests.remove(index);
     if (pending != null) {
       Preconditions.assertTrue(pending.getIndex() == index);
       pending.setReply(reply);
@@ -107,19 +146,11 @@ class PendingRequests {
     LOG.info("{} sends responses before shutting down PendingRequestsHandler",
         server.getId());
 
-    Collection<TransactionContext> pendingEntries = 
pendingRequests.values().stream()
-        .map(PendingRequest::getEntry).collect(Collectors.toList());
     // notify the state machine about stepping down
-    server.getStateMachine().notifyNotLeader(pendingEntries);
-    pendingRequests.values().forEach(this::setNotLeaderException);
+    final NotLeaderException nle = server.generateNotLeaderException();
+    
server.getStateMachine().notifyNotLeader(pendingRequests.setNotLeaderException(nle));
     if (pendingSetConf != null) {
-      setNotLeaderException(pendingSetConf);
+      pendingSetConf.setNotLeaderException(nle);
     }
   }
-
-  private void setNotLeaderException(PendingRequest pending) {
-    RaftClientReply reply = new RaftClientReply(pending.getRequest(),
-        server.generateNotLeaderException());
-    pending.setReply(reply);
-  }
 }

Reply via email to