This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7897605f2a IGNITE-21766 Avoid a race while handling messages in jraft 
(#3423)
7897605f2a is described below

commit 7897605f2a8146ee60a0af57e864af793b19610e
Author: Cyrill <[email protected]>
AuthorDate: Mon Mar 18 10:26:41 2024 +0300

    IGNITE-21766 Avoid a race while handling messages in jraft (#3423)
---
 .../impl/core/AppendEntriesRequestProcessor.java   | 52 +++++++---------------
 .../core/AppendEntriesRequestProcessorTest.java    | 28 ++++++------
 .../impl/core/BaseNodeRequestProcessorTest.java    |  2 +-
 3 files changed, 31 insertions(+), 51 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
index 1b087cbbd9..e24f7e0f68 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
@@ -69,7 +69,7 @@ public class AppendEntriesRequestProcessor extends 
NodeRequestProcessor<AppendEn
 
             PeerPair pair = pairOf(peerId, serverId);
 
-            final PeerRequestContext ctx = 
getOrCreatePeerRequestContext(groupId, pair, nodeManager);
+            final PeerRequestContext ctx = 
getOrCreatePeerRequestContext(groupId, pair, node);
 
             return ctx.executor;
         }
@@ -322,41 +322,19 @@ public class AppendEntriesRequestProcessor extends 
NodeRequestProcessor<AppendEn
         }
     }
 
-    PeerRequestContext getOrCreatePeerRequestContext(final String groupId, 
final PeerPair pair,
-        NodeManager nodeManager) {
-        ConcurrentMap<PeerPair, PeerRequestContext> groupContexts = 
this.peerRequestContexts.get(groupId);
-        if (groupContexts == null) {
-            groupContexts = new ConcurrentHashMap<>();
-            final ConcurrentMap<PeerPair, PeerRequestContext> existsCtxs = 
this.peerRequestContexts.putIfAbsent(
-                groupId, groupContexts);
-            if (existsCtxs != null) {
-                groupContexts = existsCtxs;
-            }
-        }
+    PeerRequestContext getOrCreatePeerRequestContext(final String groupId, 
final PeerPair pair, Node node) {
+            ConcurrentMap<PeerPair, PeerRequestContext> groupContexts =
+                this.peerRequestContexts.computeIfAbsent(groupId, s -> new 
ConcurrentHashMap<>());
 
-        PeerRequestContext peerCtx = groupContexts.get(pair);
-        if (peerCtx == null) {
-            synchronized (Utils.withLockObject(groupContexts)) {
-                peerCtx = groupContexts.get(pair);
-                // double check in lock
-                if (peerCtx == null) {
-                    // only one thread to process append entries for every 
jraft node
-                    final PeerId peer = new PeerId();
-                    final boolean parsed = peer.parse(pair.local);
-                    assert (parsed);
-                    final Node node = nodeManager.get(groupId, peer);
-                    assert (node != null);
-                    peerCtx = new PeerRequestContext(groupId, pair, 
node.getRaftOptions().getMaxReplicatorInflightMsgs());
-
-                    peerCtx.executor = 
node.getOptions().getStripedExecutor().next();
-
-                    groupContexts.put(pair, peerCtx);
-                }
-            }
-        }
+            return groupContexts.computeIfAbsent(pair, peerPair -> {
+                PeerRequestContext ctx =
+                    new PeerRequestContext(groupId, pair, 
node.getRaftOptions().getMaxReplicatorInflightMsgs());
 
-        return peerCtx;
-    }
+                ctx.executor = node.getOptions().getStripedExecutor().next();
+
+                return ctx;
+            });
+        }
 
     void removePeerRequestContext(final String groupId, final PeerPair pair) {
         final ConcurrentMap<PeerPair, PeerRequestContext> groupContexts = 
this.peerRequestContexts.get(groupId);
@@ -393,9 +371,9 @@ public class AppendEntriesRequestProcessor extends 
NodeRequestProcessor<AppendEn
         return request.groupId();
     }
 
