This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a0804b7b456 IGNITE-27524 Logging and timeout management of 
waitForActualState (#7379)
a0804b7b456 is described below

commit a0804b7b456d624db7e4a5013706a62b767a6172
Author: Anton Laletin <[email protected]>
AuthorDate: Tue Jan 20 17:32:59 2026 +0400

    IGNITE-27524 Logging and timeout management of waitForActualState (#7379)
---
 .../apache/ignite/internal/util/IgniteUtils.java   |  37 ++---
 .../ignite/internal/util/IgniteUtilsTest.java      | 119 ++++++++++++++-
 .../partition/replicator/ReplicaPrimacyEngine.java |   2 +
 .../replicator/raft/ZonePartitionRaftListener.java |  10 +-
 .../PlacementDriverMessageProcessor.java           | 168 ++++++++++++++++++---
 .../exception/PrimaryReplicaMissException.java     |  20 ++-
 .../sql/engine/exec/QueryRecoveryTest.java         |   2 +-
 .../distributed/raft/TablePartitionProcessor.java  |  18 ++-
 .../replicator/PartitionReplicaListener.java       |  12 +-
 .../distributed/storage/InternalTableImpl.java     |   2 +-
 .../TxStateMetaRocksDbPartitionStorage.java        |   5 +
 .../apache/ignite/internal/tx/TxManagerTest.java   |   1 +
 12 files changed, 340 insertions(+), 56 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index db898d50d11..7652f8200a6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -1184,50 +1184,45 @@ public class IgniteUtils {
     }
 
     /**
-     * Retries operation until it succeeds or fails with exception that is 
different than the given.
+     * Retries operation until it succeeds or timeout occurs.
      *
      * @param operation Operation.
-     * @param stopRetryCondition Condition that accepts the exception if one 
has been thrown, and defines whether retries should be
-     *         stopped.
+     * @param timeout Timeout in milliseconds.
      * @param executor Executor to make retry in.
      * @return Future that is completed when operation is successful or failed 
with other exception than the given.
      */
-    public static <T> CompletableFuture<T> retryOperationUntilSuccess(
+    public static <T> CompletableFuture<T> retryOperationUntilSuccessOrTimeout(
             Supplier<CompletableFuture<T>> operation,
-            Function<Throwable, Boolean> stopRetryCondition,
+            long timeout,
             Executor executor
     ) {
-        CompletableFuture<T> fut = new CompletableFuture<>();
+        CompletableFuture<T> futureWithTimeout = new 
CompletableFuture<T>().orTimeout(timeout, TimeUnit.MILLISECONDS);
 
-        retryOperationUntilSuccess(operation, stopRetryCondition, fut, 
executor);
+        retryOperationUntilSuccessOrFutureDone(operation, futureWithTimeout, 
executor);
 
-        return fut;
+        return futureWithTimeout;
     }
 
     /**
-     * Retries operation until it succeeds or fails with exception that is 
different than the given.
+     * Retries operation until it succeeds or provided future is done.
      *
      * @param operation Operation.
-     * @param stopRetryCondition Condition that accepts the exception if one 
has been thrown, and defines whether retries should be
-     *         stopped.
+     * @param future Future to track.
      * @param executor Executor to make retry in.
-     * @param fut Future that is completed when operation is successful or 
failed with other exception than the given.
      */
-    public static <T> void retryOperationUntilSuccess(
+    private static <T> void retryOperationUntilSuccessOrFutureDone(
             Supplier<CompletableFuture<T>> operation,
-            Function<Throwable, Boolean> stopRetryCondition,
-            CompletableFuture<T> fut,
+            CompletableFuture<T> future,
             Executor executor
     ) {
+
         operation.get()
                 .whenComplete((res, e) -> {
-                    if (e == null) {
-                        fut.complete(res);
-                    } else {
-                        if (stopRetryCondition.apply(e)) {
-                            fut.completeExceptionally(e);
+                    if (!future.isDone()) {
+                        if (e == null) {
+                            future.complete(res);
                         } else {
-                            executor.execute(() -> 
retryOperationUntilSuccess(operation, stopRetryCondition, fut, executor));
+                            executor.execute(() -> 
retryOperationUntilSuccessOrFutureDone(operation, future, executor));
                         }
                     }
                 });
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
index 0aa80378713..aefb1f55de9 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -29,9 +30,11 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
 import static 
org.apache.ignite.internal.util.IgniteUtils.byteBufferToByteArray;
 import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
 import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
+import static 
org.apache.ignite.internal.util.IgniteUtils.retryOperationUntilSuccessOrTimeout;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.arrayWithSize;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -49,10 +52,15 @@ import java.util.List;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.util.worker.IgniteWorker;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 /**
  * Test suite for {@link IgniteUtils}.
@@ -161,7 +169,7 @@ class IgniteUtilsTest extends BaseIgniteAbstractTest {
             } catch (ExecutionException e) {
                 throw new RuntimeException(e);
             }
-        }).get(1, TimeUnit.SECONDS);
+        }).get(1, SECONDS);
     }
 
     @Test
@@ -217,4 +225,111 @@ class IgniteUtilsTest extends BaseIgniteAbstractTest {
         ByteBuffer smallDirectBuffer = 
bigDirectBuffer.position(1).limit(4).slice();
         assertArrayEquals(new byte[] {1, 2, 3}, 
byteBufferToByteArray(smallDirectBuffer));
     }
+
+    @Test
+    @Timeout(value = 10, unit = SECONDS)
+    void testRetryOperationUntilSuccessOrTimeout_SuccessOnFirstAttempt() {
+        Executor executor = Executors.newSingleThreadExecutor();
+        AtomicInteger callCount = new AtomicInteger(0);
+
+        CompletableFuture<String> result = retryOperationUntilSuccessOrTimeout(
+                () -> {
+                    callCount.incrementAndGet();
+                    return completedFuture("success");
+                },
+                1000,
+                executor
+        );
+
+        assertThat(result, willBe(equalTo("success")));
+        assertThat(callCount.get(), equalTo(1));
+    }
+
+    @Test
+    @Timeout(value = 10, unit = SECONDS)
+    void testRetryOperationUntilSuccessOrTimeout_SuccessAfterRetries() {
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            AtomicInteger callCount = new AtomicInteger(0);
+
+            CompletableFuture<String> result = 
retryOperationUntilSuccessOrTimeout(
+                    () -> {
+                        int count = callCount.incrementAndGet();
+                        if (count < 3) {
+                            return failedFuture(new IOException("Temporary 
failure"));
+                        }
+                        return completedFuture("success");
+                    },
+                    1000,
+                    executor
+            );
+
+            assertThat(result, willBe(equalTo("success")));
+            assertThat(callCount.get(), equalTo(3));
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    @Test
+    @Timeout(value = 10, unit = SECONDS)
+    void testRetryOperationUntilSuccessOrTimeout_Timeout() {
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            AtomicInteger callCount = new AtomicInteger(0);
+
+            CompletableFuture<String> result = 
retryOperationUntilSuccessOrTimeout(
+                    () -> {
+                        callCount.incrementAndGet();
+                        return failedFuture(new IOException("Persistent 
failure"));
+                    },
+                    100,
+                    executor
+            );
+
+            assertThat(result, willThrow(TimeoutException.class));
+            // Should have made multiple attempts before timing out
+            assertThat(callCount.get(), greaterThan(1));
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    @Test
+    @Timeout(value = 10, unit = SECONDS)
+    void 
testRetryOperationUntilSuccessOrTimeout_StopsWhenFutureCompletedExternally() 
throws Exception {
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        try {
+            AtomicInteger callCount = new AtomicInteger(0);
+
+            CompletableFuture<Void> firstAttemptStarted = new 
CompletableFuture<>();
+
+            CompletableFuture<String> result = 
retryOperationUntilSuccessOrTimeout(
+                    () -> {
+                        int attempt = callCount.incrementAndGet();
+
+                        if (attempt == 1) {
+                            firstAttemptStarted.complete(null);
+
+                            return new CompletableFuture<>();
+                        }
+
+                        return failedFuture(new IOException("Should not be 
retried after external completion"));
+                    },
+                    10_000,
+                    executor
+            );
+
+            firstAttemptStarted.get(1, SECONDS);
+
+            assertTrue(result.complete("external"));
+
+            assertThat(result, willBe(equalTo("external")));
+
+            assertThat(callCount.get(), equalTo(1));
+        } finally {
+            executor.shutdownNow();
+        }
+    }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java
index 8c6b847ef4b..b7cdf818aa6 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java
@@ -119,6 +119,7 @@ public class ReplicaPrimacyEngine {
                     null,
                     enlistmentConsistencyToken,
                     null,
+                    replicationGroupId,
                     null
             );
         }
@@ -136,6 +137,7 @@ public class ReplicaPrimacyEngine {
                     primaryReplicaMeta.getLeaseholderId(),
                     enlistmentConsistencyToken,
                     currentEnlistmentConsistencyToken,
+                    replicationGroupId,
                     null);
         }
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
index 3abd9fb8662..c5dd6d3aa74 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
@@ -210,9 +210,17 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
                 } else if (command instanceof SafeTimeSyncCommand) {
                     result = handleSafeTimeSyncCommand((SafeTimeSyncCommand) 
command, commandIndex, commandTerm);
                 } else if (command instanceof PrimaryReplicaChangeCommand) {
+                    PrimaryReplicaChangeCommand cmd = 
(PrimaryReplicaChangeCommand) command;
+                    LOG.debug("Processing PrimaryReplicaChangeCommand 
[groupId={}, commandIndex={}, commandTerm={}, "
+                                    + "leaseStartTime={}, primaryNodeId={}, 
primaryNodeName={}]",
+                            partitionKey.toReplicationGroupId(), commandIndex, 
commandTerm,
+                            cmd.leaseStartTime(), cmd.primaryReplicaNodeId(), 
cmd.primaryReplicaNodeName());
+
                     result = processCrossTableProcessorsCommand(command, 
commandIndex, commandTerm, safeTimestamp);
 
-                    if 
(updateLeaseInfoInTxStorage((PrimaryReplicaChangeCommand) command, 
commandIndex, commandTerm)) {
+                    if (updateLeaseInfoInTxStorage(cmd, commandIndex, 
commandTerm)) {
+                        LOG.debug("Updated lease info in tx storage 
[groupId={}, commandIndex={}, leaseStartTime={}]",
+                                partitionKey.toReplicationGroupId(), 
commandIndex, cmd.leaseStartTime());
                         result = EMPTY_APPLIED_RESULT;
                     }
                 } else {
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java
index 13685a2319e..033c9ae2f54 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java
@@ -20,14 +20,15 @@ package org.apache.ignite.internal.replicator;
 import static java.lang.System.currentTimeMillis;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.replicator.ReplicatorRecoverableExceptions.isRecoverable;
 import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
-import static 
org.apache.ignite.internal.util.IgniteUtils.retryOperationUntilSuccess;
+import static 
org.apache.ignite.internal.util.IgniteUtils.retryOperationUntilSuccessOrTimeout;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import org.apache.ignite.internal.hlc.ClockService;
@@ -49,6 +50,7 @@ import 
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.util.TrackerClosedException;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -175,12 +177,7 @@ public class PlacementDriverMessageProcessor {
                     if (msg.force()) {
                         // Replica must wait till storage index reaches the 
current leader's index to make sure that all updates made on the
                         // group leader are received.
-                        return waitForActualState(msg.leaseStartTime(), 
msg.leaseExpirationTime().getPhysical())
-                                .thenCompose(v -> 
sendPrimaryReplicaChangeToReplicationGroup(
-                                        msg.leaseStartTime().longValue(),
-                                        localNode.id(),
-                                        localNode.name()
-                                ))
+                        return waitForStateAndSendPrimaryReplicaChanged(msg)
                                 .thenCompose(v -> {
                                     
CompletableFuture<LeaseGrantedMessageResponse> respFut =
                                             acceptLease(msg.leaseStartTime(), 
msg.leaseExpirationTime());
@@ -194,12 +191,7 @@ public class PlacementDriverMessageProcessor {
                                 });
                     } else {
                         if (leader.equals(localNode)) {
-                            return waitForActualState(msg.leaseStartTime(), 
msg.leaseExpirationTime().getPhysical())
-                                    .thenCompose(v -> 
sendPrimaryReplicaChangeToReplicationGroup(
-                                            msg.leaseStartTime().longValue(),
-                                            localNode.id(),
-                                            localNode.name()
-                                    ))
+                            return 
waitForStateAndSendPrimaryReplicaChanged(msg)
                                     .thenCompose(v -> 
acceptLease(msg.leaseStartTime(), msg.leaseExpirationTime()));
                         } else {
                             return proposeLeaseRedirect(leader);
@@ -209,6 +201,24 @@ public class PlacementDriverMessageProcessor {
         );
     }
 
+    private @NotNull CompletableFuture<Void> 
waitForStateAndSendPrimaryReplicaChanged(LeaseGrantedMessage msg) {
+        return waitForActualState(msg.leaseStartTime(), 
msg.leaseExpirationTime().getPhysical())
+                .thenCompose(
+                        v -> 
sendPrimaryReplicaChangeToReplicationGroup(msg.leaseStartTime().longValue(),
+                                localNode.id(), localNode.name()))
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.warn("Could not save lease information to the 
replication group "
+                                        + "[leaseStartTime={}, 
candidateNodeId={}, candidateNodeName ={}].", e,
+                                msg.leaseStartTime().longValue(), 
localNode.id(), localNode.name());
+                    } else {
+                        LOG.info("The lease information was saved to the 
replication group "
+                                        + "[leaseStartTime={}, 
candidateNodeId={}, candidateNodeName ={}].",
+                                msg.leaseStartTime().longValue(), 
localNode.id(), localNode.name());
+                    }
+                });
+    }
+
     private CompletableFuture<Void> sendPrimaryReplicaChangeToReplicationGroup(
             long leaseStartTime,
             UUID primaryReplicaNodeId,
@@ -259,18 +269,52 @@ public class PlacementDriverMessageProcessor {
      * @return Future that is completed when local storage catches up the 
index that is actual for leader on the moment of request.
      */
     private CompletableFuture<Void> waitForActualState(HybridTimestamp 
startTime, long expirationTime) {
-        LOG.info("Waiting for actual storage state, group=" + groupId);
+        TimeTracker readIndexTimeTracker = new TimeTracker(
+                startTime,
+                expirationTime,
+                groupId,
+                "Timeout is expired before raft index reading started");
+
+        LOG.info("Waiting for actual storage state {}", 
readIndexTimeTracker.timeMessageDetails());
 
         replicaReservationClosure.accept(groupId, startTime);
 
-        long timeout = expirationTime - currentTimeMillis();
-        if (timeout <= 0) {
-            return failedFuture(new TimeoutException());
+        if (readIndexTimeTracker.isExpired()) {
+            return readIndexTimeTracker.timeoutFailedFuture();
         }
 
-        return retryOperationUntilSuccess(raftClient::readIndex, e -> 
currentTimeMillis() > expirationTime, executor)
-                .orTimeout(timeout, TimeUnit.MILLISECONDS)
-                .thenCompose(storageIndexTracker::waitFor);
+        return retryOperationUntilSuccessOrTimeout(raftClient::readIndex, 
readIndexTimeTracker.remainingTimeoutMs(), executor)
+                .whenComplete((raftIndex, readIndexError) -> {
+                    if (readIndexError != null) {
+                        LOG.warn("Failed to read index from raft leader {}.",
+                                readIndexError, 
readIndexTimeTracker.timeMessageDetails());
+                    } else {
+                        LOG.debug("Successfully read index from raft leader 
{}.", readIndexTimeTracker.timeMessageDetails());
+                    }
+                })
+                .thenCompose(raftIndex -> {
+                    // Recalculate remaining time after readIndex completes.
+                    TimeTracker storageIndexUpdateTimeTracker = new 
TimeTracker(
+                            startTime,
+                            expirationTime,
+                            groupId,
+                            "Timeout is expired before storage index tracking 
started");
+                    if (storageIndexUpdateTimeTracker.isExpired()) {
+                        return 
storageIndexUpdateTimeTracker.timeoutFailedFuture();
+                    }
+
+                    return storageIndexTracker.waitFor(raftIndex)
+                            
.orTimeout(storageIndexUpdateTimeTracker.remainingTimeoutMs(), MILLISECONDS)
+                            .whenComplete((v, storageIndexTrackerError) -> {
+                                if (storageIndexTrackerError != null) {
+                                    LOG.warn("Failed to wait for storage index 
to reach raft leader {}.",
+                                            storageIndexTrackerError, 
storageIndexUpdateTimeTracker.timeMessageDetails());
+                                } else {
+                                    LOG.debug("Successfully waited for storage 
index to reach raft leader {}.",
+                                            
storageIndexUpdateTimeTracker.timeMessageDetails());
+                                }
+                            });
+                });
     }
 
     private void onLeaderElected(InternalClusterNode clusterNode, long term) {
@@ -281,4 +325,86 @@ public class PlacementDriverMessageProcessor {
     private CompletableFuture<InternalClusterNode> leaderFuture() {
         return leaderReadyFuture.thenApply(ignored -> leaderRef);
     }
+
+    /**
+     * Tracks time for timeout operations. Calculates remaining time based on 
expiration time and provides utilities
+     * for checking expiration and creating timeout exceptions with detailed 
messages.
+     */
+    private static class TimeTracker {
+        /** Lease start time. */
+        private final HybridTimestamp leaseStartTime;
+        /** Expiration time in milliseconds. */
+        private final long expirationTime;
+
+        /** Replication group ID for logging purposes. */
+        private final ReplicationGroupId groupId;
+
+        /** Start time when this tracker was created. */
+        private final long startTime;
+
+        /** Base message for timeout exception. */
+        private final String message;
+
+        /**
+         * Creates a time tracker.
+         *
+         * @param expirationTime Expiration time in milliseconds.
+         * @param groupId Replication group ID for logging purposes.
+         * @param message Base message for timeout exception.
+         */
+        private TimeTracker(HybridTimestamp leaseStartTime, long 
expirationTime, ReplicationGroupId groupId, String message) {
+            this.leaseStartTime = leaseStartTime;
+            this.expirationTime = expirationTime;
+            this.groupId = groupId;
+            this.startTime = currentTimeMillis();
+            this.message = message;
+        }
+
+        /**
+         * Checks if the timeout has expired.
+         *
+         * @return {@code true} if the timeout has expired (timeLeft < 0), 
{@code false} otherwise.
+         */
+        public boolean isExpired() {
+            return remainingTimeoutMs() < 0;
+        }
+
+        /**
+         * Creates a failed future with a timeout exception that includes 
detailed information.
+         *
+         * @return A failed future with {@link TimeoutException} containing 
the base message and details.
+         */
+        public CompletableFuture<Void> timeoutFailedFuture() {
+            return failedFuture(new TimeoutException(format("{} {}.", message, 
timeMessageDetails())));
+        }
+
+        /**
+         * Formats message details including group ID, expiration time, and 
current time.
+         *
+         * @return Formatted message details.
+         */
+        public String timeMessageDetails() {
+            return format("[groupId={}, leaseStartTime={}, 
expirationTimeMs={}, remainingTimeoutMs={}, durationMs={}]",
+                    groupId, leaseStartTime, expirationTime, 
remainingTimeoutMs(), durationMs());
+        }
+
+        /**
+         * Returns the duration since this tracker was created.
+         *
+         * @return Duration in milliseconds since the tracker was created.
+         */
+        public long durationMs() {
+            return currentTimeMillis() - startTime;
+        }
+
+        /**
+         * Returns the remaining time until expiration in milliseconds.
+         *
+         * @return Remaining time in milliseconds. Can be negative if already 
expired.
+         */
+        public long remainingTimeoutMs() {
+            return expirationTime - currentTimeMillis();
+        }
+    }
+
 }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
index 39dfff38cb9..429faadd92a 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
 
 import java.util.UUID;
 import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.tx.RetriableTransactionException;
 import org.jetbrains.annotations.Nullable;
 
@@ -38,11 +39,17 @@ public class PrimaryReplicaMissException extends 
IgniteInternalException impleme
      * @param txId Transaction id.
      * @param expectedEnlistmentConsistencyToken Expected enlistment 
consistency token, {@code null} if absent.
      * @param currentEnlistmentConsistencyToken Current enlistment consistency 
token, {@code null} if absent.
+     * @param replicationGroupId Replication group id, {@code null} if absent.
      */
-    public PrimaryReplicaMissException(UUID txId, Long 
expectedEnlistmentConsistencyToken, Long currentEnlistmentConsistencyToken) {
+    public PrimaryReplicaMissException(
+            UUID txId, 
+            Long expectedEnlistmentConsistencyToken, 
+            Long currentEnlistmentConsistencyToken,
+            @Nullable ReplicationGroupId replicationGroupId
+    ) {
         super(REPLICA_MISS_ERR, format("The primary replica has changed 
[txId={}, expectedEnlistmentConsistencyToken={}, "
-                + "currentEnlistmentConsistencyToken={}].", txId, 
expectedEnlistmentConsistencyToken,
-                currentEnlistmentConsistencyToken));
+                + "currentEnlistmentConsistencyToken={}, 
replicationGroupId={}].", 
+                txId, expectedEnlistmentConsistencyToken, 
currentEnlistmentConsistencyToken, replicationGroupId));
     }
 
     /**
@@ -54,6 +61,7 @@ public class PrimaryReplicaMissException extends 
IgniteInternalException impleme
      * @param currentLeaseholderId Current leaseholder id, {@code null} if 
absent.
      * @param expectedEnlistmentConsistencyToken Expected enlistment 
consistency token, {@code null} if absent.
      * @param currentEnlistmentConsistencyToken Current enlistment consistency 
token, {@code null} if absent.
+     * @param replicationGroupId Replication group id, {@code null} if absent.
      * @param cause Cause exception, {@code null} if absent.
      */
     public PrimaryReplicaMissException(
@@ -63,20 +71,22 @@ public class PrimaryReplicaMissException extends 
IgniteInternalException impleme
             @Nullable UUID currentLeaseholderId,
             @Nullable Long expectedEnlistmentConsistencyToken,
             @Nullable Long currentEnlistmentConsistencyToken,
+            @Nullable ReplicationGroupId replicationGroupId,
             @Nullable Throwable cause
     ) {
         super(
                 REPLICA_MISS_ERR,
                 "The primary replica has changed "
                         + "[expectedLeaseholderName={}, 
currentLeaseholderName={}, expectedLeaseholderId={}, currentLeaseholderId={},"
-                        + " expectedEnlistmentConsistencyToken={}, 
currentEnlistmentConsistencyToken={}]",
+                        + " expectedEnlistmentConsistencyToken={}, 
currentEnlistmentConsistencyToken={}, replicationGroupId={}]",
                 cause,
                 expectedLeaseholderName,
                 currentLeaseholderName,
                 expectedLeaseholderId,
                 currentLeaseholderId,
                 expectedEnlistmentConsistencyToken,
-                currentEnlistmentConsistencyToken
+                currentEnlistmentConsistencyToken,
+                replicationGroupId
         );
     }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
index c9aa1ba3c82..f2d7471687c 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
@@ -228,7 +228,7 @@ public class QueryRecoveryTest extends 
BaseIgniteAbstractTest {
         cluster.setDataProvider("T1", TestBuilders.tableScan((nodeName, 
partId) -> {
             if (firstTimeThrow.compareAndSet(true, false)) {
                 reassignmentHappened.set(true);
-                return () -> new FailingIterator<>(new 
PrimaryReplicaMissException(UUID.randomUUID(), 0L, 0L));
+                return () -> new FailingIterator<>(new 
PrimaryReplicaMissException(UUID.randomUUID(), 0L, 0L, null));
             }
 
             return Collections.singleton(new Object[]{partId, partId, 
nodeName});
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
index a8e520e5a8e..8c5d2b091d0 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
@@ -37,6 +37,8 @@ import java.util.concurrent.Executor;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommandV2;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
@@ -73,6 +75,8 @@ import org.jetbrains.annotations.TestOnly;
  * Partition command handler.
  */
 public class TablePartitionProcessor implements RaftTableProcessor {
+    private static final IgniteLogger LOG = 
Loggers.forClass(TablePartitionProcessor.class);
+
     /** Transaction manager. */
     private final TxManager txManager;
 
@@ -500,8 +504,17 @@ public class TablePartitionProcessor implements 
RaftTableProcessor {
             long commandIndex,
             long commandTerm
     ) {
+        long storageLastAppliedIndex = storage.lastAppliedIndex();
+        LOG.debug("Handling PrimaryReplicaChangeCommand [tableId={}, 
partId={}, commandIndex={}, storageLastAppliedIndex={}, "
+                        + "leaseStartTime={}, primaryNodeId={}, 
primaryNodeName={}]",
+                storage.tableId(), storage.partitionId(), commandIndex, 
storageLastAppliedIndex,
+                cmd.leaseStartTime(), cmd.primaryReplicaNodeId(), 
cmd.primaryReplicaNodeName());
+
         // Skips the write command because the storage has already executed it.
-        if (commandIndex <= storage.lastAppliedIndex()) {
+        if (commandIndex <= storageLastAppliedIndex) {
+            LOG.debug("Skipping PrimaryReplicaChangeCommand - already applied 
[tableId={}, partId={}, commandIndex={}, "
+                            + "storageLastAppliedIndex={}]",
+                    storage.tableId(), storage.partitionId(), commandIndex, 
storageLastAppliedIndex);
             return EMPTY_NOT_APPLIED_RESULT;
         }
 
@@ -515,6 +528,9 @@ public class TablePartitionProcessor implements 
RaftTableProcessor {
             return null;
         });
 
+        LOG.debug("Successfully applied PrimaryReplicaChangeCommand 
[tableId={}, partId={}, commandIndex={}, leaseStartTime={}]",
+                storage.tableId(), storage.partitionId(), commandIndex, 
cmd.leaseStartTime());
+
         return EMPTY_APPLIED_RESULT;
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 9419bbc2434..0129f60fafc 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -572,7 +572,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             // We treat SCAN as 2pc and only switch to a 1pc mode if all table 
rows fit in the bucket and the transaction is implicit.
             // See `req.full() && (err != null || rows.size() < 
req.batchSize())` condition.
             // If they don't fit the bucket, the transaction is treated as 2pc.
-            txManager.updateTxMeta(req.transactionId(),  old -> builder(old, 
PENDING)
+            txManager.updateTxMeta(req.transactionId(), old -> builder(old, 
PENDING)
                     .txCoordinatorId(req.coordinatorId())
                     
.commitPartitionId(req.commitPartitionId().asZonePartitionId())
                     .txLabel(req.txLabel())
@@ -2513,7 +2513,12 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                 UpdateCommandResult updateCommandResult = 
(UpdateCommandResult) res;
 
                 if (updateCommandResult != null && 
!updateCommandResult.isPrimaryReplicaMatch()) {
-                    throw new PrimaryReplicaMissException(txId, 
cmd.leaseStartTime(), updateCommandResult.currentLeaseStartTime());
+                    throw new PrimaryReplicaMissException(
+                            txId, 
+                            cmd.leaseStartTime(), 
+                            updateCommandResult.currentLeaseStartTime(),
+                            replicationGroupId
+                    );
                 }
 
                 if (updateCommandResult != null && 
updateCommandResult.isPrimaryInPeersAndLearners()) {
@@ -2654,7 +2659,8 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                     throw new PrimaryReplicaMissException(
                             cmd.txId(),
                             cmd.leaseStartTime(),
-                            updateCommandResult.currentLeaseStartTime()
+                            updateCommandResult.currentLeaseStartTime(),
+                            replicationGroupId
                     );
                 }
                 if (updateCommandResult.isPrimaryInPeersAndLearners()) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 520304f5be3..320c2193d73 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -413,7 +413,7 @@ public class InternalTableImpl implements InternalTable {
     }
 
     /**
-     * Enlists a single row into a transaction.
+     * Enlists collection of rows into a transaction.
      *
      * @param keyRows Rows.
      * @param tx The transaction.
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateMetaRocksDbPartitionStorage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateMetaRocksDbPartitionStorage.java
index d94c4e7403c..b8d3bdd830e 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateMetaRocksDbPartitionStorage.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateMetaRocksDbPartitionStorage.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.tx.storage.state.rocksdb;
 
+import static java.util.Optional.ofNullable;
 import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
 import static 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage.BYTE_ORDER;
 import static 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbStorage.ZONE_ID_SIZE_BYTES;
@@ -166,6 +167,10 @@ class TxStateMetaRocksDbPartitionStorage {
     }
 
     void updateLease(WriteBatch writeBatch, LeaseInfo leaseInfo) throws 
RocksDBException {
+        long currentLeaseStartTime = 
ofNullable(this.leaseInfo).map(LeaseInfo::leaseStartTime).orElse(Long.MIN_VALUE);
+        if (leaseInfo.leaseStartTime() <= currentLeaseStartTime) {
+            return;
+        }
         columnFamily.put(writeBatch, leaseInfoKey, 
VersionedSerialization.toBytes(leaseInfo, LeaseInfoSerializer.INSTANCE));
 
         this.leaseInfo = leaseInfo;
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 82808c30793..12d2bb28048 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -599,6 +599,7 @@ public class TxManagerTest extends IgniteAbstractTest {
                                 null,
                                 10L,
                                 null,
+                                null,
                                 null
                         )),
                         completedFuture(new 
TransactionResult(TxState.COMMITTED, commitTimestamp))


Reply via email to