denis-chudov commented on code in PR #7379:
URL: https://github.com/apache/ignite-3/pull/7379#discussion_r2695797213
##########
modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java:
##########
@@ -217,4 +224,65 @@ void testByteBufferToByteArray() {
        ByteBuffer smallDirectBuffer = bigDirectBuffer.position(1).limit(4).slice();
        assertArrayEquals(new byte[] {1, 2, 3}, byteBufferToByteArray(smallDirectBuffer));
}
+
+ @Test
+ @Timeout(value = 1, unit = SECONDS)
+ void testRetryOperationUntilSuccessOrTimeout_SuccessOnFirstAttempt() {
+ Executor executor = Executors.newSingleThreadExecutor();
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ CompletableFuture<String> result = retryOperationUntilSuccessOrTimeout(
+ () -> {
+ callCount.incrementAndGet();
+ return completedFuture("success");
+ },
+ 1000,
+ executor
+ );
+
+ assertThat(result, willBe(equalTo("success")));
+ assertThat(callCount.get(), equalTo(1));
+ }
+
+ @Test
+ @Timeout(value = 1, unit = SECONDS)
+    void testRetryOperationUntilSuccessOrTimeout_SuccessAfterRetries() throws Exception {
+ Executor executor = Executors.newSingleThreadExecutor();
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ CompletableFuture<String> result = retryOperationUntilSuccessOrTimeout(
+ () -> {
+ int count = callCount.incrementAndGet();
+ if (count < 3) {
+                        return failedFuture(new IOException("Temporary failure"));
+ }
+ return completedFuture("success");
+ },
+ 1000,
+ executor
+ );
+
+ assertThat(result, willBe(equalTo("success")));
+ assertThat(callCount.get(), equalTo(3));
+ }
+
+ @Test
+ @Timeout(value = 1, unit = SECONDS)
+ void testRetryOperationUntilSuccessOrTimeout_Timeout() throws Exception {
+ Executor executor = Executors.newSingleThreadExecutor();
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ CompletableFuture<String> result = retryOperationUntilSuccessOrTimeout(
+ () -> {
+ callCount.incrementAndGet();
+ return failedFuture(new IOException("Persistent failure"));
+ },
+ 100,
Review Comment:
let's also add a test setting `orTimeout` externally.
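For example, a rough sketch (`willThrow` is assumed to be the test framework's matcher for exceptionally completed futures; the rest mirrors the tests above):

```java
@Test
void testRetryOperationUntilSuccessOrTimeout_ExternalOrTimeout() {
    Executor executor = Executors.newSingleThreadExecutor();
    AtomicInteger callCount = new AtomicInteger(0);

    // The internal timeout is deliberately large: the externally applied
    // orTimeout should be the one that fires.
    CompletableFuture<String> result = retryOperationUntilSuccessOrTimeout(
            () -> {
                callCount.incrementAndGet();
                return failedFuture(new IOException("Persistent failure"));
            },
            10_000,
            executor
    ).orTimeout(100, TimeUnit.MILLISECONDS);

    assertThat(result, willThrow(TimeoutException.class));
    assertThat(callCount.get(), greaterThanOrEqualTo(1));
}
```

The interesting part to pin down is whether retries actually stop once the external timeout completes the future.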
##########
modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java:
##########
@@ -1232,6 +1232,51 @@ public static <T> void retryOperationUntilSuccess(
});
}
+    /**
+     * Retries operation until it succeeds or timeout occurs.
+     *
+     * @param operation Operation.
+     * @param timeout Timeout in milliseconds.
+     * @param executor Executor to make retry in.
+     * @return Future that is completed when operation is successful or failed with other exception than the given.
+     */
+ public static <T> CompletableFuture<T> retryOperationUntilSuccessOrTimeout(
Review Comment:
Why did you add new methods instead of just changing the old ones? The old ones have no other usages.
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java:
##########
@@ -209,6 +205,23 @@ private CompletableFuture<LeaseGrantedMessageResponse> processLeaseGrantedMessag
         );
     }

+    private @NotNull CompletableFuture<Void> getWaitForStateAndSendPrimaryReplicaChanged(LeaseGrantedMessage msg) {
+        return waitForActualState(msg.leaseStartTime(), msg.leaseExpirationTime().getPhysical()).thenCompose(
+                v -> sendPrimaryReplicaChangeToReplicationGroup(msg.leaseStartTime().longValue(),
+                        localNode.id(), localNode.name()))
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.warn("Could not send primary replica changed "
+                                + "[leaseStartTime={}, primaryReplicaNodeId={}, primaryReplicaNodeName ={}].", e,
Review Comment:
```suggestion
+                                + "[leaseStartTime={}, candidateNodeId={}, candidateNodeName={}].", e,
```
Let's name it the candidate node; there is no primary replica here yet.
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java:
##########
@@ -259,18 +272,137 @@ private CompletableFuture<LeaseGrantedMessageResponse> proposeLeaseRedirect(Inte
     * @return Future that is completed when local storage catches up the index that is actual for leader on the moment of request.
     */
    private CompletableFuture<Void> waitForActualState(HybridTimestamp startTime, long expirationTime) {
-        LOG.info("Waiting for actual storage state, group=" + groupId);
+        LOG.info("Waiting for actual storage state [groupId={}, expirationTime={}, timeoutMs={}, leaseStartTime={}]",
+                groupId, expirationTime, expirationTime - currentTimeMillis(), startTime);

        replicaReservationClosure.accept(groupId, startTime);

-        long timeout = expirationTime - currentTimeMillis();
-        if (timeout <= 0) {
-            return failedFuture(new TimeoutException());
+        TimeTracker readIndexTimeTracker = new TimeTracker(
+                expirationTime,
+                groupId,
+                "Timeout is expired before raft index reading started");
+
+        if (readIndexTimeTracker.isExpired()) {
+            return readIndexTimeTracker.getFailedFuture();
+        }
+
+        return retryOperationUntilSuccessOrTimeout(raftClient::readIndex, readIndexTimeTracker.getTimeoutMs(), executor)
+                .whenComplete((raftIndex, readIndexError) -> {
+                    if (readIndexError != null) {
+                        LOG.warn("Failed to read index from raft leader"
+                                + " [groupId={}, , expirationTime={}, timeoutMs={}, durationMs={}]", readIndexError,
+                                groupId, expirationTime, readIndexTimeTracker.getTimeoutMs(), readIndexTimeTracker.getDurationMs());
+                    } else {
+                        LOG.info("Successfully read index from raft leader "
+                                + "[groupId={}, expirationTime={}, timeoutMs={}, durationMs={}]",
+                                groupId, expirationTime, readIndexTimeTracker.getTimeoutMs(), readIndexTimeTracker.getDurationMs());
+                    }
+                })
+                .thenCompose(raftIndex -> {
+                    // Recalculate remaining time after readIndex completes.
+                    TimeTracker storageIndexUpdateTimeTracker = new TimeTracker(
+                            expirationTime,
+                            groupId,
+                            "Timeout is expired before storage index tracking started");
+
+                    if (storageIndexUpdateTimeTracker.isExpired()) {
+                        return storageIndexUpdateTimeTracker.getFailedFuture();
+                    }
+
+                    return storageIndexTracker.waitFor(raftIndex)
+                            .orTimeout(storageIndexUpdateTimeTracker.getTimeoutMs(), MILLISECONDS)
+                            .whenComplete((v, storageIndexTrackerError) -> {
+                                if (storageIndexTrackerError != null) {
+                                    LOG.warn("Failed to wait for storage index to reach raft leader"
+                                            + " [groupId={}, expirationTime={}, timeoutMs={}, durationMs={}]",
+                                            storageIndexTrackerError, groupId, expirationTime,
+                                            storageIndexUpdateTimeTracker.getTimeoutMs(),
+                                            storageIndexUpdateTimeTracker.getDurationMs());
+                                } else {
+                                    LOG.info("Successfully waited for storage index to reach raft leader"
+                                            + " [groupId={}, expirationTime={}, timeoutMs={}, durationMs={}]",
+                                            groupId, expirationTime,
+                                            storageIndexUpdateTimeTracker.getTimeoutMs(),
+                                            storageIndexUpdateTimeTracker.getDurationMs());
+                                }
+                            });
+                });
+    }
+
+    /**
+     * Tracks time for timeout operations. Calculates remaining time based on expiration time and provides utilities
+     * for checking expiration and creating timeout exceptions with detailed messages.
+     */
+    private static class TimeTracker {
+        /** Expiration time in milliseconds. */
+        private final long expirationTime;
+
+        /** Replication group ID for logging purposes. */
+        private final ReplicationGroupId groupId;
+
+        /** Start time when this tracker was created. */
+        private final long startTime;
+
+        /** Base message for timeout exception. */
+        private final String message;
+
+        /**
+         * Creates a time tracker.
+         *
+         * @param expirationTime Expiration time in milliseconds.
+         * @param groupId Replication group ID for logging purposes.
+         * @param message Base message for timeout exception.
+         */
+        private TimeTracker(long expirationTime, ReplicationGroupId groupId, String message) {
+            this.expirationTime = expirationTime;
+            this.groupId = groupId;
+            this.startTime = currentTimeMillis();
+            this.message = message;
+        }
+
+        /**
+         * Checks if the timeout has expired.
+         *
+         * @return {@code true} if the timeout has expired (timeLeft < 0), {@code false} otherwise.
+         */
+        public boolean isExpired() {
+            return getTimeoutMs() < 0;
+        }
+
+        /**
+         * Creates a failed future with a timeout exception that includes detailed information.
+         *
+         * @return A failed future with {@link TimeoutException} containing the base message and details.
+         */
+        public CompletableFuture<Void> getFailedFuture() {
+            return failedFuture(new TimeoutException(message + getMessageDetails()));
+        }
+
+        /**
+         * Formats message details including group ID, expiration time, and current time.
+         *
+         * @return Formatted message details.
+         */
+        private String getMessageDetails() {
+            return format(" [groupId={}, expirationTime={}, timeoutMs={}, durationMs={}].",
+                    groupId, expirationTime, getTimeoutMs(), getDurationMs());
        }
-        return retryOperationUntilSuccess(raftClient::readIndex, e -> currentTimeMillis() > expirationTime, executor)
-                .orTimeout(timeout, TimeUnit.MILLISECONDS)
-                .thenCompose(storageIndexTracker::waitFor);
+        /**
+         * Returns the duration since this tracker was created.
+         *
+         * @return Duration in milliseconds since the tracker was created.
+         */
+        public long getDurationMs() {
+            return currentTimeMillis() - startTime;
+        }
+
+        /**
+         * Returns the remaining time until expiration in milliseconds.
+         *
+         * @return Remaining time in milliseconds. Can be negative if already expired.
+         */
+        public long getTimeoutMs() {
+            return expirationTime - currentTimeMillis();
+        }
Review Comment:
Could you please name getters without leading `get`?
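For illustration only (a bare `failedFuture()` would collide with the statically imported `CompletableFuture.failedFuture`, so that one needs more than just dropping the prefix):

```java
public long durationMs() {
    return currentTimeMillis() - startTime;
}

public long timeoutMs() {
    return expirationTime - currentTimeMillis();
}

// Renamed rather than just de-prefixed, to avoid clashing with the static
// import of CompletableFuture#failedFuture used in this class.
public CompletableFuture<Void> failedTimeoutFuture() {
    return failedFuture(new TimeoutException(message + messageDetails()));
}
```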
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java:
##########
@@ -259,18 +272,137 @@ private CompletableFuture<LeaseGrantedMessageResponse> proposeLeaseRedirect(Inte
     * @return Future that is completed when local storage catches up the index that is actual for leader on the moment of request.
     */
    private CompletableFuture<Void> waitForActualState(HybridTimestamp startTime, long expirationTime) {
-        LOG.info("Waiting for actual storage state, group=" + groupId);
+        LOG.info("Waiting for actual storage state [groupId={}, expirationTime={}, timeoutMs={}, leaseStartTime={}]",
+                groupId, expirationTime, expirationTime - currentTimeMillis(), startTime);

        replicaReservationClosure.accept(groupId, startTime);

-        long timeout = expirationTime - currentTimeMillis();
-        if (timeout <= 0) {
-            return failedFuture(new TimeoutException());
+        TimeTracker readIndexTimeTracker = new TimeTracker(
+                expirationTime,
+                groupId,
+                "Timeout is expired before raft index reading started");
+
+        if (readIndexTimeTracker.isExpired()) {
+            return readIndexTimeTracker.getFailedFuture();
+        }
+
+        return retryOperationUntilSuccessOrTimeout(raftClient::readIndex, readIndexTimeTracker.getTimeoutMs(), executor)
+                .whenComplete((raftIndex, readIndexError) -> {
+                    if (readIndexError != null) {
+                        LOG.warn("Failed to read index from raft leader"
+                                + " [groupId={}, , expirationTime={}, timeoutMs={}, durationMs={}]", readIndexError,
+                                groupId, expirationTime, readIndexTimeTracker.getTimeoutMs(), readIndexTimeTracker.getDurationMs());
+                    } else {
+                        LOG.info("Successfully read index from raft leader "
+                                + "[groupId={}, expirationTime={}, timeoutMs={}, durationMs={}]",
Review Comment:
```suggestion
+                                + "[groupId={}, expirationTime={}, timeoutMs={}, durationMs={}]",
```
Let's also add the start time, since it (together with the group ID) uniquely identifies the lease.
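E.g., purely illustrative of the requested shape:

```java
LOG.info("Successfully read index from raft leader "
        + "[groupId={}, leaseStartTime={}, expirationTime={}, timeoutMs={}, durationMs={}]",
        groupId, startTime, expirationTime, readIndexTimeTracker.getTimeoutMs(), readIndexTimeTracker.getDurationMs());
```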
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java:
##########
@@ -259,18 +272,137 @@ private CompletableFuture<LeaseGrantedMessageResponse> proposeLeaseRedirect(Inte
     * @return Future that is completed when local storage catches up the index that is actual for leader on the moment of request.
     */
    private CompletableFuture<Void> waitForActualState(HybridTimestamp startTime, long expirationTime) {
-        LOG.info("Waiting for actual storage state, group=" + groupId);
+        LOG.info("Waiting for actual storage state [groupId={}, expirationTime={}, timeoutMs={}, leaseStartTime={}]",
+                groupId, expirationTime, expirationTime - currentTimeMillis(), startTime);

        replicaReservationClosure.accept(groupId, startTime);

-        long timeout = expirationTime - currentTimeMillis();
-        if (timeout <= 0) {
-            return failedFuture(new TimeoutException());
+        TimeTracker readIndexTimeTracker = new TimeTracker(
+                expirationTime,
+                groupId,
+                "Timeout is expired before raft index reading started");
+
+        if (readIndexTimeTracker.isExpired()) {
+            return readIndexTimeTracker.getFailedFuture();
+        }
+
+        return retryOperationUntilSuccessOrTimeout(raftClient::readIndex, readIndexTimeTracker.getTimeoutMs(), executor)
+                .whenComplete((raftIndex, readIndexError) -> {
+                    if (readIndexError != null) {
+                        LOG.warn("Failed to read index from raft leader"
+                                + " [groupId={}, , expirationTime={}, timeoutMs={}, durationMs={}]", readIndexError,
+                                groupId, expirationTime, readIndexTimeTracker.getTimeoutMs(), readIndexTimeTracker.getDurationMs());
+                    } else {
+                        LOG.info("Successfully read index from raft leader "
+                                + "[groupId={}, expirationTime={}, timeoutMs={}, durationMs={}]",
+                                groupId, expirationTime, readIndexTimeTracker.getTimeoutMs(), readIndexTimeTracker.getDurationMs());
+                    }
+                })
+                .thenCompose(raftIndex -> {
+                    // Recalculate remaining time after readIndex completes.
+                    TimeTracker storageIndexUpdateTimeTracker = new TimeTracker(
+                            expirationTime,
+                            groupId,
+                            "Timeout is expired before storage index tracking started");
+
+                    if (storageIndexUpdateTimeTracker.isExpired()) {
+                        return storageIndexUpdateTimeTracker.getFailedFuture();
+                    }
+
+                    return storageIndexTracker.waitFor(raftIndex)
+                            .orTimeout(storageIndexUpdateTimeTracker.getTimeoutMs(), MILLISECONDS)
+                            .whenComplete((v, storageIndexTrackerError) -> {
+                                if (storageIndexTrackerError != null) {
+                                    LOG.warn("Failed to wait for storage index to reach raft leader"
+                                            + " [groupId={}, expirationTime={}, timeoutMs={}, durationMs={}]",
+                                            storageIndexTrackerError, groupId, expirationTime,
+                                            storageIndexUpdateTimeTracker.getTimeoutMs(),
+                                            storageIndexUpdateTimeTracker.getDurationMs());
+                                } else {
+                                    LOG.info("Successfully waited for storage index to reach raft leader"
Review Comment:
```suggestion
+                                    LOG.debug("Successfully waited for storage index to reach raft leader"
```
##########
modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java:
##########
@@ -217,4 +224,65 @@ void testByteBufferToByteArray() {
        ByteBuffer smallDirectBuffer = bigDirectBuffer.position(1).limit(4).slice();
        assertArrayEquals(new byte[] {1, 2, 3}, byteBufferToByteArray(smallDirectBuffer));
}
+
+ @Test
+ @Timeout(value = 1, unit = SECONDS)
+ void testRetryOperationUntilSuccessOrTimeout_SuccessOnFirstAttempt() {
+ Executor executor = Executors.newSingleThreadExecutor();
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ CompletableFuture<String> result = retryOperationUntilSuccessOrTimeout(
+ () -> {
+ callCount.incrementAndGet();
+ return completedFuture("success");
+ },
+ 1000,
+ executor
+ );
+
+ assertThat(result, willBe(equalTo("success")));
+ assertThat(callCount.get(), equalTo(1));
+ }
+
+ @Test
+ @Timeout(value = 1, unit = SECONDS)
+    void testRetryOperationUntilSuccessOrTimeout_SuccessAfterRetries() throws Exception {
+ Executor executor = Executors.newSingleThreadExecutor();
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ CompletableFuture<String> result = retryOperationUntilSuccessOrTimeout(
+ () -> {
+ int count = callCount.incrementAndGet();
+ if (count < 3) {
+                        return failedFuture(new IOException("Temporary failure"));
+ }
+ return completedFuture("success");
+ },
+ 1000,
+ executor
+ );
+
+ assertThat(result, willBe(equalTo("success")));
+ assertThat(callCount.get(), equalTo(3));
+ }
+
+ @Test
+ @Timeout(value = 1, unit = SECONDS)
Review Comment:
This looks too strict; the test may fail with a timeout on any JVM pause longer than just 1 second.
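For example (the exact bound is a judgment call):

```java
// Roomy enough to survive a GC/JIT pause on a loaded CI agent,
// while still failing reasonably fast on a genuine hang.
@Timeout(value = 10, unit = SECONDS)
```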
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java:
##########
@@ -172,15 +175,13 @@ private CompletableFuture<LeaseGrantedMessageResponse> processLeaseGrantedMessag
         assert leaseExpirationTime == null || clockService.after(msg.leaseExpirationTime(), leaseExpirationTime)
                 : "Invalid lease expiration time in message, msg=" + msg;

+        Supplier<CompletableFuture<Void>> waitForStateAndSendPrimaryReplicaChanged =
Review Comment:
Why is it needed?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java:
##########
@@ -516,6 +529,9 @@ private CommandResult handlePrimaryReplicaChangeCommand(
             return null;
         });

+        LOG.info("Successfully applied PrimaryReplicaChangeCommand [tableId={}, partId={}, commandIndex={}, leaseStartTime={}]",
+                storage.tableId(), storage.partitionId(), commandIndex, cmd.leaseStartTime());
Review Comment:
Also, I would suggest changing this to `debug`. BTW, `warn` is not needed in such situations; we usually treat it as "something that requires a reaction", but skipping the command due to a lesser index definitely doesn't require a reaction.
##########
modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java:
##########
@@ -1232,6 +1232,51 @@ public static <T> void retryOperationUntilSuccess(
});
}
+    /**
+     * Retries operation until it succeeds or timeout occurs.
+     *
+     * @param operation Operation.
+     * @param timeout Timeout in milliseconds.
+     * @param executor Executor to make retry in.
+     * @return Future that is completed when operation is successful or failed with other exception than the given.
+     */
+ public static <T> CompletableFuture<T> retryOperationUntilSuccessOrTimeout(
+ Supplier<CompletableFuture<T>> operation,
+ long timeout,
Review Comment:
In our discussion, I meant that the timeout may be given by the user via `.orTimeout` on the returned future, in addition to `stopRetryCondition`. But now I'm not sure; maybe a mandatory parameter is better?
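For reference, the pre-patch code used exactly that split, with the deadline inside the stop condition and the caller adding the hard bound:

```java
// Removed variant: retries stop via the condition, the caller bounds the wait.
retryOperationUntilSuccess(raftClient::readIndex, e -> currentTimeMillis() > expirationTime, executor)
        .orTimeout(timeout, TimeUnit.MILLISECONDS);
```

With only an external `.orTimeout`, the returned future completes, but the retry loop still needs a way to observe that and stop; that is probably the strongest argument for keeping a mandatory timeout parameter.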
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -210,9 +210,17 @@ private void processWriteCommand(CommandClosure<WriteCommand> clo) {
             } else if (command instanceof SafeTimeSyncCommand) {
                 result = handleSafeTimeSyncCommand((SafeTimeSyncCommand) command, commandIndex, commandTerm);
             } else if (command instanceof PrimaryReplicaChangeCommand) {
+                PrimaryReplicaChangeCommand cmd = (PrimaryReplicaChangeCommand) command;
+                LOG.info("Processing PrimaryReplicaChangeCommand [groupId={}, commandIndex={}, commandTerm={}, "
+                        + "leaseStartTime={}, primaryNodeId={}, primaryNodeName={}]",
+                        partitionKey.toReplicationGroupId(), commandIndex, commandTerm,
+                        cmd.leaseStartTime(), cmd.primaryReplicaNodeId(), cmd.primaryReplicaNodeName());
+
                 result = processCrossTableProcessorsCommand(command, commandIndex, commandTerm, safeTimestamp);
-                if (updateLeaseInfoInTxStorage((PrimaryReplicaChangeCommand) command, commandIndex, commandTerm)) {
+                if (updateLeaseInfoInTxStorage(cmd, commandIndex, commandTerm)) {
Review Comment:
The result of `updateLeaseInfoInTxStorage` doesn't depend on `txStateStorage.leaseInfo`, only on the index and term.
Also, I see that `TxStateRocksDbPartitionStorage#leaseInfo(org.apache.ignite.internal.storage.lease.LeaseInfo, long, long)` updates the lease unconditionally, without awareness of the lease start time, unlike `MvPartitionStorage#updateLease`. This seems to be a problem; at least it can lead to inconsistency with the MV partition storage.
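Something along these lines could align the two storages (a sketch only; it assumes `LeaseInfo#leaseStartTime()` and a `txStateStorage.leaseInfo()` getter, and the method name is hypothetical):

```java
// Hypothetical guard mirroring MvPartitionStorage#updateLease: only move the
// lease forward in time, never overwrite it with an older one.
private void updateLeaseIfNewer(LeaseInfo candidate, long commandIndex, long commandTerm) {
    LeaseInfo current = txStateStorage.leaseInfo();

    if (current == null || candidate.leaseStartTime() > current.leaseStartTime()) {
        txStateStorage.leaseInfo(candidate, commandIndex, commandTerm);
    }
}
```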
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -210,9 +210,17 @@ private void processWriteCommand(CommandClosure<WriteCommand> clo) {
             } else if (command instanceof SafeTimeSyncCommand) {
                 result = handleSafeTimeSyncCommand((SafeTimeSyncCommand) command, commandIndex, commandTerm);
             } else if (command instanceof PrimaryReplicaChangeCommand) {
+                PrimaryReplicaChangeCommand cmd = (PrimaryReplicaChangeCommand) command;
+                LOG.info("Processing PrimaryReplicaChangeCommand [groupId={}, commandIndex={}, commandTerm={}, "
+                        + "leaseStartTime={}, primaryNodeId={}, primaryNodeName={}]",
+                        partitionKey.toReplicationGroupId(), commandIndex, commandTerm,
+                        cmd.leaseStartTime(), cmd.primaryReplicaNodeId(), cmd.primaryReplicaNodeName());
+
Review Comment:
I'm not sure we need this log at `info`. I would suggest changing this one and the next to `debug`. We have `lease grant message received` and `lease accepted` at `info`, and every exception in between should be info or warn; that seems enough, WDYT?
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java:
##########
@@ -209,6 +205,23 @@ private CompletableFuture<LeaseGrantedMessageResponse> processLeaseGrantedMessag
         );
     }

+    private @NotNull CompletableFuture<Void> getWaitForStateAndSendPrimaryReplicaChanged(LeaseGrantedMessage msg) {
Review Comment:
Also, please remove the leading `get`
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java:
##########
@@ -209,6 +205,23 @@ private CompletableFuture<LeaseGrantedMessageResponse> processLeaseGrantedMessag
         );
     }

+    private @NotNull CompletableFuture<Void> getWaitForStateAndSendPrimaryReplicaChanged(LeaseGrantedMessage msg) {
+        return waitForActualState(msg.leaseStartTime(), msg.leaseExpirationTime().getPhysical()).thenCompose(
+                v -> sendPrimaryReplicaChangeToReplicationGroup(msg.leaseStartTime().longValue(),
+                        localNode.id(), localNode.name()))
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.warn("Could not send primary replica changed "
Review Comment:
```suggestion
+                        LOG.warn("Could not save lease information to replication group "
```
Just cosmetics, but it better explains the essence of what's happening, in my opinion.
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java:
##########
@@ -209,6 +205,23 @@ private CompletableFuture<LeaseGrantedMessageResponse> processLeaseGrantedMessag
         );
     }

+    private @NotNull CompletableFuture<Void> getWaitForStateAndSendPrimaryReplicaChanged(LeaseGrantedMessage msg) {
+        return waitForActualState(msg.leaseStartTime(), msg.leaseExpirationTime().getPhysical()).thenCompose(
+                v -> sendPrimaryReplicaChangeToReplicationGroup(msg.leaseStartTime().longValue(),
+                        localNode.id(), localNode.name()))
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.warn("Could not send primary replica changed "
+                                + "[leaseStartTime={}, primaryReplicaNodeId={}, primaryReplicaNodeName ={}].", e,
+                                msg.leaseStartTime().longValue(), localNode.id(), localNode.name());
+                    } else {
+                        LOG.info("Primary Replica changed was sent"
+                                + "[leaseStartTime={}, primaryReplicaNodeId={}, primaryReplicaNodeName ={}].",
Review Comment:
same as above
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java:
##########
@@ -172,15 +175,13 @@ private CompletableFuture<LeaseGrantedMessageResponse> processLeaseGrantedMessag
         assert leaseExpirationTime == null || clockService.after(msg.leaseExpirationTime(), leaseExpirationTime)
                 : "Invalid lease expiration time in message, msg=" + msg;

+        Supplier<CompletableFuture<Void>> waitForStateAndSendPrimaryReplicaChanged =
+                () -> getWaitForStateAndSendPrimaryReplicaChanged(msg);
+
         if (msg.force()) {
             // Replica must wait till storage index reaches the current leader's index to make sure that all updates made on the
             // group leader are received.
-            return waitForActualState(msg.leaseStartTime(), msg.leaseExpirationTime().getPhysical())
-                    .thenCompose(v -> sendPrimaryReplicaChangeToReplicationGroup(
-                            msg.leaseStartTime().longValue(),
-                            localNode.id(),
-                            localNode.name()
-                    ))
+            return waitForStateAndSendPrimaryReplicaChanged.get()
Review Comment:
```suggestion
            return getWaitForStateAndSendPrimaryReplicaChanged(msg)
```
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java:
##########
@@ -259,18 +272,137 @@ private CompletableFuture<LeaseGrantedMessageResponse> proposeLeaseRedirect(Inte
     * @return Future that is completed when local storage catches up the index that is actual for leader on the moment of request.
     */
    private CompletableFuture<Void> waitForActualState(HybridTimestamp startTime, long expirationTime) {
-        LOG.info("Waiting for actual storage state, group=" + groupId);
+        LOG.info("Waiting for actual storage state [groupId={}, expirationTime={}, timeoutMs={}, leaseStartTime={}]",
Review Comment:
let's put start time before expiration time
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java:
##########
@@ -209,6 +205,23 @@ private CompletableFuture<LeaseGrantedMessageResponse> processLeaseGrantedMessag
         );
     }

+    private @NotNull CompletableFuture<Void> getWaitForStateAndSendPrimaryReplicaChanged(LeaseGrantedMessage msg) {
+        return waitForActualState(msg.leaseStartTime(), msg.leaseExpirationTime().getPhysical()).thenCompose(
Review Comment:
Could you place `thenCompose` at the same indentation level as `whenComplete`, for the sake of readability?
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java:
##########
@@ -259,18 +272,137 @@ private CompletableFuture<LeaseGrantedMessageResponse> proposeLeaseRedirect(Inte
     * @return Future that is completed when local storage catches up the index that is actual for leader on the moment of request.
     */
    private CompletableFuture<Void> waitForActualState(HybridTimestamp startTime, long expirationTime) {
-        LOG.info("Waiting for actual storage state, group=" + groupId);
+        LOG.info("Waiting for actual storage state [groupId={}, expirationTime={}, timeoutMs={}, leaseStartTime={}]",
+                groupId, expirationTime, expirationTime - currentTimeMillis(), startTime);

        replicaReservationClosure.accept(groupId, startTime);

-        long timeout = expirationTime - currentTimeMillis();
-        if (timeout <= 0) {
-            return failedFuture(new TimeoutException());
+        TimeTracker readIndexTimeTracker = new TimeTracker(
+                expirationTime,
+                groupId,
+                "Timeout is expired before raft index reading started");
+
+        if (readIndexTimeTracker.isExpired()) {
+            return readIndexTimeTracker.getFailedFuture();
+        }
+
+        return retryOperationUntilSuccessOrTimeout(raftClient::readIndex, readIndexTimeTracker.getTimeoutMs(), executor)
+                .whenComplete((raftIndex, readIndexError) -> {
+                    if (readIndexError != null) {
+                        LOG.warn("Failed to read index from raft leader"
+                                + " [groupId={}, , expirationTime={}, timeoutMs={}, durationMs={}]", readIndexError,
Review Comment:
```suggestion
+                                + " [groupId={}, expirationTime={}, timeoutMs={}, durationMs={}]", readIndexError,
```
Let's also add the start time, since it (together with the group ID) uniquely identifies the lease.
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java:
##########
@@ -259,18 +272,137 @@ private CompletableFuture<LeaseGrantedMessageResponse> proposeLeaseRedirect(Inte
     * @return Future that is completed when local storage catches up the index that is actual for leader on the moment of request.
     */
    private CompletableFuture<Void> waitForActualState(HybridTimestamp startTime, long expirationTime) {
-        LOG.info("Waiting for actual storage state, group=" + groupId);
+        LOG.info("Waiting for actual storage state [groupId={}, expirationTime={}, timeoutMs={}, leaseStartTime={}]",
+                groupId, expirationTime, expirationTime - currentTimeMillis(), startTime);

        replicaReservationClosure.accept(groupId, startTime);

-        long timeout = expirationTime - currentTimeMillis();
-        if (timeout <= 0) {
-            return failedFuture(new TimeoutException());
+        TimeTracker readIndexTimeTracker = new TimeTracker(
+                expirationTime,
+                groupId,
+                "Timeout is expired before raft index reading started");
+
+        if (readIndexTimeTracker.isExpired()) {
+            return readIndexTimeTracker.getFailedFuture();
+        }
+
+        return retryOperationUntilSuccessOrTimeout(raftClient::readIndex, readIndexTimeTracker.getTimeoutMs(), executor)
+                .whenComplete((raftIndex, readIndexError) -> {
+                    if (readIndexError != null) {
+                        LOG.warn("Failed to read index from raft leader"
+                                + " [groupId={}, , expirationTime={}, timeoutMs={}, durationMs={}]", readIndexError,
+                                groupId, expirationTime, readIndexTimeTracker.getTimeoutMs(), readIndexTimeTracker.getDurationMs());
+                    } else {
+                        LOG.info("Successfully read index from raft leader "
Review Comment:
```suggestion
LOG.debug("Successfully read index from raft leader "
```
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java:
##########
@@ -259,18 +272,137 @@ private CompletableFuture<LeaseGrantedMessageResponse> proposeLeaseRedirect(Inte
     * @return Future that is completed when local storage catches up the index that is actual for leader on the moment of request.
     */
    private CompletableFuture<Void> waitForActualState(HybridTimestamp startTime, long expirationTime) {
-        LOG.info("Waiting for actual storage state, group=" + groupId);
+        LOG.info("Waiting for actual storage state [groupId={}, expirationTime={}, timeoutMs={}, leaseStartTime={}]",
+                groupId, expirationTime, expirationTime - currentTimeMillis(), startTime);

        replicaReservationClosure.accept(groupId, startTime);

-        long timeout = expirationTime - currentTimeMillis();
-        if (timeout <= 0) {
-            return failedFuture(new TimeoutException());
+        TimeTracker readIndexTimeTracker = new TimeTracker(
+                expirationTime,
+                groupId,
+                "Timeout is expired before raft index reading started");
+
+        if (readIndexTimeTracker.isExpired()) {
+            return readIndexTimeTracker.getFailedFuture();
+        }
+
+        return retryOperationUntilSuccessOrTimeout(raftClient::readIndex, readIndexTimeTracker.getTimeoutMs(), executor)
+                .whenComplete((raftIndex, readIndexError) -> {
+                    if (readIndexError != null) {
+                        LOG.warn("Failed to read index from raft leader"
+                                + " [groupId={}, , expirationTime={}, timeoutMs={}, durationMs={}]", readIndexError,
+                                groupId, expirationTime, readIndexTimeTracker.getTimeoutMs(), readIndexTimeTracker.getDurationMs());
+                    } else {
+                        LOG.info("Successfully read index from raft leader "
+                                + "[groupId={}, expirationTime={}, timeoutMs={}, durationMs={}]",
+                                groupId, expirationTime, readIndexTimeTracker.getTimeoutMs(), readIndexTimeTracker.getDurationMs());
+                    }
+                })
+                .thenCompose(raftIndex -> {
+                    // Recalculate remaining time after readIndex completes.
+                    TimeTracker storageIndexUpdateTimeTracker = new TimeTracker(
+                            expirationTime,
+                            groupId,
+                            "Timeout is expired before storage index tracking started");
+
+                    if (storageIndexUpdateTimeTracker.isExpired()) {
+                        return storageIndexUpdateTimeTracker.getFailedFuture();
+                    }
+
+                    return storageIndexTracker.waitFor(raftIndex)
+                            .orTimeout(storageIndexUpdateTimeTracker.getTimeoutMs(), MILLISECONDS)
+                            .whenComplete((v, storageIndexTrackerError) -> {
+                                if (storageIndexTrackerError != null) {
+                                    LOG.warn("Failed to wait for storage index to reach raft leader"
+                                            + " [groupId={}, expirationTime={}, timeoutMs={}, durationMs={}]",
Review Comment:
Please add the start time here as well.
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java:
##########
@@ -259,18 +272,137 @@ private CompletableFuture<LeaseGrantedMessageResponse> proposeLeaseRedirect(Inte
     * @return Future that is completed when local storage catches up the index that is actual for leader on the moment of request.
     */
    private CompletableFuture<Void> waitForActualState(HybridTimestamp startTime, long expirationTime) {
-        LOG.info("Waiting for actual storage state, group=" + groupId);
+        LOG.info("Waiting for actual storage state [groupId={}, expirationTime={}, timeoutMs={}, leaseStartTime={}]",
+                groupId, expirationTime, expirationTime - currentTimeMillis(), startTime);

        replicaReservationClosure.accept(groupId, startTime);

-        long timeout = expirationTime - currentTimeMillis();
-        if (timeout <= 0) {
-            return failedFuture(new TimeoutException());
+        TimeTracker readIndexTimeTracker = new TimeTracker(
+                expirationTime,
+                groupId,
+                "Timeout is expired before raft index reading started");
+
+        if (readIndexTimeTracker.isExpired()) {
+            return readIndexTimeTracker.getFailedFuture();
+        }
+
+        return retryOperationUntilSuccessOrTimeout(raftClient::readIndex, readIndexTimeTracker.getTimeoutMs(), executor)
+                .whenComplete((raftIndex, readIndexError) -> {
+                    if (readIndexError != null) {
+                        LOG.warn("Failed to read index from raft leader"
+                                + " [groupId={}, , expirationTime={}, timeoutMs={}, durationMs={}]", readIndexError,
+                                groupId, expirationTime, readIndexTimeTracker.getTimeoutMs(), readIndexTimeTracker.getDurationMs());
+                    } else {
+                        LOG.info("Successfully read index from raft leader "
+                                + "[groupId={}, expirationTime={}, timeoutMs={}, durationMs={}]",
+                                groupId, expirationTime, readIndexTimeTracker.getTimeoutMs(), readIndexTimeTracker.getDurationMs());
+                    }
+                })
+                .thenCompose(raftIndex -> {
+                    // Recalculate remaining time after readIndex completes.
+                    TimeTracker storageIndexUpdateTimeTracker = new TimeTracker(
+                            expirationTime,
+                            groupId,
+                            "Timeout is expired before storage index tracking started");
+
+                    if (storageIndexUpdateTimeTracker.isExpired()) {
+                        return storageIndexUpdateTimeTracker.getFailedFuture();
+                    }
+
+                    return storageIndexTracker.waitFor(raftIndex)
+                            .orTimeout(storageIndexUpdateTimeTracker.getTimeoutMs(), MILLISECONDS)
+                            .whenComplete((v, storageIndexTrackerError) -> {
+                                if (storageIndexTrackerError != null) {
+                                    LOG.warn("Failed to wait for storage index to reach raft leader"
+                                            + " [groupId={}, expirationTime={}, timeoutMs={}, durationMs={}]",
+                                            storageIndexTrackerError, groupId, expirationTime,
+                                            storageIndexUpdateTimeTracker.getTimeoutMs(),
+                                            storageIndexUpdateTimeTracker.getDurationMs());
+                                } else {
+                                    LOG.info("Successfully waited for storage index to reach raft leader"
+                                            + " [groupId={}, expirationTime={}, timeoutMs={}, durationMs={}]",
+                                            groupId, expirationTime,
+                                            storageIndexUpdateTimeTracker.getTimeoutMs(),
+                                            storageIndexUpdateTimeTracker.getDurationMs());
+                                }
+                            });
+                });
+    }
+
+    /**
+     * Tracks time for timeout operations. Calculates remaining time based on expiration time and provides utilities
+     * for checking expiration and creating timeout exceptions with detailed messages.
+     */
+ private static class TimeTracker {
Review Comment:
maybe move it to utils?
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/PlacementDriverMessageProcessor.java:
##########
@@ -259,18 +272,137 @@ private CompletableFuture<LeaseGrantedMessageResponse> proposeLeaseRedirect(Inte
     * @return Future that is completed when local storage catches up the index that is actual for leader on the moment of request.
     */
    private CompletableFuture<Void> waitForActualState(HybridTimestamp startTime, long expirationTime) {
-        LOG.info("Waiting for actual storage state, group=" + groupId);
+        LOG.info("Waiting for actual storage state [groupId={}, expirationTime={}, timeoutMs={}, leaseStartTime={}]",
+                groupId, expirationTime, expirationTime - currentTimeMillis(), startTime);

        replicaReservationClosure.accept(groupId, startTime);

-        long timeout = expirationTime - currentTimeMillis();
-        if (timeout <= 0) {
-            return failedFuture(new TimeoutException());
+        TimeTracker readIndexTimeTracker = new TimeTracker(
+                expirationTime,
+                groupId,
+                "Timeout is expired before raft index reading started");
+
+        if (readIndexTimeTracker.isExpired()) {
+            return readIndexTimeTracker.getFailedFuture();
+        }
+
+        return retryOperationUntilSuccessOrTimeout(raftClient::readIndex, readIndexTimeTracker.getTimeoutMs(), executor)
+                .whenComplete((raftIndex, readIndexError) -> {
+                    if (readIndexError != null) {
+                        LOG.warn("Failed to read index from raft leader"
+                                + " [groupId={}, , expirationTime={}, timeoutMs={}, durationMs={}]", readIndexError,
+                                groupId, expirationTime, readIndexTimeTracker.getTimeoutMs(), readIndexTimeTracker.getDurationMs());
+                    } else {
+                        LOG.info("Successfully read index from raft leader "
+                                + "[groupId={}, expirationTime={}, timeoutMs={}, durationMs={}]",
+                                groupId, expirationTime, readIndexTimeTracker.getTimeoutMs(), readIndexTimeTracker.getDurationMs());
+                    }
+                })
+                .thenCompose(raftIndex -> {
+                    // Recalculate remaining time after readIndex completes.
+                    TimeTracker storageIndexUpdateTimeTracker = new TimeTracker(
+                            expirationTime,
+                            groupId,
+                            "Timeout is expired before storage index tracking started");
+
+                    if (storageIndexUpdateTimeTracker.isExpired()) {
+                        return storageIndexUpdateTimeTracker.getFailedFuture();
+                    }
+
+                    return storageIndexTracker.waitFor(raftIndex)
+                            .orTimeout(storageIndexUpdateTimeTracker.getTimeoutMs(), MILLISECONDS)
+                            .whenComplete((v, storageIndexTrackerError) -> {
+                                if (storageIndexTrackerError != null) {
+                                    LOG.warn("Failed to wait for storage index to reach raft leader"
+                                            + " [groupId={}, expirationTime={}, timeoutMs={}, durationMs={}]",
+                                            storageIndexTrackerError, groupId, expirationTime,
+                                            storageIndexUpdateTimeTracker.getTimeoutMs(),
+                                            storageIndexUpdateTimeTracker.getDurationMs());
+                                } else {
+                                    LOG.info("Successfully waited for storage index to reach raft leader"
+                                            + " [groupId={}, expirationTime={}, timeoutMs={}, durationMs={}]",
Review Comment:
Please add the start time here as well.