This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 09816a883f IGNITE-22261 Fix deadlock on configuration application in NodeImpl when disruptors are full (#4066) 09816a883f is described below commit 09816a883f4320c1631fa24188ac4fcffb3e2268 Author: Vladislav Pyatkov <vldpyat...@gmail.com> AuthorDate: Mon Aug 5 13:31:17 2024 +0300 IGNITE-22261 Fix deadlock on configuration application in NodeImpl when disruptors are full (#4066) --- modules/raft/build.gradle | 1 + .../apache/ignite/raft/jraft/core/ItNodeTest.java | 102 ++++++++++++++++++++- .../apache/ignite/raft/jraft/core/BallotBox.java | 10 +- .../apache/ignite/raft/jraft/core/NodeImpl.java | 52 +++++++++-- .../ignite/raft/jraft/option/BallotBoxOptions.java | 9 ++ .../raft/jraft/storage/SnapshotExecutor.java | 6 ++ .../storage/snapshot/SnapshotExecutorImpl.java | 5 +- .../ignite/raft/jraft/core/BallotBoxTest.java | 67 ++++++++------ 8 files changed, 202 insertions(+), 50 deletions(-) diff --git a/modules/raft/build.gradle b/modules/raft/build.gradle index 1d99e218f3..8f3c2fcbad 100644 --- a/modules/raft/build.gradle +++ b/modules/raft/build.gradle @@ -98,4 +98,5 @@ dependencies { integrationTestImplementation project(':ignite-failure-handler') integrationTestImplementation libs.jetbrains.annotations integrationTestImplementation libs.dropwizard.metrics + integrationTestImplementation libs.disruptor } diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java index 97902de1ec..1189b04328 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java @@ -45,6 +45,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import com.codahale.metrics.ConsoleReporter; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.RingBuffer; import java.io.File; import java.nio.ByteBuffer; import java.nio.file.Files; @@ -68,6 +70,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.BiPredicate; import java.util.function.BooleanSupplier; import java.util.stream.IntStream; @@ -98,7 +101,10 @@ import org.apache.ignite.raft.jraft.closure.ReadIndexClosure; import org.apache.ignite.raft.jraft.closure.SynchronizedClosure; import org.apache.ignite.raft.jraft.closure.TaskClosure; import org.apache.ignite.raft.jraft.conf.Configuration; +import org.apache.ignite.raft.jraft.core.FSMCallerImpl.ApplyTask; +import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor; import org.apache.ignite.raft.jraft.entity.EnumOutter; +import org.apache.ignite.raft.jraft.entity.NodeId; import org.apache.ignite.raft.jraft.entity.PeerId; import org.apache.ignite.raft.jraft.entity.Task; import org.apache.ignite.raft.jraft.entity.UserLog; @@ -257,7 +263,101 @@ public class ItNodeTest extends BaseIgniteAbstractTest { } @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-22261") + public void testSmallestBufferSize() throws Exception { + TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT); + + NodeOptions nodeOptions = createNodeOptions(0); + RaftOptions raftOptions = new RaftOptions(); + raftOptions.setDisruptorBufferSize(1); + nodeOptions.setRaftOptions(raftOptions); + MockStateMachine fsm = new MockStateMachine(peer.getPeerId()); + nodeOptions.setFsm(fsm); + nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta"); + nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot"); + nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer.getPeerId()))); + + AtomicBoolean block = new AtomicBoolean(); + CountDownLatch latch = new CountDownLatch(1); + + nodeOptions.setfSMCallerExecutorDisruptor(new StripedDisruptor<>( + "unit-test", + "JRaft-FSMCaller-Disruptor", + 1, + () -> new ApplyTask(), + 1, + false, + false, + null + ) { + @Override + public RingBuffer<ApplyTask> subscribe( + NodeId group, + EventHandler<ApplyTask> handler, + BiConsumer<ApplyTask, Throwable> exceptionHandler + ) { + return super.subscribe(group, (event, sequence, endOfBatch) -> { + if (block.compareAndSet(true, false)) { + log.info("Raft task is blocked."); + + latch.await(); + + log.info("Raft task is continue executing."); + } + + handler.onEvent(event, sequence, endOfBatch); + }, exceptionHandler); + } + }); + + RaftGroupService service = createService("unittest", peer, nodeOptions, List.of()); + + Node node = service.start(); + + assertEquals(1, node.listPeers().size()); + assertTrue(node.listPeers().contains(peer.getPeerId())); + assertTrue(waitForCondition(node::isLeader, 10_000)); + + AtomicInteger c = new AtomicInteger(); + + Task task1 = new Task(ByteBuffer.wrap(("Test task").getBytes(UTF_8)), new JoinableClosure(status -> { + log.info("First task is started."); + + if (!status.isOk()) { + assertTrue( + status.getRaftError() == RaftError.EBUSY || status.getRaftError() == RaftError.EPERM); + } + c.incrementAndGet(); + })); + + Task task2 = new Task(ByteBuffer.wrap(("Test task").getBytes(UTF_8)), new JoinableClosure(status -> { + log.info("Second task is started."); + + if (!status.isOk()) { + assertTrue( + status.getRaftError() == RaftError.EBUSY || status.getRaftError() == RaftError.EPERM); + } + c.incrementAndGet(); + })); + + try { + block.set(true); + + node.apply(task1); + + assertTrue(waitForCondition(() -> !block.get(), 10_000)); + + node.apply(task2); + + latch.countDown(); + + Task.joinAll(List.of(task1, task2), TimeUnit.SECONDS.toMillis(30)); + assertEquals(2, c.get()); + } finally { + latch.countDown(); + } + } + + @Test public void testNodeTaskOverload() throws Exception { TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java index 0c5d809e00..ce4ce3a3ea 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java @@ -79,6 +79,7 @@ public class BallotBox implements Lifecycle<BallotBoxOptions>, Describer { } this.waiter = opts.getWaiter(); this.closureQueue = opts.getClosureQueue(); + this.lastCommittedIndex = opts.getLastCommittedIndex(); return true; } @@ -159,10 +160,9 @@ public class BallotBox implements Lifecycle<BallotBoxOptions>, Describer { * |newPendingIndex| should be |last_log_index| + 1. * * @param newPendingIndex pending index of new leader - * @param quorum quorum size * @return returns true if reset success */ - public boolean resetPendingIndex(final long newPendingIndex, final int quorum) { + public boolean resetPendingIndex(final long newPendingIndex) { final long stamp = this.stampedLock.writeLock(); try { if (!(this.pendingIndex == 0 && this.pendingMetaQueue.isEmpty())) { @@ -176,12 +176,6 @@ public class BallotBox implements Lifecycle<BallotBoxOptions>, Describer { return false; } this.pendingIndex = newPendingIndex; - if (quorum == 1) { - // It is safe to initiate lastCommittedIndex as last log one because in case of single peer no one will discard - // log records on leader election. It's not an optimisation, but a matter of correctness because otherwise there will be - // a race between readIndex evaluation and asynchronous log records application on node restart. - this.lastCommittedIndex = newPendingIndex - 1; - } this.closureQueue.resetFirstIndex(newPendingIndex); return true; } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java index c4b498cee1..0fcf832981 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java @@ -1022,14 +1022,6 @@ public class NodeImpl implements Node, RaftServerService { LOG.error("Node {} initFSMCaller failed.", getNodeId()); return false; } - this.ballotBox = new BallotBox(); - final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions(); - ballotBoxOpts.setWaiter(this.fsmCaller); - ballotBoxOpts.setClosureQueue(this.closureQueue); - if (!this.ballotBox.init(ballotBoxOpts)) { - LOG.error("Node {} init ballotBox failed.", getNodeId()); - return false; - } if (!initSnapshotStorage()) { LOG.error("Node {} initSnapshotStorage failed.", getNodeId()); @@ -1053,6 +1045,12 @@ public class NodeImpl implements Node, RaftServerService { this.targetPriority = getMaxPriorityOfNodes(this.conf.getConf().getPeers()); } + // It must be initialized after initializing conf and log storage. + if (!initBallotBox()) { + LOG.error("Node {} init ballotBox failed.", getNodeId()); + return false; + } + if (!this.conf.isEmpty()) { Requires.requireTrue(this.conf.isValid(), "Invalid conf: %s", this.conf); } @@ -1127,6 +1125,30 @@ public class NodeImpl implements Node, RaftServerService { return true; } + private boolean initBallotBox() { + this.ballotBox = new BallotBox(); + final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions(); + ballotBoxOpts.setWaiter(this.fsmCaller); + ballotBoxOpts.setClosureQueue(this.closureQueue); + // TODO: uncomment when backport related change https://issues.apache.org/jira/browse/IGNITE-22923 + //ballotBoxOpts.setNodeId(getNodeId()); + // Try to initialize the last committed index in BallotBox to be the last snapshot index. + long lastCommittedIndex = 0; + if (this.snapshotExecutor != null) { + lastCommittedIndex = this.snapshotExecutor.getLastSnapshotIndex(); + } + if (this.getQuorum() == 1) { + // It is safe to initiate lastCommittedIndex as last log one because in case of single peer no one will discard + // log records on leader election. + // Fix https://github.com/sofastack/sofa-jraft/issues/1049 + lastCommittedIndex = Math.max(lastCommittedIndex, this.logManager.getLastLogIndex()); + } + + ballotBoxOpts.setLastCommittedIndex(lastCommittedIndex); + LOG.info("Node {} init ballot box's lastCommittedIndex={}.", getNodeId(), lastCommittedIndex); + return this.ballotBox.init(ballotBoxOpts); + } + /** * Validates a required option if shared pools are enabled. * @@ -1451,7 +1473,7 @@ public class NodeImpl implements Node, RaftServerService { } // init commit manager - this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() + 1, getQuorum()); + this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() + 1); // Register _conf_ctx to reject configuration changing before the first log // is committed. if (this.confCtx.isBusy()) { @@ -1561,6 +1583,18 @@ public class NodeImpl implements Node, RaftServerService { } private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) { + if (!this.logManager.hasAvailableCapacityToAppendEntries(1)) { + // It's overload, fail-fast + final List<Closure> dones = tasks.stream().map(ele -> ele.done).filter(Objects::nonNull) + .collect(Collectors.toList()); + Utils.runInThread(this.getOptions().getCommonExecutor(), () -> { + for (final Closure done : dones) { + done.run(new Status(RaftError.EBUSY, "Node %s log manager is busy.", this.getNodeId())); + } + }); + return; + } + this.writeLock.lock(); try { final int size = tasks.size(); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/BallotBoxOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/BallotBoxOptions.java index eea31def7d..02018fb74f 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/BallotBoxOptions.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/BallotBoxOptions.java @@ -26,6 +26,15 @@ public class BallotBoxOptions { private FSMCaller waiter; private ClosureQueue closureQueue; + private long lastCommittedIndex; + + public long getLastCommittedIndex() { + return lastCommittedIndex; + } + + public void setLastCommittedIndex(long lastCommittedIndex) { + this.lastCommittedIndex = lastCommittedIndex; + } public FSMCaller getWaiter() { return this.waiter; diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutor.java index d7688083d6..2040e85864 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutor.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutor.java @@ -42,6 +42,12 @@ public interface SnapshotExecutor extends Lifecycle<SnapshotExecutorOptions>, De */ void doSnapshot(final Closure done); + /** + * Returns the last snapshot index. + * @return + */ + long getLastSnapshotIndex(); + /** * Install snapshot according to the very RPC from leader After the installing succeeds (StateMachine is reset with * the snapshot) or fails, done will be called to respond Errors: - Term mismatches: which happens diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java index b62a425cd3..f0db519aac 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java @@ -100,10 +100,7 @@ public class SnapshotExecutorImpl implements SnapshotExecutor { return this.lastSnapshotTerm; } - /** - * Only for test - */ - @OnlyForTest + @Override public long getLastSnapshotIndex() { return this.lastSnapshotIndex; } diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/BallotBoxTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/BallotBoxTest.java index f26f18181c..30dcfb76d5 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/BallotBoxTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/BallotBoxTest.java @@ -16,6 +16,12 @@ */ package org.apache.ignite.raft.jraft.core; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + import java.util.concurrent.ExecutorService; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.raft.jraft.Closure; @@ -32,18 +38,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; - @ExtendWith(MockitoExtension.class) public class BallotBoxTest extends BaseIgniteAbstractTest { private BallotBox box; @@ -61,6 +59,7 @@ public class BallotBoxTest extends BaseIgniteAbstractTest { this.closureQueue = new ClosureQueueImpl(options); opts.setClosureQueue(this.closureQueue); opts.setWaiter(this.waiter); + opts.setLastCommittedIndex(0); box = new BallotBox(); assertTrue(box.init(opts)); } @@ -71,19 +70,34 @@ public class BallotBoxTest extends BaseIgniteAbstractTest { ExecutorServiceHelper.shutdownAndAwaitTermination(executor); } - @ParameterizedTest - @ValueSource(shorts = {0, 1, 3}) - public void testResetPendingIndex(int quorum) { + @Test + public void initWithLastCommittedIndex() { + BallotBoxOptions opts = new BallotBoxOptions(); + NodeOptions options = new NodeOptions(); + executor = JRaftUtils.createExecutor("test-executor-", Utils.cpus()); + options.setCommonExecutor(executor); + this.closureQueue = new ClosureQueueImpl(options); + opts.setClosureQueue(this.closureQueue); + opts.setWaiter(this.waiter); + opts.setLastCommittedIndex(9); + box = new BallotBox(); + assertTrue(box.init(opts)); + + assertEquals(box.getLastCommittedIndex(), 9); + } + + @Test + public void testResetPendingIndex() { assertEquals(0, closureQueue.getFirstIndex()); assertEquals(0, box.getPendingIndex()); - assertTrue(box.resetPendingIndex(1, quorum)); + assertTrue(box.resetPendingIndex(1)); + assertEquals(0, box.getLastCommittedIndex()); assertEquals(1, closureQueue.getFirstIndex()); assertEquals(1, box.getPendingIndex()); } - @ParameterizedTest - @ValueSource(shorts = {0, 1, 3}) - public void testAppendPendingTask(int quorum) { + @Test + public void testAppendPendingTask() { assertTrue(this.box.getPendingMetaQueue().isEmpty()); assertTrue(this.closureQueue.getQueue().isEmpty()); assertFalse(this.box.appendPendingTask( @@ -95,7 +109,7 @@ public class BallotBoxTest extends BaseIgniteAbstractTest { } })); - assertTrue(box.resetPendingIndex(1, quorum)); + assertTrue(box.resetPendingIndex(1)); assertTrue(this.box.appendPendingTask( JRaftUtils.getConfiguration("localhost:8081,localhost:8082,localhost:8083"), JRaftUtils.getConfiguration("localhost:8081"), new Closure() { @@ -110,21 +124,19 @@ public class BallotBoxTest extends BaseIgniteAbstractTest { assertEquals(1, this.closureQueue.getQueue().size()); } - @ParameterizedTest - @ValueSource(shorts = {0, 1, 3}) - public void testClearPendingTasks(int quorum) { - testAppendPendingTask(quorum); + @Test + public void testClearPendingTasks() { + testAppendPendingTask(); this.box.clearPendingTasks(); assertTrue(this.box.getPendingMetaQueue().isEmpty()); assertTrue(this.closureQueue.getQueue().isEmpty()); assertEquals(0, closureQueue.getFirstIndex()); } - @ParameterizedTest - @ValueSource(shorts = {0, 1, 3}) - public void testCommitAt(int quorum) { + @Test + public void testCommitAt() { assertFalse(this.box.commitAt(1, 3, new PeerId("localhost", 8081))); - assertTrue(box.resetPendingIndex(1, quorum)); + assertTrue(box.resetPendingIndex(1)); assertTrue(this.box.appendPendingTask( JRaftUtils.getConfiguration("localhost:8081,localhost:8082,localhost:8083"), JRaftUtils.getConfiguration("localhost:8081"), new Closure() { @@ -150,10 +162,9 @@ public class BallotBoxTest extends BaseIgniteAbstractTest { Mockito.verify(this.waiter, Mockito.only()).onCommitted(1); } - @ParameterizedTest - @ValueSource(shorts = {0, 1, 3}) - public void testSetLastCommittedIndexHasPending(int quorum) { - assertTrue(box.resetPendingIndex(1, quorum)); + @Test + public void testSetLastCommittedIndexHasPending() { + assertTrue(box.resetPendingIndex(1)); assertThrows(IllegalArgumentException.class, () -> this.box.setLastCommittedIndex(1)); }