This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b70fa00765 IGNITE-23411 Recover Metastorage SafeTime on Metastorage
recovery (#4544)
b70fa00765 is described below
commit b70fa0076567912eb336bf5801568fb5d628a3cd
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Oct 11 15:18:36 2024 +0400
IGNITE-23411 Recover Metastorage SafeTime on Metastorage recovery (#4544)
---
.../metastorage/impl/MetaStorageManagerImpl.java | 28 +++++++++++++++-------
.../distributed/schema/SchemaSyncServiceImpl.java | 7 ------
2 files changed, 20 insertions(+), 15 deletions(-)
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index bc67d0bdbf..936495685f 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -281,6 +281,12 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
assert targetRevision != null;
listenForRecovery(targetRevision);
+ }).whenComplete((res, ex) -> {
+ if (ex != null) {
+ LOG.info("Recovery failed", ex);
+
+ recoveryFinishedFuture.completeExceptionally(ex);
+ }
});
return recoveryFinishedFuture;
@@ -304,10 +310,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
storage.setRecoveryRevisionListener(null);
- appliedRevision = targetRevision;
- if (recoveryFinishedFuture.complete(targetRevision)) {
- LOG.info("Finished MetaStorage recovery");
- }
+ finishRecovery(targetRevision);
} finally {
busyLock.leaveBusy();
}
@@ -326,16 +329,25 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
if (storageRevision >= targetRevision) {
storage.setRecoveryRevisionListener(null);
- appliedRevision = targetRevision;
- if (recoveryFinishedFuture.complete(targetRevision)) {
- LOG.info("Finished MetaStorage recovery");
- }
+ finishRecovery(targetRevision);
}
} finally {
busyLock.leaveBusy();
}
}
+ private void finishRecovery(long targetRevision) {
+ appliedRevision = targetRevision;
+
+ if (targetRevision > 0) {
+
clusterTime.updateSafeTime(storage.timestampByRevision(targetRevision));
+ }
+
+ if (recoveryFinishedFuture.complete(targetRevision)) {
+ LOG.info("Finished MetaStorage recovery");
+ }
+ }
+
private CompletableFuture<MetaStorageServiceImpl>
initializeMetaStorage(MetaStorageInfo metaStorageInfo) {
try {
if (thisNodeDidNotWitnessMetaStorageRepair(metaStorageInfo)) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java
index 96489ed254..32ef95fad2 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.table.distributed.schema;
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-
import java.util.concurrent.CompletableFuture;
import java.util.function.LongSupplier;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -43,11 +41,6 @@ public class SchemaSyncServiceImpl implements
SchemaSyncService {
@Override
public CompletableFuture<Void> waitForMetadataCompleteness(HybridTimestamp
ts) {
- // There a corner case for tests that are using {@code
WatchListenerInhibitor#metastorageEventsInhibitor}
- // that leads to current time equals {@link HybridTimestamp#MIN_VALUE}
and this method is waiting forever.
- if (HybridTimestamp.MIN_VALUE.equals(clusterTime.currentSafeTime())) {
- return nullCompletedFuture();
- }
return
clusterTime.waitFor(ts.subtractPhysicalTime(delayDurationMs.getAsLong()));
}
}