This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 944541623ac IGNITE-25502 Fail node on index greater than current majority (#7694)
944541623ac is described below

commit 944541623ac778f6df57938a83386bdc186ece3d
Author: Viacheslav Blinov <[email protected]>
AuthorDate: Fri Apr 10 12:15:44 2026 +0300

    IGNITE-25502 Fail node on index greater than current majority (#7694)
---
 .../replicator/raft/ZonePartitionRaftListener.java | 14 +++++++
 .../internal/raft/service/RaftGroupListener.java   | 10 +++++
 .../internal/raft/server/impl/JraftServerImpl.java |  5 +++
 .../org/apache/ignite/raft/jraft/StateMachine.java | 11 +++++
 .../apache/ignite/raft/jraft/core/NodeImpl.java    | 20 +++++++++
 .../raft/jraft/storage/impl/LogManagerImpl.java    | 29 ++++++++++---
 .../raft/jraft/storage/impl/LogManagerTest.java    | 48 ++++++++++++++++++++++
 .../ItTruncateRaftLogAndRestartNodesTest.java      | 16 ++++++--
 8 files changed, 145 insertions(+), 8 deletions(-)

diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
index 3011de42ad7..d2c480b357f 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
@@ -448,6 +448,20 @@ public class ZonePartitionRaftListener implements RaftGroupListener {
         }
     }
 
+    @Override
+    public long getPersistedAppliedIndex() {
+        // Clamp to 0 because lastAppliedIndex() returns -1 during rebalance.
+        long result = max(0, txStateStorage.lastAppliedIndex());
+
+        synchronized (tableProcessorsStateLock) {
+            for (RaftTableProcessor processor : tableProcessors.values()) {
+                result = max(result, processor.lastAppliedIndex());
+            }
+        }
+
+        return result;
+    }
+
     /**
      * Adds a given Table Partition-level Raft processor to the set of managed processors.
      *
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
index bf2697e9b2d..076f753ccb2 100644
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
@@ -96,4 +96,14 @@ public interface RaftGroupListener {
      * Invoked once after a raft node has been shut down.
      */
     void onShutdown();
+
+    /**
+     * Returns the last applied index persisted by the state machine.
+     * Called during Raft node initialization to prevent truncation of already-applied log entries.
+     *
+     * @return persisted applied index, or 0 if unknown.
+     */
+    default long getPersistedAppliedIndex() {
+        return 0;
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 8221a3e31a3..b20bf1508f5 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -873,6 +873,11 @@ public class JraftServerImpl implements RaftServer {
             return listener;
         }
 
+        @Override
+        public long getPersistedAppliedIndex() {
+            return listener.getPersistedAppliedIndex();
+        }
+
         @Override
         public void onApply(Iterator iter) {
             var iterWrapper = new WriteCommandIterator(iter, marshaller);
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/StateMachine.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/StateMachine.java
index 3519c99520d..d393b5242d9 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/StateMachine.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/StateMachine.java
@@ -18,6 +18,7 @@ package org.apache.ignite.raft.jraft;
 
 import org.apache.ignite.raft.jraft.conf.Configuration;
 import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
 import org.apache.ignite.raft.jraft.entity.LeaderChangeContext;
 import org.apache.ignite.raft.jraft.error.RaftException;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
@@ -133,4 +134,14 @@ public interface StateMachine {
      * @param ctx context of leader change
      */
     void onStartFollowing(final LeaderChangeContext ctx);
+
+    /**
+     * Returns the last applied index persisted by the state machine.
+     * Called during {@link NodeImpl#init()} to prevent truncation of already-applied log entries.
+     *
+     * @return persisted applied index, or 0 if unknown.
+     */
+    default long getPersistedAppliedIndex() {
+        return 0;
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 60aa117772c..ced1be2bfc3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -1071,6 +1071,26 @@ public class NodeImpl implements Node, RaftServerService 
{
             return false;
         }
 
+        /*
+         * Restore appliedId so that unsafeTruncateSuffix() can reject truncation of applied entries.
+         *
+         * After a node restart, logManager.appliedId is transient and resets to 0.
+         * This block restores appliedId from the state machine's persisted applied index
+         * so the unsafeTruncateSuffix() guard is effective immediately after restart, before any entries are re-applied.
+         */
+        long persistedApplied = this.options.getFsm().getPersistedAppliedIndex();
+        if (persistedApplied > 0) {
+            long term = this.logManager.getTerm(persistedApplied);
+            if (term > 0) {
+                this.logManager.setAppliedId(new LogId(persistedApplied, term));
+            } else {
+                // Term is 0 when the index is outside the log (covered by a snapshot) — skip in that case.
+                LOG.warn("Persisted applied index is not in the raft log, expecting snapshot to cover it "
+                        + "[nodeId={}, persistedAppliedIndex={}]",
+                        getNodeId(), persistedApplied);
+            }
+        }
+
         final Status st = this.logManager.checkConsistency();
         if (!st.isOk()) {
             LOG.error("Node {} is initialized with inconsistent log, 
status={}.", getNodeId(), st);
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
index f486645ac23..83584f7fee2 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
@@ -1050,11 +1050,25 @@ public class LogManagerImpl implements LogManager {
         }
     }
 
-    private void unsafeTruncateSuffix(final long lastIndexKept, final Lock lock) {
+    /**
+     * Truncates log entries after {@code lastIndexKept}.
+     *
+     * @return {@code true} on success, {@code false} if truncation would discard applied entries (node moves to error state).
+     */
+    private boolean unsafeTruncateSuffix(final long lastIndexKept, final Lock lock) {
         if (lastIndexKept < this.appliedId.getIndex()) {
-            LOG.error("FATAL ERROR: Can't truncate logs before appliedId={}, lastIndexKept={}", this.appliedId,
-                lastIndexKept);
-            return;
+            LOG.error("Raft log suffix conflict: cannot truncate entries that have been applied to the state machine. "
+                    + "The partition will be moved to error state [nodeId={}, appliedId={}, lastIndexKept={}]",
+                    this.nodeId, this.appliedId, lastIndexKept);
+            lock.unlock();
+            try {
+                reportError(RaftError.EINVAL.getNumber(),
+                        "Raft log suffix conflict: attempted to truncate applied entries, appliedId=%s, lastIndexKept=%d",
+                        this.appliedId, lastIndexKept);
+            } finally {
+                lock.lock();
+            }
+            return false;
         }
 
         this.logsInMemory.removeFromLastWhen(entry -> entry.getId().getIndex() 
> lastIndexKept);
@@ -1069,6 +1083,7 @@ public class LogManagerImpl implements LogManager {
         final TruncateSuffixClosure c = new 
TruncateSuffixClosure(lastIndexKept, lastTermKept);
         offerEvent(c, EventType.TRUNCATE_SUFFIX);
         lock.lock();
+        return true;
     }
 
     @SuppressWarnings("NonAtomicOperationOnVolatileField")
@@ -1122,7 +1137,11 @@ public class LogManagerImpl implements LogManager {
                     if (entries.get(conflictingIndex).getId().getIndex() <= 
this.lastLogIndex) {
                         // Truncate all the conflicting entries to make local 
logs
                         // consensus with the leader.
-                        unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1, lock);
+                        if (!unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1, lock)) {
+                            Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done,
+                                new Status(RaftError.EINVAL, "Raft log suffix conflict with applied entries"));
+                            return false;
+                        }
                     }
                     this.lastLogIndex = lastLogEntry.getId().getIndex();
                 } // else this is a duplicated AppendEntriesRequest, we have
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
index 97d9aed4459..ee0001bd224 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
@@ -23,9 +23,14 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.verify;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import org.apache.ignite.internal.metrics.TestMetricManager;
@@ -40,6 +45,7 @@ import 
org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.EnumOutter;
 import org.apache.ignite.raft.jraft.entity.LogEntry;
 import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.error.RaftException;
 import org.apache.ignite.raft.jraft.entity.NodeId;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.entity.RaftOutter;
@@ -477,4 +483,46 @@ public class LogManagerTest extends BaseStorageTest {
         Exception e = assertThrows(IllegalStateException.class, () -> 
this.logManager.getLastLogIndex(true));
         assertEquals("Node is shutting down", e.getMessage());
     }
+
+    /** Suffix truncation below appliedId must report error and abort the append. */
+    @Test
+    public void testSuffixTruncationBelowAppliedIndexReportsError() {
+        List<LogEntry> entries = new ArrayList<>();
+        for (int i = 1; i <= 10; i++) {
+            entries.add(TestUtils.mockEntry(i, 1));
+        }
+        assertThat("Initial entries should be appended successfully", appendEntries(entries), willBe(true));
+        assertEquals(10, this.logManager.getLastLogIndex());
+
+        this.logManager.getLastLogId(true); // Flush disruptor so setDiskId() completes.
+        this.logManager.setAppliedId(new LogId(8, 1));
+
+        // Conflicting entries at index 6+ (term 2 vs existing term 1) trigger unsafeTruncateSuffix(5).
+        // Since 5 < appliedId.index (8), this must be rejected.
+        List<LogEntry> conflicting = new ArrayList<>();
+        for (int i = 6; i <= 12; i++) {
+            conflicting.add(TestUtils.mockEntry(i, 2));
+        }
+        assertThat("Append should fail due to suffix conflict with applied entries", appendEntries(conflicting), willBe(false));
+
+        verify(fsmCaller).onError(any(RaftException.class));
+        assertEquals(10, this.logManager.getLastLogIndex());
+
+        for (int i = 1; i <= 10; i++) {
+            LogEntry entry = this.logManager.getEntry(i);
+            assertEquals(i, entry.getId().getIndex());
+            assertEquals(1, entry.getId().getTerm());
+        }
+    }
+
+    private CompletableFuture<Boolean> appendEntries(List<LogEntry> entries) {
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+        this.logManager.appendEntries(new ArrayList<>(entries), new LogManager.StableClosure() {
+            @Override
+            public void run(Status status) {
+                future.complete(status.isOk());
+            }
+        });
+        return future;
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
index cfa64a01cf7..83e14a4db01 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
@@ -23,8 +23,10 @@ import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_
 import static 
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.arrayWithSize;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -72,13 +74,13 @@ import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
 import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.State;
 import org.apache.ignite.raft.jraft.option.LogStorageOptions;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.storage.LogStorage;
 import org.apache.ignite.tx.TransactionOptions;
 import org.hamcrest.Matchers;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -98,7 +100,6 @@ public class ItTruncateRaftLogAndRestartNodesTest extends 
ClusterPerTestIntegrat
         return 0;
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-25502")
     @Test
     void enterNodeWithIndexGreaterThanCurrentMajority() throws Exception {
         cluster.startAndInit(3);
@@ -148,12 +149,21 @@ public class ItTruncateRaftLogAndRestartNodesTest extends 
ClusterPerTestIntegrat
 
             startNode(2);
 
+            // Node 2 has applied entries that the new majority (nodes 0, 1) doesn't have,
+            // so it must go to ERROR state when the leader tries to overwrite those entries.
+            await()
+                    .timeout(10, TimeUnit.SECONDS)
+                    .untilAsserted(() ->
+                            assertThat(raftNodeImpl(2, replicationGroup).getState(), equalTo(State.STATE_ERROR))
+                    );
+
+            // SQL should still work via the healthy majority.
             assertThat(
                     
toPeopleFromSqlRows(executeSql(selectPeopleDml(TABLE_NAME))),
                     arrayWithSize(Matchers.allOf(greaterThan(0), 
lessThan(people.length)))
             );
 
-            for (int nodeIndex = 0; nodeIndex < 3; nodeIndex++) {
+            for (int nodeIndex = 0; nodeIndex < 2; nodeIndex++) {
                 assertThat(
                         "nodeIndex=" + nodeIndex,
                         scanPeopleFromAllPartitions(nodeIndex, TABLE_NAME),

Reply via email to