-    private int getAndIncrementSequence(final String groupId, final PeerPair 
pair, NodeManager nodeManager) {
+    private int getAndIncrementSequence(final String groupId, final PeerPair 
pair, Node node) {
         // TODO asch can use getPeerContext because it must already present 
(created before) ??? IGNITE-14832
-        return getOrCreatePeerRequestContext(groupId, pair, 
nodeManager).getAndIncrementSequence();
+        return getOrCreatePeerRequestContext(groupId, pair, 
node).getAndIncrementSequence();
     }
 
     private boolean isHeartbeatRequest(final AppendEntriesRequest request) {
@@ -418,7 +396,7 @@ public class AppendEntriesRequestProcessor extends 
NodeRequestProcessor<AppendEn
             int reqSequence = -1;
 
             if (!isHeartbeat) {
-                reqSequence = getAndIncrementSequence(groupId, pair, 
done.getRpcCtx().getNodeManager());
+                reqSequence = getAndIncrementSequence(groupId, pair, node);
             }
 
             final Message response = 
service.handleAppendEntriesRequest(request, new SequenceRpcRequestClosure(done,
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessorTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessorTest.java
index 55ff4c7691..46bb4d2ed0 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessorTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessorTest.java
@@ -99,18 +99,18 @@ public class AppendEntriesRequestProcessorTest extends 
BaseNodeRequestProcessorT
 
     @Test
     public void testOnClosed() {
-        mockNode();
+        final PeerId peer = mockNode();
         final AppendEntriesRequestProcessor processor = 
(AppendEntriesRequestProcessor) newProcessor();
 
         PeerPair pair = processor.pairOf(this.peerIdStr, this.serverId);
-        final PeerRequestContext ctx = 
processor.getOrCreatePeerRequestContext(this.groupId, pair, nodeManager);
+        final PeerRequestContext ctx = 
processor.getOrCreatePeerRequestContext(this.groupId, pair, 
nodeManager.get(this.groupId, peer));
         assertNotNull(ctx);
         assertSame(ctx, processor.getPeerRequestContext(this.groupId, pair));
-        assertSame(ctx, processor.getOrCreatePeerRequestContext(this.groupId, 
pair, nodeManager));
+        assertSame(ctx, processor.getOrCreatePeerRequestContext(this.groupId, 
pair, nodeManager.get(this.groupId, peer)));
 
         processor.onClosed(peerIdStr, this.serverId);
         assertNull(processor.getPeerRequestContext(this.groupId, pair));
-        assertNotSame(ctx, 
processor.getOrCreatePeerRequestContext(this.groupId, pair, nodeManager));
+        assertNotSame(ctx, 
processor.getOrCreatePeerRequestContext(this.groupId, pair, 
nodeManager.get(this.groupId, peer)));
     }
 
     @Override
@@ -120,19 +120,20 @@ public class AppendEntriesRequestProcessorTest extends 
BaseNodeRequestProcessorT
         Mockito.verify(service).handleAppendEntriesRequest(eq(this.request), 
Mockito.any());
         final PeerPair pair = ((AppendEntriesRequestProcessor) 
processor).pairOf(this.peerIdStr, this.serverId);
         final PeerRequestContext ctx = ((AppendEntriesRequestProcessor) 
processor).getOrCreatePeerRequestContext(
-            this.groupId, pair, nodeManager);
+            this.groupId, pair, nodeManager.get(this.groupId, mockNode()));
         assertNotNull(ctx);
     }
 
     @Test
     public void testGetPeerRequestContextRemovePeerRequestContext() {
-        mockNode();
+        PeerId peer = mockNode();
 
         final AppendEntriesRequestProcessor processor = 
(AppendEntriesRequestProcessor) newProcessor();
         final PeerPair pair = processor.pairOf(this.peerIdStr, this.serverId);
-        final PeerRequestContext ctx = 
processor.getOrCreatePeerRequestContext(this.groupId, pair, nodeManager);
+        final PeerRequestContext ctx =
+                processor.getOrCreatePeerRequestContext(this.groupId, pair, 
nodeManager.get(this.groupId, peer));
         assertNotNull(ctx);
-        assertSame(ctx, processor.getOrCreatePeerRequestContext(this.groupId, 
pair, nodeManager));
+        assertSame(ctx, processor.getOrCreatePeerRequestContext(this.groupId, 
pair, nodeManager.get(this.groupId, peer)));
         assertEquals(0, ctx.getNextRequiredSequence());
         assertEquals(0, ctx.getAndIncrementSequence());
         assertEquals(1, ctx.getAndIncrementSequence());
@@ -141,7 +142,8 @@ public class AppendEntriesRequestProcessorTest extends 
BaseNodeRequestProcessorT
         assertFalse(ctx.hasTooManyPendingResponses());
 
         processor.removePeerRequestContext(this.groupId, pair);
-        final PeerRequestContext newCtx = 
processor.getOrCreatePeerRequestContext(this.groupId, pair, nodeManager);
+        final PeerRequestContext newCtx =
+                processor.getOrCreatePeerRequestContext(this.groupId, pair, 
nodeManager.get(this.groupId, peer));
         assertNotNull(newCtx);
         assertNotSame(ctx, newCtx);
 
@@ -155,10 +157,10 @@ public class AppendEntriesRequestProcessorTest extends 
BaseNodeRequestProcessorT
 
     @Test
     public void testSendSequenceResponse() {
-        mockNode();
+        final PeerId peer = mockNode();
         final AppendEntriesRequestProcessor processor = 
(AppendEntriesRequestProcessor) newProcessor();
         final PeerPair pair = processor.pairOf(this.peerIdStr, this.serverId);
-        processor.getOrCreatePeerRequestContext(this.groupId, pair, 
nodeManager);
+        processor.getOrCreatePeerRequestContext(this.groupId, pair, 
nodeManager.get(this.groupId, peer));
         final PingRequest msg = TestUtils.createPingRequest();
         final RpcContext asyncContext = Mockito.mock(RpcContext.class);
         processor.sendSequenceResponse(this.groupId, pair, 1, asyncContext, 
msg);
@@ -177,14 +179,14 @@ public class AppendEntriesRequestProcessorTest extends 
BaseNodeRequestProcessorT
         final AppendEntriesRequestProcessor processor = 
(AppendEntriesRequestProcessor) newProcessor();
         final PeerPair pair = processor.pairOf(this.peerIdStr, this.serverId);
         final PingRequest msg = TestUtils.createPingRequest();
-        final PeerRequestContext ctx = 
processor.getOrCreatePeerRequestContext(this.groupId, pair, nodeManager);
+        final PeerRequestContext ctx = 
processor.getOrCreatePeerRequestContext(this.groupId, pair, 
nodeManager.get(this.groupId, peer));
         assertNotNull(ctx);
         processor.sendSequenceResponse(this.groupId, pair, 1, asyncContext, 
msg);
         processor.sendSequenceResponse(this.groupId, pair, 2, asyncContext, 
msg);
         processor.sendSequenceResponse(this.groupId, pair, 3, asyncContext, 
msg);
         Mockito.verify(asyncContext, Mockito.never()).sendResponse(msg);
 
-        final PeerRequestContext newCtx = 
processor.getOrCreatePeerRequestContext(this.groupId, pair, nodeManager);
+        final PeerRequestContext newCtx = 
processor.getOrCreatePeerRequestContext(this.groupId, pair, 
nodeManager.get(this.groupId, peer));
         assertNotNull(newCtx);
         assertNotSame(ctx, newCtx);
     }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/BaseNodeRequestProcessorTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/BaseNodeRequestProcessorTest.java
index dbabfeafe6..f6566b3675 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/BaseNodeRequestProcessorTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/BaseNodeRequestProcessorTest.java
@@ -78,7 +78,7 @@ public abstract class BaseNodeRequestProcessorTest<T extends 
Message> extends Ba
     }
 
     protected PeerId mockNode() {
-        Mockito.when(node.getGroupId()).thenReturn(this.groupId);
+        Mockito.lenient().when(node.getGroupId()).thenReturn(this.groupId);
         final PeerId peerId = new PeerId();
         peerId.parse(this.peerIdStr);
         Mockito.when(node.getNodeId()).thenReturn(new NodeId(groupId, peerId));

Reply via email to