This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a0804b7b456 IGNITE-27524 Logging and timeout management of waitForActualState (#7379)
a0804b7b456 is described below
commit a0804b7b456d624db7e4a5013706a62b767a6172
Author: Anton Laletin <[email protected]>
AuthorDate: Tue Jan 20 17:32:59 2026 +0400
IGNITE-27524 Logging and timeout management of waitForActualState (#7379)
---
.../apache/ignite/internal/util/IgniteUtils.java | 37 ++---
.../ignite/internal/util/IgniteUtilsTest.java | 119 ++++++++++++++-
.../partition/replicator/ReplicaPrimacyEngine.java | 2 +
.../replicator/raft/ZonePartitionRaftListener.java | 10 +-
.../PlacementDriverMessageProcessor.java | 168 ++++++++++++++++++---
.../exception/PrimaryReplicaMissException.java | 20 ++-
.../sql/engine/exec/QueryRecoveryTest.java | 2 +-
.../distributed/raft/TablePartitionProcessor.java | 18 ++-
.../replicator/PartitionReplicaListener.java | 12 +-
.../distributed/storage/InternalTableImpl.java | 2 +-
.../TxStateMetaRocksDbPartitionStorage.java | 5 +
.../apache/ignite/internal/tx/TxManagerTest.java | 1 +
12 files changed, 340 insertions(+), 56 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index db898d50d11..7652f8200a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -1184,50 +1184,45 @@ public class IgniteUtils {
}
/**
- * Retries operation until it succeeds or fails with exception that is
different than the given.
+ * Retries operation until it succeeds or timeout occurs.
*
* @param operation Operation.
- * @param stopRetryCondition Condition that accepts the exception if one
has been thrown, and defines whether retries should be
- * stopped.
+ * @param timeout Timeout in milliseconds.
* @param executor Executor to make retry in.
* @return Future that is completed when operation is successful or failed
with other exception than the given.
*/
- public static <T> CompletableFuture<T> retryOperationUntilSuccess(
+ public static <T> CompletableFuture<T> retryOperationUntilSuccessOrTimeout(
Supplier<CompletableFuture<T>> operation,
- Function<Throwable, Boolean> stopRetryCondition,
+ long timeout,
Executor executor
) {
- CompletableFuture<T> fut = new CompletableFuture<>();
+ CompletableFuture<T> futureWithTimeout = new
CompletableFuture<T>().orTimeout(timeout, TimeUnit.MILLISECONDS);
- retryOperationUntilSuccess(operation, stopRetryCondition, fut,
executor);
+ retryOperationUntilSuccessOrFutureDone(operation, futureWithTimeout,
executor);
- return fut;
+ return futureWithTimeout;
}
/**
- * Retries operation until it succeeds or fails with exception that is
different than the given.
+ * Retries operation until it succeeds or provided future is done.
*
* @param operation Operation.
- * @param stopRetryCondition Condition that accepts the exception if one
has been thrown, and defines whether retries should be
- * stopped.
+ * @param future Future to track.
* @param executor Executor to make retry in.
- * @param fut Future that is completed when operation is successful or
failed with other exception than the given.
*/
- public static <T> void retryOperationUntilSuccess(
+ private static <T> void retryOperationUntilSuccessOrFutureDone(
Supplier<CompletableFuture<T>> operation,
- Function<Throwable, Boolean> stopRetryCondition,
- CompletableFuture<T> fut,
+ CompletableFuture<T> future,
Executor executor
) {
+
operation.get()
.whenComplete((res, e) -> {
- if (e == null) {
- fut.complete(res);
- } else {
- if (stopRetryCondition.apply(e)) {
- fut.completeExceptionally(e);
+ if (!future.isDone()) {
+ if (e == null) {
+ future.complete(res);
} else {
- executor.execute(() ->
retryOperationUntilSuccess(operation, stopRetryCondition, fut, executor));
+ executor.execute(() ->
retryOperationUntilSuccessOrFutureDone(operation, future, executor));
}
}
});
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
index 0aa80378713..aefb1f55de9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -29,9 +30,11 @@ import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
import static
org.apache.ignite.internal.util.IgniteUtils.byteBufferToByteArray;
import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
+import static
org.apache.ignite.internal.util.IgniteUtils.retryOperationUntilSuccessOrTimeout;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -49,10 +52,15 @@ import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.util.worker.IgniteWorker;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
/**
* Test suite for {@link IgniteUtils}.
@@ -161,7 +169,7 @@ class IgniteUtilsTest extends BaseIgniteAbstractTest {
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
- }).get(1, TimeUnit.SECONDS);
+ }).get(1, SECONDS);
}
@Test
@@ -217,4 +225,111 @@ class IgniteUtilsTest extends BaseIgniteAbstractTest {
ByteBuffer smallDirectBuffer =
bigDirectBuffer.position(1).limit(4).slice();
assertArrayEquals(new byte[] {1, 2, 3},
byteBufferToByteArray(smallDirectBuffer));
}
+
+ @Test
+ @Timeout(value = 10, unit = SECONDS)
+ void testRetryOperationUntilSuccessOrTimeout_SuccessOnFirstAttempt() {
+ Executor executor = Executors.newSingleThreadExecutor();
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ CompletableFuture<String> result = retryOperationUntilSuccessOrTimeout(
+ () -> {
+ callCount.incrementAndGet();
+ return completedFuture("success");
+ },
+ 1000,
+ executor
+ );
+
+ assertThat(result, willBe(equalTo("success")));
+ assertThat(callCount.get(), equalTo(1));
+ }
+
+ @Test
+ @Timeout(value = 10, unit = SECONDS)
+ void testRetryOperationUntilSuccessOrTimeout_SuccessAfterRetries() {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ CompletableFuture<String> result =
retryOperationUntilSuccessOrTimeout(
+ () -> {
+ int count = callCount.incrementAndGet();
+ if (count < 3) {
+ return failedFuture(new IOException("Temporary
failure"));
+ }
+ return completedFuture("success");
+ },
+ 1000,
+ executor
+ );
+
+ assertThat(result, willBe(equalTo("success")));
+ assertThat(callCount.get(), equalTo(3));
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test
+ @Timeout(value = 10, unit = SECONDS)
+ void testRetryOperationUntilSuccessOrTimeout_Timeout() {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ CompletableFuture<String> result =
retryOperationUntilSuccessOrTimeout(
+ () -> {
+ callCount.incrementAndGet();
+ return failedFuture(new IOException("Persistent
failure"));
+ },
+ 100,
+ executor
+ );
+
+ assertThat(result, willThrow(TimeoutException.class));
+ // Should have made multiple attempts before timing out
+ assertThat(callCount.get(), greaterThan(1));
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test
+ @Timeout(value = 10, unit = SECONDS)
+ void
testRetryOperationUntilSuccessOrTimeout_StopsWhenFutureCompletedExternally()
throws Exception {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ try {
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ CompletableFuture<Void> firstAttemptStarted = new
CompletableFuture<>();
+
+ CompletableFuture<String> result =
retryOperationUntilSuccessOrTimeout(
+ () -> {
+ int attempt = callCount.incrementAndGet();
+
+ if (attempt == 1) {
+ firstAttemptStarted.complete(null);
+
+ return new CompletableFuture<>();
+ }
+
+ return failedFuture(new IOException("Should not be
retried after external completion"));
+ },
+ 10_000,
+ executor
+ );
+
+ firstAttemptStarted.get(1, SECONDS);
+
+ assertTrue(result.complete("external"));
+
+ assertThat(result, willBe(equalTo("external")));
+
+ assertThat(callCount.get(), equalTo(1));
+ } finally {
+ executor.shutdownNow();
+ }
+ }
}
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java
index 8c6b847ef4b..b7cdf818aa6 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java
@@ -119,6 +119,7 @@ public class ReplicaPrimacyEngine {
null,
enlistmentConsistencyToken,
null,
+ replicationGroupId,
null
);
}
@@ -136,6 +137,7 @@ public class ReplicaPrimacyEngine {
primaryReplicaMeta.getLeaseholderId(),
enlistmentConsistencyToken,
currentEnlistmentConsistencyToken,
+ replicationGroupId,
null);
}
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
index 3abd9fb8662..c5dd6d3aa74 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
@@ -210,9 +210,17 @@ public class ZonePartitionRaftListener implements RaftGroupListener {
} else if (command instanceof SafeTimeSyncCommand) {
result = handleSafeTimeSyncCommand((SafeTimeSyncCommand)
command, commandIndex, commandTerm);
} else if (command instanceof PrimaryReplicaChangeCommand) {
+ PrimaryReplicaChangeCommand cmd =
(PrimaryReplicaChangeCommand) command;
+ LOG.debug("Processing PrimaryReplicaChangeCommand
[groupId={}, commandIndex={}, commandTerm={}, "
+ + "leaseStartTime={}, primaryNodeId={},
primaryNodeName={}]",
+ partitionKey.toReplicationGroupId(), commandIndex,
commandTerm,
+ cmd.leaseStartTime(), cmd.primaryReplicaNodeId(),
cmd.primaryReplicaNodeName());
+
result = processCrossTableProcessorsCommand(command,
commandIndex, commandTerm, safeTimestamp);
- if
(updateLeaseInfoInTxStorage((PrimaryReplicaChangeCommand) command,
commandIndex, commandTerm)) {
+ if (updateLeaseInfoInTxStorage(cmd, commandIndex,
commandTerm)) {
+ LOG.debug("Updated lease info in tx storage
[groupId={}, commandIndex={}, leaseStartTime={}]",
+ partitionKey.toReplicationGroupId(),
commandIndex, cmd.leaseStartTime());
result = EMPTY_APPLIED_RESULT;
}
} else {
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java
index 13685a2319e..033c9ae2f54 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java
@@ -20,14 +20,15 @@ package org.apache.ignite.internal.replicator;
import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.replicator.ReplicatorRecoverableExceptions.isRecoverable;
import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
-import static
org.apache.ignite.internal.util.IgniteUtils.retryOperationUntilSuccess;
+import static
org.apache.ignite.internal.util.IgniteUtils.retryOperationUntilSuccessOrTimeout;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import org.apache.ignite.internal.hlc.ClockService;
@@ -49,6 +50,7 @@ import org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.TrackerClosedException;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -175,12 +177,7 @@ public class PlacementDriverMessageProcessor {
if (msg.force()) {
// Replica must wait till storage index reaches the
current leader's index to make sure that all updates made on the
// group leader are received.
- return waitForActualState(msg.leaseStartTime(),
msg.leaseExpirationTime().getPhysical())
- .thenCompose(v ->
sendPrimaryReplicaChangeToReplicationGroup(
- msg.leaseStartTime().longValue(),
- localNode.id(),
- localNode.name()
- ))
+ return waitForStateAndSendPrimaryReplicaChanged(msg)
.thenCompose(v -> {
CompletableFuture<LeaseGrantedMessageResponse> respFut =
acceptLease(msg.leaseStartTime(),
msg.leaseExpirationTime());
@@ -194,12 +191,7 @@ public class PlacementDriverMessageProcessor {
});
} else {
if (leader.equals(localNode)) {
- return waitForActualState(msg.leaseStartTime(),
msg.leaseExpirationTime().getPhysical())
- .thenCompose(v ->
sendPrimaryReplicaChangeToReplicationGroup(
- msg.leaseStartTime().longValue(),
- localNode.id(),
- localNode.name()
- ))
+ return
waitForStateAndSendPrimaryReplicaChanged(msg)
.thenCompose(v ->
acceptLease(msg.leaseStartTime(), msg.leaseExpirationTime()));
} else {
return proposeLeaseRedirect(leader);
@@ -209,6 +201,24 @@ public class PlacementDriverMessageProcessor {
);
}
+ private @NotNull CompletableFuture<Void>
waitForStateAndSendPrimaryReplicaChanged(LeaseGrantedMessage msg) {
+ return waitForActualState(msg.leaseStartTime(),
msg.leaseExpirationTime().getPhysical())
+ .thenCompose(
+ v ->
sendPrimaryReplicaChangeToReplicationGroup(msg.leaseStartTime().longValue(),
+ localNode.id(), localNode.name()))
+ .whenComplete((v, e) -> {
+ if (e != null) {
+ LOG.warn("Could not save lease information to the
replication group "
+ + "[leaseStartTime={},
candidateNodeId={}, candidateNodeName ={}].", e,
+ msg.leaseStartTime().longValue(),
localNode.id(), localNode.name());
+ } else {
+ LOG.info("The lease information was saved to the
replication group "
+ + "[leaseStartTime={},
candidateNodeId={}, candidateNodeName ={}].",
+ msg.leaseStartTime().longValue(),
localNode.id(), localNode.name());
+ }
+ });
+ }
+
private CompletableFuture<Void> sendPrimaryReplicaChangeToReplicationGroup(
long leaseStartTime,
UUID primaryReplicaNodeId,
@@ -259,18 +269,52 @@ public class PlacementDriverMessageProcessor {
* @return Future that is completed when local storage catches up the
index that is actual for leader on the moment of request.
*/
private CompletableFuture<Void> waitForActualState(HybridTimestamp
startTime, long expirationTime) {
- LOG.info("Waiting for actual storage state, group=" + groupId);
+ TimeTracker readIndexTimeTracker = new TimeTracker(
+ startTime,
+ expirationTime,
+ groupId,
+ "Timeout is expired before raft index reading started");
+
+ LOG.info("Waiting for actual storage state {}",
readIndexTimeTracker.timeMessageDetails());
replicaReservationClosure.accept(groupId, startTime);
- long timeout = expirationTime - currentTimeMillis();
- if (timeout <= 0) {
- return failedFuture(new TimeoutException());
+ if (readIndexTimeTracker.isExpired()) {
+ return readIndexTimeTracker.timeoutFailedFuture();
}
- return retryOperationUntilSuccess(raftClient::readIndex, e ->
currentTimeMillis() > expirationTime, executor)
- .orTimeout(timeout, TimeUnit.MILLISECONDS)
- .thenCompose(storageIndexTracker::waitFor);
+ return retryOperationUntilSuccessOrTimeout(raftClient::readIndex,
readIndexTimeTracker.remainingTimeoutMs(), executor)
+ .whenComplete((raftIndex, readIndexError) -> {
+ if (readIndexError != null) {
+ LOG.warn("Failed to read index from raft leader {}.",
+ readIndexError,
readIndexTimeTracker.timeMessageDetails());
+ } else {
+ LOG.debug("Successfully read index from raft leader
{}.", readIndexTimeTracker.timeMessageDetails());
+ }
+ })
+ .thenCompose(raftIndex -> {
+ // Recalculate remaining time after readIndex completes.
+ TimeTracker storageIndexUpdateTimeTracker = new
TimeTracker(
+ startTime,
+ expirationTime,
+ groupId,
+ "Timeout is expired before storage index tracking
started");
+ if (storageIndexUpdateTimeTracker.isExpired()) {
+ return
storageIndexUpdateTimeTracker.timeoutFailedFuture();
+ }
+
+ return storageIndexTracker.waitFor(raftIndex)
+
.orTimeout(storageIndexUpdateTimeTracker.remainingTimeoutMs(), MILLISECONDS)
+ .whenComplete((v, storageIndexTrackerError) -> {
+ if (storageIndexTrackerError != null) {
+ LOG.warn("Failed to wait for storage index
to reach raft leader {}.",
+ storageIndexTrackerError,
storageIndexUpdateTimeTracker.timeMessageDetails());
+ } else {
+ LOG.debug("Successfully waited for storage
index to reach raft leader {}.",
+
storageIndexUpdateTimeTracker.timeMessageDetails());
+ }
+ });
+ });
}
private void onLeaderElected(InternalClusterNode clusterNode, long term) {
@@ -281,4 +325,86 @@ public class PlacementDriverMessageProcessor {
private CompletableFuture<InternalClusterNode> leaderFuture() {
return leaderReadyFuture.thenApply(ignored -> leaderRef);
}
+
+ /**
+ * Tracks time for timeout operations. Calculates remaining time based on
expiration time and provides utilities
+ * for checking expiration and creating timeout exceptions with detailed
messages.
+ */
+ private static class TimeTracker {
+ /** Lease start time. */
+ private final HybridTimestamp leaseStartTime;
+ /** Expiration time in milliseconds. */
+ private final long expirationTime;
+
+ /** Replication group ID for logging purposes. */
+ private final ReplicationGroupId groupId;
+
+ /** Start time when this tracker was created. */
+ private final long startTime;
+
+ /** Base message for timeout exception. */
+ private final String message;
+
+ /**
+ * Creates a time tracker.
+ *
+ * @param expirationTime Expiration time in milliseconds.
+ * @param groupId Replication group ID for logging purposes.
+ * @param message Base message for timeout exception.
+ */
+ private TimeTracker(HybridTimestamp leaseStartTime, long
expirationTime, ReplicationGroupId groupId, String message) {
+ this.leaseStartTime = leaseStartTime;
+ this.expirationTime = expirationTime;
+ this.groupId = groupId;
+ this.startTime = currentTimeMillis();
+ this.message = message;
+ }
+
+ /**
+ * Checks if the timeout has expired.
+ *
+ * @return {@code true} if the timeout has expired (timeLeft < 0),
{@code false} otherwise.
+ */
+ public boolean isExpired() {
+ return remainingTimeoutMs() < 0;
+ }
+
+ /**
+ * Creates a failed future with a timeout exception that includes
detailed information.
+ *
+ * @return A failed future with {@link TimeoutException} containing
the base message and details.
+ */
+ public CompletableFuture<Void> timeoutFailedFuture() {
+ return failedFuture(new TimeoutException(format("{} {}.", message,
timeMessageDetails())));
+ }
+
+ /**
+ * Formats message details including group ID, expiration time, and
current time.
+ *
+ * @return Formatted message details.
+ */
+ public String timeMessageDetails() {
+ return format("[groupId={}, leaseStartTime={},
expirationTimeMs={}, remainingTimeoutMs={}, durationMs={}]",
+ groupId, leaseStartTime, expirationTime,
remainingTimeoutMs(), durationMs());
+ }
+
+ /**
+ * Returns the duration since this tracker was created.
+ *
+ * @return Duration in milliseconds since the tracker was created.
+ */
+ public long durationMs() {
+ return currentTimeMillis() - startTime;
+ }
+
+ /**
+ * Returns the remaining time until expiration in milliseconds.
+ *
+ * @return Remaining time in milliseconds. Can be negative if already
expired.
+ */
+ public long remainingTimeoutMs() {
+ return expirationTime - currentTimeMillis();
+ }
+ }
+
}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
index 39dfff38cb9..429faadd92a 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
@@ -22,6 +22,7 @@ import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
import java.util.UUID;
import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.tx.RetriableTransactionException;
import org.jetbrains.annotations.Nullable;
@@ -38,11 +39,17 @@ public class PrimaryReplicaMissException extends IgniteInternalException impleme
* @param txId Transaction id.
* @param expectedEnlistmentConsistencyToken Expected enlistment
consistency token, {@code null} if absent.
* @param currentEnlistmentConsistencyToken Current enlistment consistency
token, {@code null} if absent.
+ * @param replicationGroupId Replication group id, {@code null} if absent.
*/
- public PrimaryReplicaMissException(UUID txId, Long
expectedEnlistmentConsistencyToken, Long currentEnlistmentConsistencyToken) {
+ public PrimaryReplicaMissException(
+ UUID txId,
+ Long expectedEnlistmentConsistencyToken,
+ Long currentEnlistmentConsistencyToken,
+ @Nullable ReplicationGroupId replicationGroupId
+ ) {
super(REPLICA_MISS_ERR, format("The primary replica has changed
[txId={}, expectedEnlistmentConsistencyToken={}, "
- + "currentEnlistmentConsistencyToken={}].", txId,
expectedEnlistmentConsistencyToken,
- currentEnlistmentConsistencyToken));
+ + "currentEnlistmentConsistencyToken={},
replicationGroupId={}].",
+ txId, expectedEnlistmentConsistencyToken,
currentEnlistmentConsistencyToken, replicationGroupId));
}
/**
@@ -54,6 +61,7 @@ public class PrimaryReplicaMissException extends IgniteInternalException impleme
* @param currentLeaseholderId Current leaseholder id, {@code null} if
absent.
* @param expectedEnlistmentConsistencyToken Expected enlistment
consistency token, {@code null} if absent.
* @param currentEnlistmentConsistencyToken Current enlistment consistency
token, {@code null} if absent.
+ * @param replicationGroupId Replication group id, {@code null} if absent.
* @param cause Cause exception, {@code null} if absent.
*/
public PrimaryReplicaMissException(
@@ -63,20 +71,22 @@ public class PrimaryReplicaMissException extends IgniteInternalException impleme
@Nullable UUID currentLeaseholderId,
@Nullable Long expectedEnlistmentConsistencyToken,
@Nullable Long currentEnlistmentConsistencyToken,
+ @Nullable ReplicationGroupId replicationGroupId,
@Nullable Throwable cause
) {
super(
REPLICA_MISS_ERR,
"The primary replica has changed "
+ "[expectedLeaseholderName={},
currentLeaseholderName={}, expectedLeaseholderId={}, currentLeaseholderId={},"
- + " expectedEnlistmentConsistencyToken={},
currentEnlistmentConsistencyToken={}]",
+ + " expectedEnlistmentConsistencyToken={},
currentEnlistmentConsistencyToken={}, replicationGroupId={}]",
cause,
expectedLeaseholderName,
currentLeaseholderName,
expectedLeaseholderId,
currentLeaseholderId,
expectedEnlistmentConsistencyToken,
- currentEnlistmentConsistencyToken
+ currentEnlistmentConsistencyToken,
+ replicationGroupId
);
}
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
index c9aa1ba3c82..f2d7471687c 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
@@ -228,7 +228,7 @@ public class QueryRecoveryTest extends BaseIgniteAbstractTest {
cluster.setDataProvider("T1", TestBuilders.tableScan((nodeName,
partId) -> {
if (firstTimeThrow.compareAndSet(true, false)) {
reassignmentHappened.set(true);
- return () -> new FailingIterator<>(new
PrimaryReplicaMissException(UUID.randomUUID(), 0L, 0L));
+ return () -> new FailingIterator<>(new
PrimaryReplicaMissException(UUID.randomUUID(), 0L, 0L, null));
}
return Collections.singleton(new Object[]{partId, partId,
nodeName});
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
index a8e520e5a8e..8c5d2b091d0 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
@@ -37,6 +37,8 @@ import java.util.concurrent.Executor;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommandV2;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
@@ -73,6 +75,8 @@ import org.jetbrains.annotations.TestOnly;
* Partition command handler.
*/
public class TablePartitionProcessor implements RaftTableProcessor {
+ private static final IgniteLogger LOG =
Loggers.forClass(TablePartitionProcessor.class);
+
/** Transaction manager. */
private final TxManager txManager;
@@ -500,8 +504,17 @@ public class TablePartitionProcessor implements RaftTableProcessor {
long commandIndex,
long commandTerm
) {
+ long storageLastAppliedIndex = storage.lastAppliedIndex();
+ LOG.debug("Handling PrimaryReplicaChangeCommand [tableId={},
partId={}, commandIndex={}, storageLastAppliedIndex={}, "
+ + "leaseStartTime={}, primaryNodeId={},
primaryNodeName={}]",
+ storage.tableId(), storage.partitionId(), commandIndex,
storageLastAppliedIndex,
+ cmd.leaseStartTime(), cmd.primaryReplicaNodeId(),
cmd.primaryReplicaNodeName());
+
// Skips the write command because the storage has already executed it.
- if (commandIndex <= storage.lastAppliedIndex()) {
+ if (commandIndex <= storageLastAppliedIndex) {
+ LOG.debug("Skipping PrimaryReplicaChangeCommand - already applied
[tableId={}, partId={}, commandIndex={}, "
+ + "storageLastAppliedIndex={}]",
+ storage.tableId(), storage.partitionId(), commandIndex,
storageLastAppliedIndex);
return EMPTY_NOT_APPLIED_RESULT;
}
@@ -515,6 +528,9 @@ public class TablePartitionProcessor implements RaftTableProcessor {
return null;
});
+ LOG.debug("Successfully applied PrimaryReplicaChangeCommand
[tableId={}, partId={}, commandIndex={}, leaseStartTime={}]",
+ storage.tableId(), storage.partitionId(), commandIndex,
cmd.leaseStartTime());
+
return EMPTY_APPLIED_RESULT;
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 9419bbc2434..0129f60fafc 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -572,7 +572,7 @@ public class PartitionReplicaListener implements ReplicaTableProcessor {
// We treat SCAN as 2pc and only switch to a 1pc mode if all table
rows fit in the bucket and the transaction is implicit.
// See `req.full() && (err != null || rows.size() <
req.batchSize())` condition.
// If they don't fit the bucket, the transaction is treated as 2pc.
- txManager.updateTxMeta(req.transactionId(), old -> builder(old,
PENDING)
+ txManager.updateTxMeta(req.transactionId(), old -> builder(old,
PENDING)
.txCoordinatorId(req.coordinatorId())
.commitPartitionId(req.commitPartitionId().asZonePartitionId())
.txLabel(req.txLabel())
@@ -2513,7 +2513,12 @@ public class PartitionReplicaListener implements ReplicaTableProcessor {
UpdateCommandResult updateCommandResult =
(UpdateCommandResult) res;
if (updateCommandResult != null &&
!updateCommandResult.isPrimaryReplicaMatch()) {
- throw new PrimaryReplicaMissException(txId,
cmd.leaseStartTime(), updateCommandResult.currentLeaseStartTime());
+ throw new PrimaryReplicaMissException(
+ txId,
+ cmd.leaseStartTime(),
+ updateCommandResult.currentLeaseStartTime(),
+ replicationGroupId
+ );
}
if (updateCommandResult != null &&
updateCommandResult.isPrimaryInPeersAndLearners()) {
@@ -2654,7 +2659,8 @@ public class PartitionReplicaListener implements ReplicaTableProcessor {
throw new PrimaryReplicaMissException(
cmd.txId(),
cmd.leaseStartTime(),
- updateCommandResult.currentLeaseStartTime()
+ updateCommandResult.currentLeaseStartTime(),
+ replicationGroupId
);
}
if (updateCommandResult.isPrimaryInPeersAndLearners()) {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 520304f5be3..320c2193d73 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -413,7 +413,7 @@ public class InternalTableImpl implements InternalTable {
}
/**
- * Enlists a single row into a transaction.
+ * Enlists collection of rows into a transaction.
*
* @param keyRows Rows.
* @param tx The transaction.
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateMetaRocksDbPartitionStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateMetaRocksDbPartitionStorage.java
index d94c4e7403c..b8d3bdd830e 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateMetaRocksDbPartitionStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateMetaRocksDbPartitionStorage.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.tx.storage.state.rocksdb;
+import static java.util.Optional.ofNullable;
import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
import static
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage.BYTE_ORDER;
import static
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbStorage.ZONE_ID_SIZE_BYTES;
@@ -166,6 +167,10 @@ class TxStateMetaRocksDbPartitionStorage {
}
void updateLease(WriteBatch writeBatch, LeaseInfo leaseInfo) throws
RocksDBException {
+ long currentLeaseStartTime =
ofNullable(this.leaseInfo).map(LeaseInfo::leaseStartTime).orElse(Long.MIN_VALUE);
+ if (leaseInfo.leaseStartTime() <= currentLeaseStartTime) {
+ return;
+ }
columnFamily.put(writeBatch, leaseInfoKey,
VersionedSerialization.toBytes(leaseInfo, LeaseInfoSerializer.INSTANCE));
this.leaseInfo = leaseInfo;
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 82808c30793..12d2bb28048 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -599,6 +599,7 @@ public class TxManagerTest extends IgniteAbstractTest {
null,
10L,
null,
+ null,
null
)),
completedFuture(new
TransactionResult(TxState.COMMITTED, commitTimestamp))