This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 944541623ac IGNITE-25502 Fail node on index greater than current majority (#7694)
944541623ac is described below
commit 944541623ac778f6df57938a83386bdc186ece3d
Author: Viacheslav Blinov <[email protected]>
AuthorDate: Fri Apr 10 12:15:44 2026 +0300
IGNITE-25502 Fail node on index greater than current majority (#7694)
---
.../replicator/raft/ZonePartitionRaftListener.java | 14 +++++++
.../internal/raft/service/RaftGroupListener.java | 10 +++++
.../internal/raft/server/impl/JraftServerImpl.java | 5 +++
.../org/apache/ignite/raft/jraft/StateMachine.java | 11 +++++
.../apache/ignite/raft/jraft/core/NodeImpl.java | 20 +++++++++
.../raft/jraft/storage/impl/LogManagerImpl.java | 29 ++++++++++---
.../raft/jraft/storage/impl/LogManagerTest.java | 48 ++++++++++++++++++++++
.../ItTruncateRaftLogAndRestartNodesTest.java | 16 ++++++--
8 files changed, 145 insertions(+), 8 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
index 3011de42ad7..d2c480b357f 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
@@ -448,6 +448,20 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
}
}
+ @Override
+ public long getPersistedAppliedIndex() {
+ // Clamp to 0 because lastAppliedIndex() returns -1 during rebalance.
+ long result = max(0, txStateStorage.lastAppliedIndex());
+
+ synchronized (tableProcessorsStateLock) {
+ for (RaftTableProcessor processor : tableProcessors.values()) {
+ result = max(result, processor.lastAppliedIndex());
+ }
+ }
+
+ return result;
+ }
+
/**
* Adds a given Table Partition-level Raft processor to the set of managed processors.
*
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
index bf2697e9b2d..076f753ccb2 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
@@ -96,4 +96,14 @@ public interface RaftGroupListener {
* Invoked once after a raft node has been shut down.
*/
void onShutdown();
+
+ /**
+ * Returns the last applied index persisted by the state machine.
+ * Called during Raft node initialization to prevent truncation of already-applied log entries.
+ *
+ * @return persisted applied index, or 0 if unknown.
+ */
+ default long getPersistedAppliedIndex() {
+ return 0;
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 8221a3e31a3..b20bf1508f5 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -873,6 +873,11 @@ public class JraftServerImpl implements RaftServer {
return listener;
}
+ @Override
+ public long getPersistedAppliedIndex() {
+ return listener.getPersistedAppliedIndex();
+ }
+
@Override
public void onApply(Iterator iter) {
var iterWrapper = new WriteCommandIterator(iter, marshaller);
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/StateMachine.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/StateMachine.java
index 3519c99520d..d393b5242d9 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/StateMachine.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/StateMachine.java
@@ -18,6 +18,7 @@ package org.apache.ignite.raft.jraft;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.entity.LeaderChangeContext;
import org.apache.ignite.raft.jraft.error.RaftException;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
@@ -133,4 +134,14 @@ public interface StateMachine {
* @param ctx context of leader change
*/
void onStartFollowing(final LeaderChangeContext ctx);
+
+ /**
+ * Returns the last applied index persisted by the state machine.
+ * Called during {@link NodeImpl#init()} to prevent truncation of already-applied log entries.
+ *
+ * @return persisted applied index, or 0 if unknown.
+ */
+ default long getPersistedAppliedIndex() {
+ return 0;
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 60aa117772c..ced1be2bfc3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -1071,6 +1071,26 @@ public class NodeImpl implements Node, RaftServerService
{
return false;
}
+ /*
+ * Restore appliedId so that unsafeTruncateSuffix() can reject truncation of applied entries.
+ *
+ * After a node restart, logManager.appliedId is transient and resets to 0.
+ * This block restores appliedId from the state machine's persisted applied index
+ * so the unsafeTruncateSuffix() guard is effective immediately after restart, before any entries are re-applied.
+ */
+ long persistedApplied =
this.options.getFsm().getPersistedAppliedIndex();
+ if (persistedApplied > 0) {
+ long term = this.logManager.getTerm(persistedApplied);
+ if (term > 0) {
+ this.logManager.setAppliedId(new LogId(persistedApplied,
term));
+ } else {
+ // Term is 0 when the index is outside the log (covered by a snapshot) — skip in that case.
+ LOG.warn("Persisted applied index is not in the raft log,
expecting snapshot to cover it "
+ + "[nodeId={}, persistedAppliedIndex={}]",
+ getNodeId(), persistedApplied);
+ }
+ }
+
final Status st = this.logManager.checkConsistency();
if (!st.isOk()) {
LOG.error("Node {} is initialized with inconsistent log,
status={}.", getNodeId(), st);
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
index f486645ac23..83584f7fee2 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
@@ -1050,11 +1050,25 @@ public class LogManagerImpl implements LogManager {
}
}
- private void unsafeTruncateSuffix(final long lastIndexKept, final Lock
lock) {
+ /**
+ * Truncates log entries after {@code lastIndexKept}.
+ *
+ * @return {@code true} on success, {@code false} if truncation would discard applied entries (node moves to error state).
+ */
+ private boolean unsafeTruncateSuffix(final long lastIndexKept, final Lock
lock) {
if (lastIndexKept < this.appliedId.getIndex()) {
- LOG.error("FATAL ERROR: Can't truncate logs before appliedId={},
lastIndexKept={}", this.appliedId,
- lastIndexKept);
- return;
+ LOG.error("Raft log suffix conflict: cannot truncate entries that
have been applied to the state machine. "
+ + "The partition will be moved to error state [nodeId={},
appliedId={}, lastIndexKept={}]",
+ this.nodeId, this.appliedId, lastIndexKept);
+ lock.unlock();
+ try {
+ reportError(RaftError.EINVAL.getNumber(),
+ "Raft log suffix conflict: attempted to truncate
applied entries, appliedId=%s, lastIndexKept=%d",
+ this.appliedId, lastIndexKept);
+ } finally {
+ lock.lock();
+ }
+ return false;
}
this.logsInMemory.removeFromLastWhen(entry -> entry.getId().getIndex()
> lastIndexKept);
@@ -1069,6 +1083,7 @@ public class LogManagerImpl implements LogManager {
final TruncateSuffixClosure c = new
TruncateSuffixClosure(lastIndexKept, lastTermKept);
offerEvent(c, EventType.TRUNCATE_SUFFIX);
lock.lock();
+ return true;
}
@SuppressWarnings("NonAtomicOperationOnVolatileField")
@@ -1122,7 +1137,11 @@ public class LogManagerImpl implements LogManager {
if (entries.get(conflictingIndex).getId().getIndex() <=
this.lastLogIndex) {
// Truncate all the conflicting entries to make local logs
// consensus with the leader.
-
unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1,
lock);
+ if
(!unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1,
lock)) {
+
Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done,
+ new Status(RaftError.EINVAL, "Raft log suffix
conflict with applied entries"));
+ return false;
+ }
}
this.lastLogIndex = lastLogEntry.getId().getIndex();
} // else this is a duplicated AppendEntriesRequest, we have
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
index 97d9aed4459..ee0001bd224 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
@@ -23,9 +23,14 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.verify;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import org.apache.ignite.internal.metrics.TestMetricManager;
@@ -40,6 +45,7 @@ import
org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.error.RaftException;
import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.RaftOutter;
@@ -477,4 +483,46 @@ public class LogManagerTest extends BaseStorageTest {
Exception e = assertThrows(IllegalStateException.class, () ->
this.logManager.getLastLogIndex(true));
assertEquals("Node is shutting down", e.getMessage());
}
+
+ /** Suffix truncation below appliedId must report error and abort the append. */
+ @Test
+ public void testSuffixTruncationBelowAppliedIndexReportsError() {
+ List<LogEntry> entries = new ArrayList<>();
+ for (int i = 1; i <= 10; i++) {
+ entries.add(TestUtils.mockEntry(i, 1));
+ }
+ assertThat("Initial entries should be appended successfully",
appendEntries(entries), willBe(true));
+ assertEquals(10, this.logManager.getLastLogIndex());
+
+ this.logManager.getLastLogId(true); // Flush disruptor so setDiskId()
completes.
+ this.logManager.setAppliedId(new LogId(8, 1));
+
+ // Conflicting entries at index 6+ (term 2 vs existing term 1) trigger unsafeTruncateSuffix(5).
+ // Since 5 < appliedId.index (8), this must be rejected.
+ List<LogEntry> conflicting = new ArrayList<>();
+ for (int i = 6; i <= 12; i++) {
+ conflicting.add(TestUtils.mockEntry(i, 2));
+ }
+ assertThat("Append should fail due to suffix conflict with applied
entries", appendEntries(conflicting), willBe(false));
+
+ verify(fsmCaller).onError(any(RaftException.class));
+ assertEquals(10, this.logManager.getLastLogIndex());
+
+ for (int i = 1; i <= 10; i++) {
+ LogEntry entry = this.logManager.getEntry(i);
+ assertEquals(i, entry.getId().getIndex());
+ assertEquals(1, entry.getId().getTerm());
+ }
+ }
+
+ private CompletableFuture<Boolean> appendEntries(List<LogEntry> entries) {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ this.logManager.appendEntries(new ArrayList<>(entries), new
LogManager.StableClosure() {
+ @Override
+ public void run(Status status) {
+ future.complete(status.isOk());
+ }
+ });
+ return future;
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
index cfa64a01cf7..83e14a4db01 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
@@ -23,8 +23,10 @@ import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_
import static
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.arrayWithSize;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
@@ -72,13 +74,13 @@ import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.State;
import org.apache.ignite.raft.jraft.option.LogStorageOptions;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.LogStorage;
import org.apache.ignite.tx.TransactionOptions;
import org.hamcrest.Matchers;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -98,7 +100,6 @@ public class ItTruncateRaftLogAndRestartNodesTest extends
ClusterPerTestIntegrat
return 0;
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-25502")
@Test
void enterNodeWithIndexGreaterThanCurrentMajority() throws Exception {
cluster.startAndInit(3);
@@ -148,12 +149,21 @@ public class ItTruncateRaftLogAndRestartNodesTest extends
ClusterPerTestIntegrat
startNode(2);
+ // Node 2 has applied entries that the new majority (nodes 0, 1) doesn't have,
+ // so it must go to ERROR state when the leader tries to overwrite those entries.
+ await()
+ .timeout(10, TimeUnit.SECONDS)
+ .untilAsserted(() ->
+ assertThat(raftNodeImpl(2,
replicationGroup).getState(), equalTo(State.STATE_ERROR))
+ );
+
+ // SQL should still work via the healthy majority.
assertThat(
toPeopleFromSqlRows(executeSql(selectPeopleDml(TABLE_NAME))),
arrayWithSize(Matchers.allOf(greaterThan(0),
lessThan(people.length)))
);
- for (int nodeIndex = 0; nodeIndex < 3; nodeIndex++) {
+ for (int nodeIndex = 0; nodeIndex < 2; nodeIndex++) {
assertThat(
"nodeIndex=" + nodeIndex,
scanPeopleFromAllPartitions(nodeIndex, TABLE_NAME),