This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 09816a883f IGNITE-22261 Fix deadlock on configuration application in NodeImpl when disruptors are full (#4066)
09816a883f is described below

commit 09816a883f4320c1631fa24188ac4fcffb3e2268
Author: Vladislav Pyatkov <vldpyat...@gmail.com>
AuthorDate: Mon Aug 5 13:31:17 2024 +0300

    IGNITE-22261 Fix deadlock on configuration application in NodeImpl when disruptors are full (#4066)
---
 modules/raft/build.gradle                          |   1 +
 .../apache/ignite/raft/jraft/core/ItNodeTest.java  | 102 ++++++++++++++++++++-
 .../apache/ignite/raft/jraft/core/BallotBox.java   |  10 +-
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |  52 +++++++++--
 .../ignite/raft/jraft/option/BallotBoxOptions.java |   9 ++
 .../raft/jraft/storage/SnapshotExecutor.java       |   6 ++
 .../storage/snapshot/SnapshotExecutorImpl.java     |   5 +-
 .../ignite/raft/jraft/core/BallotBoxTest.java      |  67 ++++++++------
 8 files changed, 202 insertions(+), 50 deletions(-)

diff --git a/modules/raft/build.gradle b/modules/raft/build.gradle
index 1d99e218f3..8f3c2fcbad 100644
--- a/modules/raft/build.gradle
+++ b/modules/raft/build.gradle
@@ -98,4 +98,5 @@ dependencies {
     integrationTestImplementation project(':ignite-failure-handler')
     integrationTestImplementation libs.jetbrains.annotations
     integrationTestImplementation libs.dropwizard.metrics
+    integrationTestImplementation libs.disruptor
 }
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 97902de1ec..1189b04328 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -45,6 +45,8 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import com.codahale.metrics.ConsoleReporter;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.RingBuffer;
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
@@ -68,6 +70,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 import java.util.function.BiPredicate;
 import java.util.function.BooleanSupplier;
 import java.util.stream.IntStream;
@@ -98,7 +101,10 @@ import 
org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
 import org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
 import org.apache.ignite.raft.jraft.closure.TaskClosure;
 import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.core.FSMCallerImpl.ApplyTask;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.EnumOutter;
+import org.apache.ignite.raft.jraft.entity.NodeId;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.entity.Task;
 import org.apache.ignite.raft.jraft.entity.UserLog;
@@ -257,7 +263,101 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-22261")
+    public void testSmallestBufferSize() throws Exception {
+        TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT);
+
+        NodeOptions nodeOptions = createNodeOptions(0);
+        RaftOptions raftOptions = new RaftOptions();
+        raftOptions.setDisruptorBufferSize(1);
+        nodeOptions.setRaftOptions(raftOptions);
+        MockStateMachine fsm = new MockStateMachine(peer.getPeerId());
+        nodeOptions.setFsm(fsm);
+        nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
+        nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
+        nodeOptions.setInitialConf(new 
Configuration(Collections.singletonList(peer.getPeerId())));
+
+        AtomicBoolean block = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        nodeOptions.setfSMCallerExecutorDisruptor(new StripedDisruptor<>(
+                "unit-test",
+                "JRaft-FSMCaller-Disruptor",
+                1,
+                () -> new ApplyTask(),
+                1,
+                false,
+                false,
+                null
+        ) {
+            @Override
+            public RingBuffer<ApplyTask> subscribe(
+                    NodeId group,
+                    EventHandler<ApplyTask> handler,
+                    BiConsumer<ApplyTask, Throwable> exceptionHandler
+            ) {
+                return super.subscribe(group, (event, sequence, endOfBatch) -> 
{
+                    if (block.compareAndSet(true, false)) {
+                        log.info("Raft task is blocked.");
+
+                        latch.await();
+
+                        log.info("Raft task is continue executing.");
+                    }
+
+                    handler.onEvent(event, sequence, endOfBatch);
+                }, exceptionHandler);
+            }
+        });
+
+        RaftGroupService service = createService("unittest", peer, 
nodeOptions, List.of());
+
+        Node node = service.start();
+
+        assertEquals(1, node.listPeers().size());
+        assertTrue(node.listPeers().contains(peer.getPeerId()));
+        assertTrue(waitForCondition(node::isLeader, 10_000));
+
+        AtomicInteger c = new AtomicInteger();
+
+        Task task1 = new Task(ByteBuffer.wrap(("Test task").getBytes(UTF_8)), 
new JoinableClosure(status -> {
+            log.info("First task is started.");
+
+            if (!status.isOk()) {
+                assertTrue(
+                        status.getRaftError() == RaftError.EBUSY || 
status.getRaftError() == RaftError.EPERM);
+            }
+            c.incrementAndGet();
+        }));
+
+        Task task2 = new Task(ByteBuffer.wrap(("Test task").getBytes(UTF_8)), 
new JoinableClosure(status -> {
+            log.info("Second task is started.");
+
+            if (!status.isOk()) {
+                assertTrue(
+                        status.getRaftError() == RaftError.EBUSY || 
status.getRaftError() == RaftError.EPERM);
+            }
+            c.incrementAndGet();
+        }));
+
+        try {
+            block.set(true);
+
+            node.apply(task1);
+
+            assertTrue(waitForCondition(() -> !block.get(), 10_000));
+
+            node.apply(task2);
+
+            latch.countDown();
+
+            Task.joinAll(List.of(task1, task2), TimeUnit.SECONDS.toMillis(30));
+            assertEquals(2, c.get());
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    @Test
     public void testNodeTaskOverload() throws Exception {
         TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT);
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java
index 0c5d809e00..ce4ce3a3ea 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java
@@ -79,6 +79,7 @@ public class BallotBox implements 
Lifecycle<BallotBoxOptions>, Describer {
         }
         this.waiter = opts.getWaiter();
         this.closureQueue = opts.getClosureQueue();
+        this.lastCommittedIndex = opts.getLastCommittedIndex();
         return true;
     }
 
@@ -159,10 +160,9 @@ public class BallotBox implements 
Lifecycle<BallotBoxOptions>, Describer {
      * |newPendingIndex| should be |last_log_index| + 1.
      *
      * @param newPendingIndex pending index of new leader
-     * @param quorum quorum size
      * @return returns true if reset success
      */
-    public boolean resetPendingIndex(final long newPendingIndex, final int 
quorum) {
+    public boolean resetPendingIndex(final long newPendingIndex) {
         final long stamp = this.stampedLock.writeLock();
         try {
             if (!(this.pendingIndex == 0 && this.pendingMetaQueue.isEmpty())) {
@@ -176,12 +176,6 @@ public class BallotBox implements 
Lifecycle<BallotBoxOptions>, Describer {
                 return false;
             }
             this.pendingIndex = newPendingIndex;
-            if (quorum == 1) {
-                // It is safe to initiate lastCommittedIndex as last log one 
because in case of single peer no one will discard
-                // log records on leader election. It's not an optimisation, 
but a matter of correctness because otherwise there will be
-                // a race between readIndex evaluation and asynchronous log 
records application on node restart.
-                this.lastCommittedIndex = newPendingIndex - 1;
-            }
             this.closureQueue.resetFirstIndex(newPendingIndex);
             return true;
         }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index c4b498cee1..0fcf832981 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -1022,14 +1022,6 @@ public class NodeImpl implements Node, RaftServerService 
{
             LOG.error("Node {} initFSMCaller failed.", getNodeId());
             return false;
         }
-        this.ballotBox = new BallotBox();
-        final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();
-        ballotBoxOpts.setWaiter(this.fsmCaller);
-        ballotBoxOpts.setClosureQueue(this.closureQueue);
-        if (!this.ballotBox.init(ballotBoxOpts)) {
-            LOG.error("Node {} init ballotBox failed.", getNodeId());
-            return false;
-        }
 
         if (!initSnapshotStorage()) {
             LOG.error("Node {} initSnapshotStorage failed.", getNodeId());
@@ -1053,6 +1045,12 @@ public class NodeImpl implements Node, RaftServerService 
{
             this.targetPriority = 
getMaxPriorityOfNodes(this.conf.getConf().getPeers());
         }
 
+        // It must be initialized after initializing conf and log storage.
+        if (!initBallotBox()) {
+            LOG.error("Node {} init ballotBox failed.", getNodeId());
+            return false;
+        }
+
         if (!this.conf.isEmpty()) {
             Requires.requireTrue(this.conf.isValid(), "Invalid conf: %s", 
this.conf);
         }
@@ -1127,6 +1125,30 @@ public class NodeImpl implements Node, RaftServerService 
{
         return true;
     }
 
+    private boolean initBallotBox() {
+        this.ballotBox = new BallotBox();
+        final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();
+        ballotBoxOpts.setWaiter(this.fsmCaller);
+        ballotBoxOpts.setClosureQueue(this.closureQueue);
+        // TODO: uncomment when backport related change 
https://issues.apache.org/jira/browse/IGNITE-22923
+        //ballotBoxOpts.setNodeId(getNodeId());
+         // Try to initialize the last committed index in BallotBox to be the 
last snapshot index.
+        long lastCommittedIndex = 0;
+        if (this.snapshotExecutor != null) {
+            lastCommittedIndex = this.snapshotExecutor.getLastSnapshotIndex();
+        }
+        if (this.getQuorum() == 1) {
+            // It is safe to initiate lastCommittedIndex as last log one 
because in case of single peer no one will discard
+            // log records on leader election.
+            // Fix https://github.com/sofastack/sofa-jraft/issues/1049
+            lastCommittedIndex = Math.max(lastCommittedIndex, 
this.logManager.getLastLogIndex());
+        }
+
+        ballotBoxOpts.setLastCommittedIndex(lastCommittedIndex);
+        LOG.info("Node {} init ballot box's lastCommittedIndex={}.", 
getNodeId(), lastCommittedIndex);
+        return this.ballotBox.init(ballotBoxOpts);
+    }
+
     /**
      * Validates a required option if shared pools are enabled.
      *
@@ -1451,7 +1473,7 @@ public class NodeImpl implements Node, RaftServerService {
         }
 
         // init commit manager
-        this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() + 
1, getQuorum());
+        this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() + 
1);
         // Register _conf_ctx to reject configuration changing before the 
first log
         // is committed.
         if (this.confCtx.isBusy()) {
@@ -1561,6 +1583,18 @@ public class NodeImpl implements Node, RaftServerService 
{
     }
 
     private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
+        if (!this.logManager.hasAvailableCapacityToAppendEntries(1)) {
+               // It's overload, fail-fast
+               final List<Closure> dones = tasks.stream().map(ele -> 
ele.done).filter(Objects::nonNull)
+                               .collect(Collectors.toList());
+               Utils.runInThread(this.getOptions().getCommonExecutor(), () -> {
+                       for (final Closure done : dones) {
+                               done.run(new Status(RaftError.EBUSY, "Node %s 
log manager is busy.", this.getNodeId()));
+                       }
+               });
+               return;
+        }
+
         this.writeLock.lock();
         try {
             final int size = tasks.size();
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/BallotBoxOptions.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/BallotBoxOptions.java
index eea31def7d..02018fb74f 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/BallotBoxOptions.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/BallotBoxOptions.java
@@ -26,6 +26,15 @@ public class BallotBoxOptions {
 
     private FSMCaller waiter;
     private ClosureQueue closureQueue;
+    private long lastCommittedIndex;
+
+    public long getLastCommittedIndex() {
+        return lastCommittedIndex;
+    }
+
+    public void setLastCommittedIndex(long lastCommittedIndex) {
+        this.lastCommittedIndex = lastCommittedIndex;
+    }
 
     public FSMCaller getWaiter() {
         return this.waiter;
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutor.java
index d7688083d6..2040e85864 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutor.java
@@ -42,6 +42,12 @@ public interface SnapshotExecutor extends 
Lifecycle<SnapshotExecutorOptions>, De
      */
     void doSnapshot(final Closure done);
 
+    /**
+     * Returns the last snapshot index.
+     * @return
+     */
+    long getLastSnapshotIndex();
+
     /**
      * Install snapshot according to the very RPC from leader After the 
installing succeeds (StateMachine is reset with
      * the snapshot) or fails, done will be called to respond Errors: - Term 
mismatches: which happens
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
index b62a425cd3..f0db519aac 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
@@ -100,10 +100,7 @@ public class SnapshotExecutorImpl implements 
SnapshotExecutor {
         return this.lastSnapshotTerm;
     }
 
-    /**
-     * Only for test
-     */
-    @OnlyForTest
+    @Override
     public long getLastSnapshotIndex() {
         return this.lastSnapshotIndex;
     }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/BallotBoxTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/BallotBoxTest.java
index f26f18181c..30dcfb76d5 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/BallotBoxTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/BallotBoxTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.ignite.raft.jraft.core;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 import java.util.concurrent.ExecutorService;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.raft.jraft.Closure;
@@ -32,18 +38,10 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-
 @ExtendWith(MockitoExtension.class)
 public class BallotBoxTest extends BaseIgniteAbstractTest {
     private BallotBox box;
@@ -61,6 +59,7 @@ public class BallotBoxTest extends BaseIgniteAbstractTest {
         this.closureQueue = new ClosureQueueImpl(options);
         opts.setClosureQueue(this.closureQueue);
         opts.setWaiter(this.waiter);
+        opts.setLastCommittedIndex(0);
         box = new BallotBox();
         assertTrue(box.init(opts));
     }
@@ -71,19 +70,34 @@ public class BallotBoxTest extends BaseIgniteAbstractTest {
         ExecutorServiceHelper.shutdownAndAwaitTermination(executor);
     }
 
-    @ParameterizedTest
-    @ValueSource(shorts = {0, 1, 3})
-    public void testResetPendingIndex(int quorum) {
+    @Test
+    public void initWithLastCommittedIndex() {
+        BallotBoxOptions opts = new BallotBoxOptions();
+        NodeOptions options = new NodeOptions();
+        executor = JRaftUtils.createExecutor("test-executor-", Utils.cpus());
+        options.setCommonExecutor(executor);
+        this.closureQueue = new ClosureQueueImpl(options);
+        opts.setClosureQueue(this.closureQueue);
+        opts.setWaiter(this.waiter);
+        opts.setLastCommittedIndex(9);
+        box = new BallotBox();
+        assertTrue(box.init(opts));
+
+        assertEquals(box.getLastCommittedIndex(), 9);
+    }
+
+    @Test
+    public void testResetPendingIndex() {
         assertEquals(0, closureQueue.getFirstIndex());
         assertEquals(0, box.getPendingIndex());
-        assertTrue(box.resetPendingIndex(1, quorum));
+        assertTrue(box.resetPendingIndex(1));
+        assertEquals(0, box.getLastCommittedIndex());
         assertEquals(1, closureQueue.getFirstIndex());
         assertEquals(1, box.getPendingIndex());
     }
 
-    @ParameterizedTest
-    @ValueSource(shorts = {0, 1, 3})
-    public void testAppendPendingTask(int quorum) {
+    @Test
+    public void testAppendPendingTask() {
         assertTrue(this.box.getPendingMetaQueue().isEmpty());
         assertTrue(this.closureQueue.getQueue().isEmpty());
         assertFalse(this.box.appendPendingTask(
@@ -95,7 +109,7 @@ public class BallotBoxTest extends BaseIgniteAbstractTest {
 
                 }
             }));
-        assertTrue(box.resetPendingIndex(1, quorum));
+        assertTrue(box.resetPendingIndex(1));
         assertTrue(this.box.appendPendingTask(
             
JRaftUtils.getConfiguration("localhost:8081,localhost:8082,localhost:8083"),
             JRaftUtils.getConfiguration("localhost:8081"), new Closure() {
@@ -110,21 +124,19 @@ public class BallotBoxTest extends BaseIgniteAbstractTest 
{
         assertEquals(1, this.closureQueue.getQueue().size());
     }
 
-    @ParameterizedTest
-    @ValueSource(shorts = {0, 1, 3})
-    public void testClearPendingTasks(int quorum) {
-        testAppendPendingTask(quorum);
+    @Test
+    public void testClearPendingTasks() {
+        testAppendPendingTask();
         this.box.clearPendingTasks();
         assertTrue(this.box.getPendingMetaQueue().isEmpty());
         assertTrue(this.closureQueue.getQueue().isEmpty());
         assertEquals(0, closureQueue.getFirstIndex());
     }
 
-    @ParameterizedTest
-    @ValueSource(shorts = {0, 1, 3})
-    public void testCommitAt(int quorum) {
+    @Test
+    public void testCommitAt() {
         assertFalse(this.box.commitAt(1, 3, new PeerId("localhost", 8081)));
-        assertTrue(box.resetPendingIndex(1, quorum));
+        assertTrue(box.resetPendingIndex(1));
         assertTrue(this.box.appendPendingTask(
             
JRaftUtils.getConfiguration("localhost:8081,localhost:8082,localhost:8083"),
             JRaftUtils.getConfiguration("localhost:8081"), new Closure() {
@@ -150,10 +162,9 @@ public class BallotBoxTest extends BaseIgniteAbstractTest {
         Mockito.verify(this.waiter, Mockito.only()).onCommitted(1);
     }
 
-    @ParameterizedTest
-    @ValueSource(shorts = {0, 1, 3})
-    public void testSetLastCommittedIndexHasPending(int quorum) {
-        assertTrue(box.resetPendingIndex(1, quorum));
+    @Test
+    public void testSetLastCommittedIndexHasPending() {
+        assertTrue(box.resetPendingIndex(1));
         assertThrows(IllegalArgumentException.class, () -> 
this.box.setLastCommittedIndex(1));
     }
 

Reply via email to