This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7897605f2a IGNITE-21766 Avoid a race while handling messages in jraft (#3423)
7897605f2a is described below
commit 7897605f2a8146ee60a0af57e864af793b19610e
Author: Cyrill <[email protected]>
AuthorDate: Mon Mar 18 10:26:41 2024 +0300
IGNITE-21766 Avoid a race while handling messages in jraft (#3423)
---
.../impl/core/AppendEntriesRequestProcessor.java | 52 +++++++---------------
.../core/AppendEntriesRequestProcessorTest.java | 28 ++++++------
.../impl/core/BaseNodeRequestProcessorTest.java | 2 +-
3 files changed, 31 insertions(+), 51 deletions(-)
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
index 1b087cbbd9..e24f7e0f68 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
@@ -69,7 +69,7 @@ public class AppendEntriesRequestProcessor extends
NodeRequestProcessor<AppendEn
PeerPair pair = pairOf(peerId, serverId);
- final PeerRequestContext ctx =
getOrCreatePeerRequestContext(groupId, pair, nodeManager);
+ final PeerRequestContext ctx =
getOrCreatePeerRequestContext(groupId, pair, node);
return ctx.executor;
}
@@ -322,41 +322,19 @@ public class AppendEntriesRequestProcessor extends
NodeRequestProcessor<AppendEn
}
}
- PeerRequestContext getOrCreatePeerRequestContext(final String groupId,
final PeerPair pair,
- NodeManager nodeManager) {
- ConcurrentMap<PeerPair, PeerRequestContext> groupContexts =
this.peerRequestContexts.get(groupId);
- if (groupContexts == null) {
- groupContexts = new ConcurrentHashMap<>();
- final ConcurrentMap<PeerPair, PeerRequestContext> existsCtxs =
this.peerRequestContexts.putIfAbsent(
- groupId, groupContexts);
- if (existsCtxs != null) {
- groupContexts = existsCtxs;
- }
- }
+ PeerRequestContext getOrCreatePeerRequestContext(final String groupId,
final PeerPair pair, Node node) {
+ ConcurrentMap<PeerPair, PeerRequestContext> groupContexts =
+ this.peerRequestContexts.computeIfAbsent(groupId, s -> new
ConcurrentHashMap<>());
- PeerRequestContext peerCtx = groupContexts.get(pair);
- if (peerCtx == null) {
- synchronized (Utils.withLockObject(groupContexts)) {
- peerCtx = groupContexts.get(pair);
- // double check in lock
- if (peerCtx == null) {
- // only one thread to process append entries for every
jraft node
- final PeerId peer = new PeerId();
- final boolean parsed = peer.parse(pair.local);
- assert (parsed);
- final Node node = nodeManager.get(groupId, peer);
- assert (node != null);
- peerCtx = new PeerRequestContext(groupId, pair,
node.getRaftOptions().getMaxReplicatorInflightMsgs());
-
- peerCtx.executor =
node.getOptions().getStripedExecutor().next();
-
- groupContexts.put(pair, peerCtx);
- }
- }
- }
+ return groupContexts.computeIfAbsent(pair, peerPair -> {
+ PeerRequestContext ctx =
+ new PeerRequestContext(groupId, pair,
node.getRaftOptions().getMaxReplicatorInflightMsgs());
- return peerCtx;
- }
+ ctx.executor = node.getOptions().getStripedExecutor().next();
+
+ return ctx;
+ });
+ }
void removePeerRequestContext(final String groupId, final PeerPair pair) {
final ConcurrentMap<PeerPair, PeerRequestContext> groupContexts =
this.peerRequestContexts.get(groupId);
@@ -393,9 +371,9 @@ public class AppendEntriesRequestProcessor extends
NodeRequestProcessor<AppendEn
return request.groupId();
}
- private int getAndIncrementSequence(final String groupId, final PeerPair
pair, NodeManager nodeManager) {
+ private int getAndIncrementSequence(final String groupId, final PeerPair
pair, Node node) {
// TODO asch can use getPeerContext because it must already present
(created before) ??? IGNITE-14832
- return getOrCreatePeerRequestContext(groupId, pair,
nodeManager).getAndIncrementSequence();
+ return getOrCreatePeerRequestContext(groupId, pair,
node).getAndIncrementSequence();
}
private boolean isHeartbeatRequest(final AppendEntriesRequest request) {
@@ -418,7 +396,7 @@ public class AppendEntriesRequestProcessor extends
NodeRequestProcessor<AppendEn
int reqSequence = -1;
if (!isHeartbeat) {
- reqSequence = getAndIncrementSequence(groupId, pair,
done.getRpcCtx().getNodeManager());
+ reqSequence = getAndIncrementSequence(groupId, pair, node);
}
final Message response =
service.handleAppendEntriesRequest(request, new SequenceRpcRequestClosure(done,
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessorTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessorTest.java
index 55ff4c7691..46bb4d2ed0 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessorTest.java
@@ -99,18 +99,18 @@ public class AppendEntriesRequestProcessorTest extends
BaseNodeRequestProcessorT
@Test
public void testOnClosed() {
- mockNode();
+ final PeerId peer = mockNode();
final AppendEntriesRequestProcessor processor =
(AppendEntriesRequestProcessor) newProcessor();
PeerPair pair = processor.pairOf(this.peerIdStr, this.serverId);
- final PeerRequestContext ctx =
processor.getOrCreatePeerRequestContext(this.groupId, pair, nodeManager);
+ final PeerRequestContext ctx =
processor.getOrCreatePeerRequestContext(this.groupId, pair,
nodeManager.get(this.groupId, peer));
assertNotNull(ctx);
assertSame(ctx, processor.getPeerRequestContext(this.groupId, pair));
- assertSame(ctx, processor.getOrCreatePeerRequestContext(this.groupId,
pair, nodeManager));
+ assertSame(ctx, processor.getOrCreatePeerRequestContext(this.groupId,
pair, nodeManager.get(this.groupId, peer)));
processor.onClosed(peerIdStr, this.serverId);
assertNull(processor.getPeerRequestContext(this.groupId, pair));
- assertNotSame(ctx,
processor.getOrCreatePeerRequestContext(this.groupId, pair, nodeManager));
+ assertNotSame(ctx,
processor.getOrCreatePeerRequestContext(this.groupId, pair,
nodeManager.get(this.groupId, peer)));
}
@Override
@@ -120,19 +120,20 @@ public class AppendEntriesRequestProcessorTest extends
BaseNodeRequestProcessorT
Mockito.verify(service).handleAppendEntriesRequest(eq(this.request),
Mockito.any());
final PeerPair pair = ((AppendEntriesRequestProcessor)
processor).pairOf(this.peerIdStr, this.serverId);
final PeerRequestContext ctx = ((AppendEntriesRequestProcessor)
processor).getOrCreatePeerRequestContext(
- this.groupId, pair, nodeManager);
+ this.groupId, pair, nodeManager.get(this.groupId, mockNode()));
assertNotNull(ctx);
}
@Test
public void testGetPeerRequestContextRemovePeerRequestContext() {
- mockNode();
+ PeerId peer = mockNode();
final AppendEntriesRequestProcessor processor =
(AppendEntriesRequestProcessor) newProcessor();
final PeerPair pair = processor.pairOf(this.peerIdStr, this.serverId);
- final PeerRequestContext ctx =
processor.getOrCreatePeerRequestContext(this.groupId, pair, nodeManager);
+ final PeerRequestContext ctx =
+ processor.getOrCreatePeerRequestContext(this.groupId, pair,
nodeManager.get(this.groupId, peer));
assertNotNull(ctx);
- assertSame(ctx, processor.getOrCreatePeerRequestContext(this.groupId,
pair, nodeManager));
+ assertSame(ctx, processor.getOrCreatePeerRequestContext(this.groupId,
pair, nodeManager.get(this.groupId, peer)));
assertEquals(0, ctx.getNextRequiredSequence());
assertEquals(0, ctx.getAndIncrementSequence());
assertEquals(1, ctx.getAndIncrementSequence());
@@ -141,7 +142,8 @@ public class AppendEntriesRequestProcessorTest extends
BaseNodeRequestProcessorT
assertFalse(ctx.hasTooManyPendingResponses());
processor.removePeerRequestContext(this.groupId, pair);
- final PeerRequestContext newCtx =
processor.getOrCreatePeerRequestContext(this.groupId, pair, nodeManager);
+ final PeerRequestContext newCtx =
+ processor.getOrCreatePeerRequestContext(this.groupId, pair,
nodeManager.get(this.groupId, peer));
assertNotNull(newCtx);
assertNotSame(ctx, newCtx);
@@ -155,10 +157,10 @@ public class AppendEntriesRequestProcessorTest extends
BaseNodeRequestProcessorT
@Test
public void testSendSequenceResponse() {
- mockNode();
+ final PeerId peer = mockNode();
final AppendEntriesRequestProcessor processor =
(AppendEntriesRequestProcessor) newProcessor();
final PeerPair pair = processor.pairOf(this.peerIdStr, this.serverId);
- processor.getOrCreatePeerRequestContext(this.groupId, pair,
nodeManager);
+ processor.getOrCreatePeerRequestContext(this.groupId, pair,
nodeManager.get(this.groupId, peer));
final PingRequest msg = TestUtils.createPingRequest();
final RpcContext asyncContext = Mockito.mock(RpcContext.class);
processor.sendSequenceResponse(this.groupId, pair, 1, asyncContext,
msg);
@@ -177,14 +179,14 @@ public class AppendEntriesRequestProcessorTest extends
BaseNodeRequestProcessorT
final AppendEntriesRequestProcessor processor =
(AppendEntriesRequestProcessor) newProcessor();
final PeerPair pair = processor.pairOf(this.peerIdStr, this.serverId);
final PingRequest msg = TestUtils.createPingRequest();
- final PeerRequestContext ctx =
processor.getOrCreatePeerRequestContext(this.groupId, pair, nodeManager);
+ final PeerRequestContext ctx =
processor.getOrCreatePeerRequestContext(this.groupId, pair,
nodeManager.get(this.groupId, peer));
assertNotNull(ctx);
processor.sendSequenceResponse(this.groupId, pair, 1, asyncContext,
msg);
processor.sendSequenceResponse(this.groupId, pair, 2, asyncContext,
msg);
processor.sendSequenceResponse(this.groupId, pair, 3, asyncContext,
msg);
Mockito.verify(asyncContext, Mockito.never()).sendResponse(msg);
- final PeerRequestContext newCtx =
processor.getOrCreatePeerRequestContext(this.groupId, pair, nodeManager);
+ final PeerRequestContext newCtx =
processor.getOrCreatePeerRequestContext(this.groupId, pair,
nodeManager.get(this.groupId, peer));
assertNotNull(newCtx);
assertNotSame(ctx, newCtx);
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/BaseNodeRequestProcessorTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/BaseNodeRequestProcessorTest.java
index dbabfeafe6..f6566b3675 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/BaseNodeRequestProcessorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/BaseNodeRequestProcessorTest.java
@@ -78,7 +78,7 @@ public abstract class BaseNodeRequestProcessorTest<T extends
Message> extends Ba
}
protected PeerId mockNode() {
- Mockito.when(node.getGroupId()).thenReturn(this.groupId);
+ Mockito.lenient().when(node.getGroupId()).thenReturn(this.groupId);
final PeerId peerId = new PeerId();
peerId.parse(this.peerIdStr);
Mockito.when(node.getNodeId()).thenReturn(new NodeId(groupId, peerId));