tkalkirill commented on code in PR #3358:
URL: https://github.com/apache/ignite-3/pull/3358#discussion_r1521189187
##########
modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java:
##########
@@ -2488,7 +2464,7 @@ private static Stream<Arguments>
argumentsForCheckIndexCreationCatalogVersion()
}
@Test
- public void testCatalogCompaction() {
+ public void testCatalogCompaction() throws InterruptedException {
Review Comment:
```suggestion
public void testCatalogCompaction() throws Exception {
```
##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java:
##########
@@ -473,53 +473,12 @@ public CompletableFuture<Void> handle(UpdateLogEvent
event, HybridTimestamp meta
private CompletableFuture<Void> handle(SnapshotEntry event, long
causalityToken) {
Catalog catalog = event.snapshot();
-
- // Use reverse order to find latest descriptors.
- Collection<Catalog> droppedCatalogVersions =
catalogByVer.headMap(catalog.version(), false).descendingMap().values();
-
- List<Fireable> events = new ArrayList<>();
- IntSet objectToSkip = new IntOpenHashSet();
- Predicate<CatalogObjectDescriptor> filter = obj ->
objectToSkip.add(obj.id());
-
- // At first, add alive indexes to filter.
- applyToAliveIndexesFrom(catalog.version(), filter::test);
-
- // Create destroy events for dropped indexes.
- droppedCatalogVersions.forEach(oldCatalog ->
oldCatalog.indexes().stream()
- .filter(filter)
- .forEach(idx -> events.add(
- new DestroyIndexEvent(idx.id(), idx.tableId(),
tableZoneDescriptor(oldCatalog, idx.tableId()).partitions()))
- ));
-
- objectToSkip.clear();
- // At last, create destroy events for dropped tables.
- droppedCatalogVersions.forEach(oldCatalog ->
oldCatalog.tables().stream()
- .filter(tbl -> catalog.table(tbl.id()) == null)
- .filter(filter)
- .forEach(tbl -> events.add(new DestroyTableEvent(tbl.id(),
tableZoneDescriptor(oldCatalog, tbl.id()).partitions()))));
-
// On recovery phase, we must register catalog from the snapshot.
// In other cases, it is ok to rewrite an existed version, because
it's exactly the same.
registerCatalog(catalog);
+ truncateUpTo(catalog);
Review Comment:
`org.apache.ignite.internal.catalog.CatalogManagerImpl.OnUpdateHandlerImpl#handle(org.apache.ignite.internal.catalog.storage.SnapshotEntry, long)` doesn't use `causalityToken`.
##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java:
##########
@@ -81,6 +86,10 @@ public class ClientPrimaryReplicaTracker implements
EventListener<EventParameter
private final SchemaSyncService schemaSyncService;
+ private final LowWatermark lowWatermark;
+
+ private final DeferredEventsQueue<DestroyTableEvent> deferredEventsQueue =
new DeferredEventsQueue<>(DestroyTableEvent::catalogVersion);
Review Comment:
`DestroyTableEvent::catalogVersion` is not an `Event timestamp extractor.`
It confuses me a lot.
The name is also strange: a deferred queue of what? "Buy milk"?
##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java:
##########
@@ -280,8 +294,21 @@ public CompletableFuture<Boolean> notify(EventParameters
parameters) {
}
private CompletableFuture<Boolean> notifyInternal(EventParameters
parameters) {
- if (parameters instanceof DestroyTableEventParameters) {
- removeTable((DestroyTableEventParameters) parameters);
+ if (parameters instanceof DropTableEventParameters) {
+ DropTableEventParameters event = (DropTableEventParameters)
parameters;
+
+ int tableId = event.tableId();
+ int catalogVersion = event.catalogVersion();
+ int previousVersion = catalogVersion - 1;
+
+ // Retrieve descriptor during synchronous call, before the
previous catalog version could be concurrently compacted.
+ CatalogTableDescriptor tableDescriptor =
catalogService.table(tableId, previousVersion);
+ assert tableDescriptor != null : "table";
+
+ CatalogZoneDescriptor zoneDescriptor =
catalogService.zone(tableDescriptor.zoneId(), previousVersion);
+ assert zoneDescriptor != null : "zone";
Review Comment:
```suggestion
assert zoneDescriptor != null : "zoneId=" + tableDescriptor.zoneId()
        + ", catalogVersion=" + previousVersion;
```
##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java:
##########
@@ -280,8 +294,21 @@ public CompletableFuture<Boolean> notify(EventParameters
parameters) {
}
private CompletableFuture<Boolean> notifyInternal(EventParameters
parameters) {
- if (parameters instanceof DestroyTableEventParameters) {
- removeTable((DestroyTableEventParameters) parameters);
+ if (parameters instanceof DropTableEventParameters) {
+ DropTableEventParameters event = (DropTableEventParameters)
parameters;
+
+ int tableId = event.tableId();
+ int catalogVersion = event.catalogVersion();
+ int previousVersion = catalogVersion - 1;
+
+ // Retrieve descriptor during synchronous call, before the
previous catalog version could be concurrently compacted.
+ CatalogTableDescriptor tableDescriptor =
catalogService.table(tableId, previousVersion);
+ assert tableDescriptor != null : "table";
Review Comment:
```suggestion
assert tableDescriptor != null : "tableId=" + tableId
        + ", catalogVersion=" + previousVersion;
```
##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java:
##########
@@ -113,6 +113,7 @@ private static List<Integer>
collectIndexIdsFromTable(TableImpl table, int parti
return future.join();
}
+ // TODO: validate this.
Review Comment:
Well, this check is useful!
Why is there a TODO here?
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -84,6 +92,12 @@ public class IndexManager implements IgniteComponent {
/** Versioned value to prevent races when registering/unregistering
indexes when processing metastore or catalog events. */
private final IncrementalVersionedValue<Void> handleMetastoreEventVv;
+ /** Low watermark. */
+ private final LowWatermark lowWatermark;
+
+ /** Deferred destruction queue. */
+ private final DeferredEventsQueue<DestroyIndexEvent> deferredQueue = new
DeferredEventsQueue<>(DestroyIndexEvent::catalogVersion);
Review Comment:
Again, `DestroyIndexEvent::catalogVersion` is not a timestamp, which confuses me a lot.
The name is strange here too: a deferred queue of what? "Buy milk"?
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -198,6 +196,58 @@ private CompletableFuture<Boolean>
onIndexCreate(CreateIndexEventParameters para
});
}
+ private CompletableFuture<Boolean>
onIndexRemoved(RemoveIndexEventParameters parameters) {
+ return inBusyLockAsync(busyLock, () -> {
+ int indexId = parameters.indexId();
+ int version = parameters.catalogVersion();
+ int prevVersion = version - 1;
Review Comment:
Please rename to `previousCatalogVersion`
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexChooser.java:
##########
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.index;
-
-import static java.util.Collections.binarySearch;
-import static java.util.Collections.unmodifiableList;
-import static java.util.Comparator.comparingInt;
-import static
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
-import static
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.STOPPING;
-import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
-
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.ignite.internal.catalog.CatalogService;
-import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
-import org.apache.ignite.internal.catalog.events.CatalogEvent;
-import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
-import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
-import org.apache.ignite.internal.close.ManuallyCloseable;
-import org.apache.ignite.internal.event.EventListener;
-import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-
-/** Index chooser for various operations, for example for RW transactions. */
-class IndexChooser implements ManuallyCloseable {
Review Comment:
Like a sickle to the balls =(
##########
modules/table/src/main/java/org/apache/ignite/internal/table/DeferredEventsQueue.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.ToLongFunction;
+
+/**
+ * A queue for deferred events, which provide method to drain events up to
given watermark. An implementation is a thread-safe wrapper over
+ * {@link java.util.PriorityQueue}.
+ *
+ * @param <T> Event type.
+ */
+public class DeferredEventsQueue<T> {
Review Comment:
It is certainly not obvious from the name that this is for a low watermark.
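For illustration, here is a minimal sketch of how such a queue could make the watermark relationship explicit in its name and API. The class name `LowWatermarkDeferredQueue`, the `enqueue`/`drainUpTo` method names, and the inclusive boundary are assumptions made for this example, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.ToLongFunction;

/**
 * Hypothetical sketch: events wait here until the low watermark reaches their key.
 * The key may be a timestamp or a catalog version; the extractor name stays neutral on purpose.
 */
class LowWatermarkDeferredQueue<T> {
    private final ToLongFunction<T> keyExtractor;

    private final PriorityQueue<T> queue;

    LowWatermarkDeferredQueue(ToLongFunction<T> keyExtractor) {
        this.keyExtractor = keyExtractor;
        // Smallest key first, so draining can stop at the first event above the watermark.
        this.queue = new PriorityQueue<>(Comparator.comparingLong(keyExtractor));
    }

    /** Defers the event until the watermark reaches its key. */
    synchronized void enqueue(T event) {
        queue.add(event);
    }

    /** Removes and returns, in ascending key order, all events whose key is at or below the watermark (assumed inclusive). */
    synchronized List<T> drainUpTo(long watermark) {
        List<T> drained = new ArrayList<>();

        while (!queue.isEmpty() && keyExtractor.applyAsLong(queue.peek()) <= watermark) {
            drained.add(queue.poll());
        }

        return drained;
    }
}
```

With a name like this, a call site such as `new LowWatermarkDeferredQueue<>(DestroyTableEvent::catalogVersion)` would at least tell the reader what the queue is waiting for.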
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -248,4 +298,44 @@ private static <T> BiFunction<T, Throwable,
CompletableFuture<T>> updater(Functi
return updateFunction.apply(t);
};
}
+
+ /** Recover deferred destroy events. */
+ private void recoverDeferredQueue() {
+ int earliestVersion =
catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(lowWatermark.getLowWatermark()));
+ int latestVersion = catalogService.latestCatalogVersion();
+
+ synchronized ((deferredQueue)) {
+ for (int version = latestVersion - 1; version >= earliestVersion;
version--) {
Review Comment:
Please use `catalogVersion` instead of `version` for variable names.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ *
+ * <p>Low watermark is the node's local time, which ensures that read-only
transactions have completed by this time, and new read-only
+ * transactions will only be created after this time, and we can safely delete
obsolete/garbage data such as: obsolete versions of table
+ * rows, remote indexes, remote tables, etc.
+ *
+ * @see <a
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol">IEP-91</a>
+ */
+public class LowWatermarkImpl implements IgniteComponent, LowWatermark {
+ private static final IgniteLogger LOG =
Loggers.forClass(LowWatermarkImpl.class);
+
+ static final ByteArray LOW_WATERMARK_VAULT_KEY = new
ByteArray("low-watermark");
+
+ private final LowWatermarkConfiguration lowWatermarkConfig;
+
+ private final HybridClock clock;
+
+ private final TxManager txManager;
+
+ private final VaultManager vaultManager;
+
+ private final List<LowWatermarkChangedListener> updateListeners = new
CopyOnWriteArrayList<>();
+
+ private final ScheduledExecutorService scheduledThreadPool;
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ private volatile @Nullable HybridTimestamp lowWatermark;
+
+ private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture
= new AtomicReference<>();
+
+ private final FailureProcessor failureProcessor;
+
+ /**
+ * Constructor.
+ *
+ * @param nodeName Node name.
+ * @param lowWatermarkConfig Low watermark configuration.
+ * @param clock A hybrid logical clock.
+ * @param txManager Transaction manager.
+ * @param vaultManager Vault manager.
+ * @param failureProcessor Failure processor tha is used to handle
critical errors.
+ */
+ public LowWatermarkImpl(
+ String nodeName,
+ LowWatermarkConfiguration lowWatermarkConfig,
+ HybridClock clock,
+ TxManager txManager,
+ VaultManager vaultManager,
+ FailureProcessor failureProcessor
+ ) {
+ this.lowWatermarkConfig = lowWatermarkConfig;
+ this.clock = clock;
+ this.txManager = txManager;
+ this.vaultManager = vaultManager;
+ this.failureProcessor = failureProcessor;
+
+ scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+ NamedThreadFactory.create(nodeName, "low-watermark-updater",
LOG)
+ );
+ }
+
+ /**
+ * Starts the watermark manager.
+ */
Review Comment:
```suggestion
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java:
##########
@@ -0,0 +1,280 @@
+ /**
+ * Starts the watermark manager.
+ */
+ @Override
+ public CompletableFuture<Void> start() {
+ inBusyLock(busyLock, () -> {
+ lowWatermark = readLowWatermarkFromVault();
+ });
+
+ return nullCompletedFuture();
+ }
+
+ /**
+ * Schedule watermark updates.
+ */
+ public void scheduleUpdates() {
+ inBusyLock(busyLock, () -> {
+ HybridTimestamp lowWatermarkCandidate = lowWatermark;
+
+ if (lowWatermarkCandidate == null) {
+ LOG.info("Previous value of the low watermark was not found,
will schedule to update it");
+
+ scheduleUpdateLowWatermarkBusy();
+
+ return;
+ }
+
+ LOG.info("Low watermark has been scheduled to be updated: {}",
lowWatermarkCandidate);
+
+ txManager.updateLowWatermark(lowWatermarkCandidate)
+ .thenComposeAsync(unused -> inBusyLock(busyLock, () ->
notifyListeners(lowWatermarkCandidate)), scheduledThreadPool)
+ .whenComplete((unused, throwable) -> {
+ if (throwable == null) {
+ inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
+ } else if (!(throwable instanceof
NodeStoppingException)) {
+ LOG.error("Error during the Watermark manager
start", throwable);
+
+ failureProcessor.process(new
FailureContext(CRITICAL_ERROR, throwable));
+
+ inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
+ }
+ });
+ });
+ }
+
+ private @Nullable HybridTimestamp readLowWatermarkFromVault() {
+ VaultEntry vaultEntry = vaultManager.get(LOW_WATERMARK_VAULT_KEY);
+
+ return vaultEntry == null ? null :
ByteUtils.fromBytes(vaultEntry.value());
+ }
+
+ @Override
+ public void stop() {
+ if (!closeGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ ScheduledFuture<?> lastScheduledTaskFuture =
this.lastScheduledTaskFuture.get();
+
+ if (lastScheduledTaskFuture != null) {
+ lastScheduledTaskFuture.cancel(true);
+ }
+
+ IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10,
TimeUnit.SECONDS);
+ }
+
+ @Override
+ public @Nullable HybridTimestamp getLowWatermark() {
+ return lowWatermark;
+ }
+
+ void updateLowWatermark() {
+ inBusyLock(busyLock, () -> {
+ HybridTimestamp lowWatermarkCandidate =
createNewLowWatermarkCandidate();
+
+ // Wait until all the read-only transactions are finished before
the new candidate, since no new RO transactions could be
+ // created, then we can safely promote the candidate as a new low
watermark, store it in vault, and we can safely start cleaning
+ // up the stale/junk data in the tables.
+ txManager.updateLowWatermark(lowWatermarkCandidate)
+ .thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
+ vaultManager.put(LOW_WATERMARK_VAULT_KEY,
ByteUtils.toBytes(lowWatermarkCandidate));
+
+ lowWatermark = lowWatermarkCandidate;
+
+ return notifyListeners(lowWatermarkCandidate);
+ }), scheduledThreadPool)
+ .whenComplete((unused, throwable) -> {
+ if (throwable != null) {
+ if (!(throwable instanceof NodeStoppingException))
{
+ LOG.error("Failed to update low watermark,
will schedule again: {}", throwable, lowWatermarkCandidate);
+
+ inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
+ }
+ } else {
+ LOG.info("Successful low watermark update: {}",
lowWatermarkCandidate);
+
+ scheduleUpdateLowWatermarkBusy();
+ }
+ });
+ });
+ }
+
+ @Override
+ public void addUpdateListener(LowWatermarkChangedListener listener) {
+ updateListeners.add(listener);
+ }
+
+ @Override
+ public void removeUpdateListener(LowWatermarkChangedListener listener) {
+ updateListeners.remove(listener);
+ }
+
+ private CompletableFuture<Void> notifyListeners(HybridTimestamp
lowWatermark) {
+ if (updateListeners.isEmpty()) {
+ return nullCompletedFuture();
+ }
+
+ ArrayList<CompletableFuture<?>> res = new ArrayList<>();
Review Comment:
```suggestion
var res = new ArrayList<CompletableFuture<?>>();
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java:
##########
@@ -0,0 +1,280 @@
+ /**
+ * Schedule watermark updates.
+ */
+ public void scheduleUpdates() {
+ inBusyLock(busyLock, () -> {
+ HybridTimestamp lowWatermarkCandidate = lowWatermark;
+
+ if (lowWatermarkCandidate == null) {
+ LOG.info("Previous value of the low watermark was not found,
will schedule to update it");
+
+ scheduleUpdateLowWatermarkBusy();
+
+ return;
+ }
+
+ LOG.info("Low watermark has been scheduled to be updated: {}",
lowWatermarkCandidate);
+
+ txManager.updateLowWatermark(lowWatermarkCandidate)
+ .thenComposeAsync(unused -> inBusyLock(busyLock, () ->
notifyListeners(lowWatermarkCandidate)), scheduledThreadPool)
+ .whenComplete((unused, throwable) -> {
+ if (throwable == null) {
+ inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
Review Comment:
Why don't we save the new low watermark in vault?
##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TestLowWatermark.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.table.distributed.LowWatermark;
+import
org.apache.ignite.internal.table.distributed.LowWatermarkChangedListener;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Low watermark dummy implementation, which requires explicit {@link
#updateAndNotify(HybridTimestamp)} method call to notify listeners.
+ * This implementation has no persistent state and notifies listeners
instantly in same thread.
+ */
+public class TestLowWatermark implements LowWatermark {
+ private final List<LowWatermarkChangedListener> listeners = new
CopyOnWriteArrayList<>();
+ private volatile HybridTimestamp ts;
+
+ @Override
+ public @Nullable HybridTimestamp getLowWatermark() {
+ return ts;
+ }
+
+ @Override
+ public void addUpdateListener(LowWatermarkChangedListener listener) {
+ this.listeners.add(listener);
+ }
+
+ @Override
+ public void removeUpdateListener(LowWatermarkChangedListener listener) {
+ this.listeners.remove(listener);
+ }
+
+ /**
+ * Update low watermark and notify listeners.
+ *
+ * @param newTs New timestamp.
+ * @return Listener notification future.
+ */
+ public CompletableFuture<Void> updateAndNotify(HybridTimestamp newTs) {
+ assert ts == null || ts.longValue() < newTs.longValue();
Review Comment:
I would expect `newTs` to be non-null here.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ *
+ * <p>Low watermark is the node's local time, which ensures that read-only
transactions have completed by this time, and new read-only
+ * transactions will only be created after this time, and we can safely delete
obsolete/garbage data such as: obsolete versions of table
+ * rows, remote indexes, remote tables, etc.
+ *
+ * @see <a
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol">IEP-91</a>
+ */
+public class LowWatermarkImpl implements IgniteComponent, LowWatermark {
+ private static final IgniteLogger LOG =
Loggers.forClass(LowWatermarkImpl.class);
+
+ static final ByteArray LOW_WATERMARK_VAULT_KEY = new
ByteArray("low-watermark");
+
+ private final LowWatermarkConfiguration lowWatermarkConfig;
+
+ private final HybridClock clock;
+
+ private final TxManager txManager;
+
+ private final VaultManager vaultManager;
+
+ private final List<LowWatermarkChangedListener> updateListeners = new
CopyOnWriteArrayList<>();
+
+ private final ScheduledExecutorService scheduledThreadPool;
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ private volatile @Nullable HybridTimestamp lowWatermark;
+
+ private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture
= new AtomicReference<>();
+
+ private final FailureProcessor failureProcessor;
+
+ /**
+ * Constructor.
+ *
+ * @param nodeName Node name.
+ * @param lowWatermarkConfig Low watermark configuration.
+ * @param clock A hybrid logical clock.
+ * @param txManager Transaction manager.
+ * @param vaultManager Vault manager.
+ * @param failureProcessor Failure processor tha is used to handle
critical errors.
+ */
+ public LowWatermarkImpl(
+ String nodeName,
+ LowWatermarkConfiguration lowWatermarkConfig,
+ HybridClock clock,
+ TxManager txManager,
+ VaultManager vaultManager,
+ FailureProcessor failureProcessor
+ ) {
+ this.lowWatermarkConfig = lowWatermarkConfig;
+ this.clock = clock;
+ this.txManager = txManager;
+ this.vaultManager = vaultManager;
+ this.failureProcessor = failureProcessor;
+
+ scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+ NamedThreadFactory.create(nodeName, "low-watermark-updater",
LOG)
+ );
+ }
+
+ /**
+ * Starts the watermark manager.
+ */
+ @Override
+ public CompletableFuture<Void> start() {
+ inBusyLock(busyLock, () -> {
+ lowWatermark = readLowWatermarkFromVault();
+ });
+
+ return nullCompletedFuture();
+ }
+
+ /**
+ * Schedule watermark updates.
+ */
+ public void scheduleUpdates() {
+ inBusyLock(busyLock, () -> {
+ HybridTimestamp lowWatermarkCandidate = lowWatermark;
+
+ if (lowWatermarkCandidate == null) {
+ LOG.info("Previous value of the low watermark was not found,
will schedule to update it");
+
+ scheduleUpdateLowWatermarkBusy();
+
+ return;
+ }
+
+ LOG.info("Low watermark has been scheduled to be updated: {}",
lowWatermarkCandidate);
+
+ txManager.updateLowWatermark(lowWatermarkCandidate)
+ .thenComposeAsync(unused -> inBusyLock(busyLock, () ->
notifyListeners(lowWatermarkCandidate)), scheduledThreadPool)
+ .whenComplete((unused, throwable) -> {
+ if (throwable == null) {
+ inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
+ } else if (!(throwable instanceof
NodeStoppingException)) {
+ LOG.error("Error during the Watermark manager
start", throwable);
+
+ failureProcessor.process(new
FailureContext(CRITICAL_ERROR, throwable));
+
+ inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
+ }
+ });
+ });
+ }
+
+ private @Nullable HybridTimestamp readLowWatermarkFromVault() {
+ VaultEntry vaultEntry = vaultManager.get(LOW_WATERMARK_VAULT_KEY);
+
+ return vaultEntry == null ? null :
ByteUtils.fromBytes(vaultEntry.value());
+ }
+
+ @Override
+ public void stop() {
+ if (!closeGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ ScheduledFuture<?> lastScheduledTaskFuture =
this.lastScheduledTaskFuture.get();
+
+ if (lastScheduledTaskFuture != null) {
+ lastScheduledTaskFuture.cancel(true);
+ }
+
+ IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10,
TimeUnit.SECONDS);
+ }
+
+ @Override
+ public @Nullable HybridTimestamp getLowWatermark() {
+ return lowWatermark;
+ }
+
+ void updateLowWatermark() {
+ inBusyLock(busyLock, () -> {
+ HybridTimestamp lowWatermarkCandidate =
createNewLowWatermarkCandidate();
+
+ // Wait until all the read-only transactions are finished before
the new candidate, since no new RO transactions could be
+ // created, then we can safely promote the candidate as a new low
watermark, store it in vault, and we can safely start cleaning
+ // up the stale/junk data in the tables.
+ txManager.updateLowWatermark(lowWatermarkCandidate)
+ .thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
+ vaultManager.put(LOW_WATERMARK_VAULT_KEY,
ByteUtils.toBytes(lowWatermarkCandidate));
Review Comment:
I think it's better to save the new low watermark only after listeners
complete their futures.
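
A minimal sketch of the ordering I have in mind. All names here (`LwmUpdateOrderSketch`, `waitForReadOnlyTxs`, `saveToVault`) are hypothetical stand-ins for the real `TxManager` and `VaultManager` calls, and the watermark is reduced to a `long`; the point is only that the vault write is chained after the listener futures:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

/** Hypothetical stand-in for the update flow; not the actual LowWatermarkImpl code. */
final class LwmUpdateOrderSketch {
    /** Records the order in which the steps ran. */
    final List<String> steps = new CopyOnWriteArrayList<>();

    /** Waits for read-only transactions, notifies listeners, and only then persists the candidate. */
    CompletableFuture<Void> update(long candidate, List<Function<Long, CompletableFuture<Void>>> listeners) {
        return waitForReadOnlyTxs(candidate)
                .thenCompose(unused -> notifyListeners(candidate, listeners))
                .thenRun(() -> saveToVault(candidate));
    }

    /** Assumption: stands in for txManager.updateLowWatermark(candidate). */
    private CompletableFuture<Void> waitForReadOnlyTxs(long candidate) {
        steps.add("tx");

        return CompletableFuture.completedFuture(null);
    }

    /** Completes when every listener future has completed. */
    private CompletableFuture<Void> notifyListeners(long candidate, List<Function<Long, CompletableFuture<Void>>> listeners) {
        steps.add("notify");

        CompletableFuture<?>[] futures = listeners.stream()
                .map(listener -> listener.apply(candidate))
                .toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(futures);
    }

    /** Assumption: stands in for vaultManager.put(LOW_WATERMARK_VAULT_KEY, ...). */
    private void saveToVault(long candidate) {
        steps.add("vault");
    }
}
```

In this sketch a failed listener future skips the vault write, so the persisted value never gets ahead of what the listeners have processed.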
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -725,11 +726,29 @@ private CompletableFuture<List<Assignments>>
writeTableAssignmentsToMetastore(
});
}
- private CompletableFuture<Void> onTableDestroy(DestroyTableEventParameters
parameters) {
+ private CompletableFuture<Boolean> onTableDrop(DropTableEventParameters
parameters) {
return inBusyLockAsync(busyLock, () -> {
- dropTableLocally(parameters.causalityToken(), parameters);
+ deferredQueue.enqueue(new
DestroyTableEvent(parameters.catalogVersion(), parameters.tableId()));
Review Comment:
Again, you can use `org.apache.ignite.internal.event.EventListener#fromConsumer`.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1391,63 +1410,43 @@ protected TxStateTableStorage
createTxStateTableStorage(CatalogTableDescriptor t
/**
* Drops local structures for a table.
*
- * @param causalityToken Causality token.
- * @param parameters Destroy table event parameters.
+ * @param tableId Table id to destroy.
*/
- private void dropTableLocally(long causalityToken,
DestroyTableEventParameters parameters) {
- int tableId = parameters.tableId();
- // TODO Drop partitions from parameters and use from storage.
- int partitions = parameters.partitions();
-
- localPartitionsVv.update(causalityToken, (previousVal, e) ->
inBusyLock(busyLock, () -> {
- if (e != null) {
- return failedFuture(e);
- }
+ private CompletableFuture<Void> destroyTableLocally(int tableId) {
+ TableImpl table = startedTables.remove(tableId);
+ localPartsByTableId.remove(tableId);
- localPartsByTableId.remove(tableId);
+ assert table != null;
Review Comment:
```suggestion
assert table != null : tableId;
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1314,14 +1334,13 @@ private CompletableFuture<Void> createTableLocally(
return failedFuture(e);
}
- startedTables.put(tableId, table);
-
return allOf(localPartsUpdateFuture,
tablesByIdFuture).thenComposeAsync(ignore -> inBusyLock(busyLock, () -> {
if (onNodeRecovery) {
SchemaRegistry schemaRegistry = table.schemaView();
PartitionSet partitionSet =
localPartsByTableId.get(tableId);
+ HybridTimestamp lwm =
lowWatermark.getLowWatermark();
Review Comment:
Please add a comment saying that the low watermark starts updating only after
the node has been restored, so there will be no races here.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1836,11 +1835,12 @@ private CompletableFuture<Void>
handleChangePendingAssignmentEvent(
)
.thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
if (!isRecovery) {
+ HybridTimestamp lwm =
lowWatermark.getLowWatermark();
// We create index storages (and also register the
necessary structures) for the rebalancing one partition
// before start the raft node, so that the updates
that come when applying the replication log can safely
// update the indexes. On recovery node, we do not
need to call this code, since during restoration we start
// all partitions and already register indexes
there.
- registerIndexesToTable(tbl, catalogService,
singlePartitionIdSet, tbl.schemaView());
+ registerIndexesToTable(tbl, catalogService,
singlePartitionIdSet, tbl.schemaView(), lwm);
Review Comment:
It looks like there may be a race here. For example, if the low watermark is
updated aggressively, the task that destroys the index may finish before we
register the index, and the index would then be created empty.
##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java:
##########
@@ -304,9 +331,22 @@ private CompletableFuture<Boolean>
notifyInternal(EventParameters parameters) {
return falseCompletedFuture(); // false: don't remove listener.
}
- private void removeTable(DestroyTableEventParameters event) {
- for (int partition = 0; partition < event.partitions(); partition++) {
- TablePartitionId tablePartitionId = new
TablePartitionId(event.tableId(), partition);
+ @Override
+ public CompletableFuture<Void> onLwmChanged(HybridTimestamp ts) {
+ return inBusyLockAsync(busyLock, () -> {
+ int earliestVersion =
catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(ts));
+
+ List<DestroyTableEvent> events =
deferredEventsQueue.drainUpTo(earliestVersion);
+
+ events.forEach(event -> removeTable(event.tableId(),
event.partitions()));
+
+ return nullCompletedFuture();
+ });
+ }
+
+ private void removeTable(int tableId, int partitions) {
+ for (int partition = 0; partition < partitions; partition++) {
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partition);
primaryReplicas.remove(tablePartitionId);
Review Comment:
Do I understand correctly that there can be races between updating the
watermark and updating the lease holder?
##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java:
##########
@@ -100,6 +100,10 @@ public interface CatalogService extends
EventProducer<CatalogEvent, CatalogEvent
/** Returns the earliest registered version of the catalog. */
int earliestCatalogVersion();
+ /** Returns the earliest registered version of the catalog, which is
observable since given timestamp. */
+ // TODO IGNITE-21608 Use method without timestamp instead?
+ int earliestCatalogVersion(long timestamp);
Review Comment:
I think this method is not needed yet, and it adds complexity to using the
catalog. The catalog is not compacted yet, and we will figure out the
implementation details later, so for now we can safely use
`CatalogService#activeCatalogVersion`.
##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java:
##########
@@ -280,8 +294,21 @@ public CompletableFuture<Boolean> notify(EventParameters
parameters) {
}
private CompletableFuture<Boolean> notifyInternal(EventParameters
parameters) {
- if (parameters instanceof DestroyTableEventParameters) {
- removeTable((DestroyTableEventParameters) parameters);
+ if (parameters instanceof DropTableEventParameters) {
Review Comment:
You can use `org.apache.ignite.internal.event.EventListener#fromConsumer`.
##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java:
##########
@@ -68,7 +73,7 @@
* load it from the placement driver. Wait for a limited amount of time
(in getPrimaryReplica) and return what we have.
* Don't block the client for too long, it is better to miss the primary
than to delay the request.
*/
-public class ClientPrimaryReplicaTracker implements
EventListener<EventParameters> {
+public class ClientPrimaryReplicaTracker implements
EventListener<EventParameters>, LowWatermarkChangedListener {
Review Comment:
I don't think we need to implement both interfaces, since in
`ClientPrimaryReplicaTracker#start` we register ourselves rather than being
registered from the outside. Personally, I found this code confusing.
##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java:
##########
@@ -359,4 +399,29 @@ public long timestamp() {
return timestamp;
}
}
+
+ /** Internal event. */
+ private static class DestroyTableEvent {
Review Comment:
Why not create a separate package-private class?
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -198,6 +196,58 @@ private CompletableFuture<Boolean>
onIndexCreate(CreateIndexEventParameters para
});
}
+ private CompletableFuture<Boolean>
onIndexRemoved(RemoveIndexEventParameters parameters) {
+ return inBusyLockAsync(busyLock, () -> {
+ int indexId = parameters.indexId();
+ int version = parameters.catalogVersion();
Review Comment:
Please rename to `catalogVersion`
##########
modules/table/src/main/java/org/apache/ignite/internal/table/DeferredEventsQueue.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.ToLongFunction;
+
+/**
+ * A queue for deferred events, which provide method to drain events up to
given watermark. An implementation is a thread-safe wrapper over
+ * {@link java.util.PriorityQueue}.
+ *
+ * @param <T> Event type.
+ */
+public class DeferredEventsQueue<T> {
+ private final PriorityQueue<T> queue;
+ private final ToLongFunction<T> mapper;
+
+ /**
+ * Creates a queue.
+ *
+ * @param mapper Event timestamp extractor.
+ */
+ public DeferredEventsQueue(ToLongFunction<T> mapper) {
Review Comment:
The mapper does not always return a timestamp here; sometimes it returns a
catalog version, which is very confusing. I would suggest passing a comparator
to the queue instead.
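
Roughly what I mean, as a sketch rather than a finished design. The class name `ComparatorEventsQueueSketch` and the `drainWhile` method are hypothetical; the idea is that the queue no longer assumes a "timestamp" and the caller decides both the ordering and the expiry condition:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Predicate;

/** Hypothetical variant of the queue: ordering comes from a caller-supplied comparator. */
final class ComparatorEventsQueueSketch<T> {
    /** Guarded by itself. */
    private final PriorityQueue<T> queue;

    ComparatorEventsQueueSketch(Comparator<? super T> comparator) {
        this.queue = new PriorityQueue<>(comparator);
    }

    /** Offers a new event to the queue. */
    boolean enqueue(T event) {
        synchronized (queue) {
            return queue.offer(event);
        }
    }

    /** Removes and returns, in comparator order, the leading events that match the predicate. */
    List<T> drainWhile(Predicate<? super T> expired) {
        synchronized (queue) {
            List<T> events = new ArrayList<>();

            while (!queue.isEmpty() && expired.test(queue.peek())) {
                events.add(queue.poll());
            }

            return events;
        }
    }
}
```

A caller that orders events by catalog version would then pass something like `Comparator.comparingInt(...)` over the version and drain with a predicate on that same version, so the queue itself never has to know what the key means.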
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -198,6 +196,58 @@ private CompletableFuture<Boolean>
onIndexCreate(CreateIndexEventParameters para
});
}
+ private CompletableFuture<Boolean>
onIndexRemoved(RemoveIndexEventParameters parameters) {
+ return inBusyLockAsync(busyLock, () -> {
+ int indexId = parameters.indexId();
+ int version = parameters.catalogVersion();
+ int prevVersion = version - 1;
+
+ // Retrieve descriptor during synchronous call, before the
previous catalog version could be concurrently compacted.
+ CatalogIndexDescriptor indexDescriptor =
catalogService.index(indexId, prevVersion);
+ assert indexDescriptor != null : "index";
+
+ int tableId = indexDescriptor.tableId();
+
+ if (catalogService.table(tableId, version) == null) {
+ // Nothing to do. Index will be destroyed along with the table.
+ return falseCompletedFuture();
+ }
+
+ deferredQueue.enqueue(new DestroyIndexEvent(version, indexId,
tableId));
+
+ return falseCompletedFuture();
+ });
+ }
+
+ private CompletableFuture<Void> onLwmChanged(HybridTimestamp ts) {
+ return inBusyLockAsync(busyLock, () -> {
+ int earliestVersion =
catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(ts));
+
+ List<DestroyIndexEvent> events =
deferredQueue.drainUpTo(earliestVersion);
+
+ if (events.isEmpty()) {
+ return nullCompletedFuture();
+ }
+
+ List<CompletableFuture<Void>> futures = events.stream()
+ .map(event -> destroyIndexAsync(event.indexId(),
event.tableId()))
+ .collect(Collectors.toList());
Review Comment:
Maybe use a static import?
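
For illustration only, a tiny self-contained sketch of the static-import form; `StaticImportSketch` and `labels` are made-up names and are not part of `IndexManager`:

```java
import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.stream.Stream;

/** Hypothetical example of collecting with a statically imported toList(). */
final class StaticImportSketch {
    /** Maps ids to labels and collects them without the Collectors. prefix. */
    static List<String> labels(Stream<Integer> ids) {
        return ids.map(id -> "index-" + id).collect(toList());
    }
}
```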
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -198,6 +196,58 @@ private CompletableFuture<Boolean>
onIndexCreate(CreateIndexEventParameters para
});
}
+ private CompletableFuture<Boolean>
onIndexRemoved(RemoveIndexEventParameters parameters) {
+ return inBusyLockAsync(busyLock, () -> {
Review Comment:
You can use `org.apache.ignite.internal.event.EventListener#fromConsumer`.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -248,4 +298,44 @@ private static <T> BiFunction<T, Throwable,
CompletableFuture<T>> updater(Functi
return updateFunction.apply(t);
};
}
+
+ /** Recover deferred destroy events. */
+ private void recoverDeferredQueue() {
+ int earliestVersion =
catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(lowWatermark.getLowWatermark()));
+ int latestVersion = catalogService.latestCatalogVersion();
+
+ synchronized ((deferredQueue)) {
Review Comment:
What is this `synchronized` for?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/DeferredEventsQueue.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.ToLongFunction;
+
+/**
+ * A queue for deferred events, which provide method to drain events up to
given watermark. An implementation is a thread-safe wrapper over
+ * {@link java.util.PriorityQueue}.
+ *
+ * @param <T> Event type.
+ */
+public class DeferredEventsQueue<T> {
+ private final PriorityQueue<T> queue;
Review Comment:
Add a comment saying that this field is guarded by itself.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -198,6 +196,58 @@ private CompletableFuture<Boolean>
onIndexCreate(CreateIndexEventParameters para
});
}
+ private CompletableFuture<Boolean>
onIndexRemoved(RemoveIndexEventParameters parameters) {
+ return inBusyLockAsync(busyLock, () -> {
+ int indexId = parameters.indexId();
+ int version = parameters.catalogVersion();
+ int prevVersion = version - 1;
+
+ // Retrieve descriptor during synchronous call, before the
previous catalog version could be concurrently compacted.
+ CatalogIndexDescriptor indexDescriptor =
catalogService.index(indexId, prevVersion);
+ assert indexDescriptor != null : "index";
+
+ int tableId = indexDescriptor.tableId();
+
+ if (catalogService.table(tableId, version) == null) {
+ // Nothing to do. Index will be destroyed along with the table.
+ return falseCompletedFuture();
+ }
+
+ deferredQueue.enqueue(new DestroyIndexEvent(version, indexId,
tableId));
+
+ return falseCompletedFuture();
+ });
+ }
+
+ private CompletableFuture<Void> onLwmChanged(HybridTimestamp ts) {
+ return inBusyLockAsync(busyLock, () -> {
+ int earliestVersion =
catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(ts));
Review Comment:
Please rename to `lwmActiveCatalogVersion`
##########
modules/table/src/main/java/org/apache/ignite/internal/table/DeferredEventsQueue.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.ToLongFunction;
+
+/**
+ * A queue for deferred events, which provide method to drain events up to
given watermark. An implementation is a thread-safe wrapper over
+ * {@link java.util.PriorityQueue}.
+ *
+ * @param <T> Event type.
+ */
+public class DeferredEventsQueue<T> {
+ private final PriorityQueue<T> queue;
+ private final ToLongFunction<T> mapper;
+
+ /**
+ * Creates a queue.
+ *
+ * @param mapper Event timestamp extractor.
+ */
+ public DeferredEventsQueue(ToLongFunction<T> mapper) {
+ this.mapper = mapper;
+ this.queue = new
PriorityQueue<>(Comparator.comparingLong(this.mapper));
+ }
+
+ /**
+ * Offers a new event to the queue.
+ *
+ * @param event New deferred event.
+ */
+ public boolean enqueue(T event) {
+ synchronized (queue) {
+ return queue.offer(event);
+ }
+ }
+
+ /**
+ * Drain queue up to given watermark and return dequeued events.
+ *
+ * @param watermark Timestamp to drain up to.
+ * @return Dequeued events.
+ */
+ public List<T> drainUpTo(long watermark) {
+ synchronized (queue) {
+ if (!hasExpired0(watermark)) {
+ return List.of();
+ }
+
+ List<T> events = new ArrayList<>();
+ do {
+ T event = queue.poll();
+
+ events.add(event);
+ } while (hasExpired0(watermark));
+
+ return events;
+ }
+ }
+
+ /**
+ * Returns queue size.
+ */
+ public int size() {
+ synchronized (queue) {
+ return queue.size();
+ }
+ }
+
+ /**
+ * Returns {@code true} if queue is empty, {@code false} otherwise.
+ */
+ public boolean isEmpty() {
+ synchronized (queue) {
+ return queue.isEmpty();
+ }
+ }
+
+ /**
+ * Returns {@code true} if found events below watermark, {@code false}
otherwise.
+ */
+ public boolean hasExpiredEvents(long watermark) {
Review Comment:
What is the watermark here: a timestamp, a catalog version, or something else?
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -248,4 +298,44 @@ private static <T> BiFunction<T, Throwable,
CompletableFuture<T>> updater(Functi
return updateFunction.apply(t);
};
}
+
+ /** Recover deferred destroy events. */
+ private void recoverDeferredQueue() {
+ int earliestVersion =
catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(lowWatermark.getLowWatermark()));
+ int latestVersion = catalogService.latestCatalogVersion();
+
+ synchronized ((deferredQueue)) {
+ for (int version = latestVersion - 1; version >= earliestVersion;
version--) {
+ int nextVersion = version + 1;
+ catalogService.indexes(version).stream()
+ .filter(idx -> catalogService.index(idx.id(),
nextVersion) == null)
+ .forEach(idx -> deferredQueue.enqueue(new
DestroyIndexEvent(nextVersion, idx.id(), idx.tableId())));
+ }
+ }
+ }
+
+ /** Internal event. */
+ private static class DestroyIndexEvent {
Review Comment:
Please use a package-private class.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ *
+ * <p>Low watermark is the node's local time, which ensures that read-only
transactions have completed by this time, and new read-only
+ * transactions will only be created after this time, and we can safely delete
obsolete/garbage data such as: obsolete versions of table
+ * rows, remote indexes, remote tables, etc.
+ *
+ * @see <a
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol">IEP-91</a>
+ */
+public class LowWatermarkImpl implements IgniteComponent, LowWatermark {
+ private static final IgniteLogger LOG =
Loggers.forClass(LowWatermarkImpl.class);
+
+ static final ByteArray LOW_WATERMARK_VAULT_KEY = new
ByteArray("low-watermark");
+
+ private final LowWatermarkConfiguration lowWatermarkConfig;
+
+ private final HybridClock clock;
+
+ private final TxManager txManager;
+
+ private final VaultManager vaultManager;
+
+ private final List<LowWatermarkChangedListener> updateListeners = new
CopyOnWriteArrayList<>();
+
+ private final ScheduledExecutorService scheduledThreadPool;
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ private volatile @Nullable HybridTimestamp lowWatermark;
+
+ private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture
= new AtomicReference<>();
+
+ private final FailureProcessor failureProcessor;
+
+ /**
+ * Constructor.
+ *
+ * @param nodeName Node name.
+ * @param lowWatermarkConfig Low watermark configuration.
+ * @param clock A hybrid logical clock.
+ * @param txManager Transaction manager.
+ * @param vaultManager Vault manager.
+ * @param failureProcessor Failure processor tha is used to handle
critical errors.
+ */
+ public LowWatermarkImpl(
+ String nodeName,
+ LowWatermarkConfiguration lowWatermarkConfig,
+ HybridClock clock,
+ TxManager txManager,
+ VaultManager vaultManager,
+ FailureProcessor failureProcessor
+ ) {
+ this.lowWatermarkConfig = lowWatermarkConfig;
+ this.clock = clock;
+ this.txManager = txManager;
+ this.vaultManager = vaultManager;
+ this.failureProcessor = failureProcessor;
+
+ scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+ NamedThreadFactory.create(nodeName, "low-watermark-updater",
LOG)
+ );
+ }
+
+ /**
+ * Starts the watermark manager.
+ */
+ @Override
+ public CompletableFuture<Void> start() {
+ inBusyLock(busyLock, () -> {
+ lowWatermark = readLowWatermarkFromVault();
+ });
+
+ return nullCompletedFuture();
Review Comment:
```suggestion
return inBusyLockAsync(busyLock, () -> {
lowWatermark = readLowWatermarkFromVault();
return nullCompletedFuture();
});
```
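The point of the async variant, as I understand it, is that a stopping node surfaces as a failed future instead of an exception thrown out of `start()`. A toy illustration of that contract, where `ToyBusyLock` and the local `inBusyLockAsync` are stand-ins written for this example (not the Ignite utilities) and the exception type is a placeholder:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class BusyLockAsyncSketch {
    /** Hypothetical stand-in for a busy lock: once blocked, nobody can enter. */
    public static final class ToyBusyLock {
        private final AtomicBoolean blocked = new AtomicBoolean();

        public boolean enterBusy() {
            return !blocked.get();
        }

        public void leaveBusy() {
            // Nothing to release in this toy version.
        }

        public void block() {
            blocked.set(true);
        }
    }

    /** Runs fn under the lock, or returns a failed future if the lock cannot be entered. */
    public static <T> CompletableFuture<T> inBusyLockAsync(ToyBusyLock lock, Supplier<CompletableFuture<T>> fn) {
        if (!lock.enterBusy()) {
            // Placeholder exception type; the failure is reported through the future.
            return CompletableFuture.failedFuture(new IllegalStateException("component is stopping"));
        }

        try {
            return fn.get();
        } finally {
            lock.leaveBusy();
        }
    }

    public static void main(String[] args) {
        ToyBusyLock lock = new ToyBusyLock();

        CompletableFuture<Void> started = inBusyLockAsync(lock, () -> CompletableFuture.completedFuture(null));
        System.out.println("failed before block: " + started.isCompletedExceptionally());

        lock.block();

        CompletableFuture<Void> rejected = inBusyLockAsync(lock, () -> CompletableFuture.completedFuture(null));
        System.out.println("failed after block: " + rejected.isCompletedExceptionally());
    }
}
```

The caller of `start()` then handles both outcomes through the returned future and does not need a separate `try`/`catch`.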
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ *
+ * <p>Low watermark is the node's local time, which ensures that read-only
transactions have completed by this time, and new read-only
+ * transactions will only be created after this time, and we can safely delete
obsolete/garbage data such as: obsolete versions of table
+ * rows, remote indexes, remote tables, etc.
+ *
+ * @see <a
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol">IEP-91</a>
+ */
+public class LowWatermarkImpl implements IgniteComponent, LowWatermark {
+ private static final IgniteLogger LOG =
Loggers.forClass(LowWatermarkImpl.class);
+
+ static final ByteArray LOW_WATERMARK_VAULT_KEY = new
ByteArray("low-watermark");
+
+ private final LowWatermarkConfiguration lowWatermarkConfig;
+
+ private final HybridClock clock;
+
+ private final TxManager txManager;
+
+ private final VaultManager vaultManager;
+
+ private final List<LowWatermarkChangedListener> updateListeners = new
CopyOnWriteArrayList<>();
+
+ private final ScheduledExecutorService scheduledThreadPool;
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ private volatile @Nullable HybridTimestamp lowWatermark;
+
+ private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture
= new AtomicReference<>();
+
+ private final FailureProcessor failureProcessor;
+
+ /**
+ * Constructor.
+ *
+ * @param nodeName Node name.
+ * @param lowWatermarkConfig Low watermark configuration.
+ * @param clock A hybrid logical clock.
+ * @param txManager Transaction manager.
+ * @param vaultManager Vault manager.
+ * @param failureProcessor Failure processor tha is used to handle
critical errors.
+ */
+ public LowWatermarkImpl(
+ String nodeName,
+ LowWatermarkConfiguration lowWatermarkConfig,
+ HybridClock clock,
+ TxManager txManager,
+ VaultManager vaultManager,
+ FailureProcessor failureProcessor
+ ) {
+ this.lowWatermarkConfig = lowWatermarkConfig;
+ this.clock = clock;
+ this.txManager = txManager;
+ this.vaultManager = vaultManager;
+ this.failureProcessor = failureProcessor;
+
+ scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
Review Comment:
Please create a follow-up ticket and add a TODO here to use a common thread pool
instead of a separate one.
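A rough sketch of what I mean, assuming the scheduler is passed in through the constructor. `SharedSchedulerSketch`, `commonScheduler`, and `scheduleUpdate` are illustrative names, not the PR's API:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: the component borrows a shared scheduler instead of owning one. */
public class SharedSchedulerSketch {
    private final ScheduledExecutorService scheduler;

    public SharedSchedulerSketch(ScheduledExecutorService commonScheduler) {
        // The pool is injected; this class neither creates nor shuts it down.
        this.scheduler = commonScheduler;
    }

    public ScheduledFuture<?> scheduleUpdate(Runnable update, long delayMillis) {
        return scheduler.schedule(update, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        // In a real node this would be the node-wide pool, created once elsewhere.
        ScheduledExecutorService common = Executors.newSingleThreadScheduledExecutor();

        try {
            SharedSchedulerSketch lwm = new SharedSchedulerSketch(common);

            // Wait for the scheduled task so the demo finishes deterministically.
            lwm.scheduleUpdate(() -> System.out.println("low watermark updated"), 10).get();
        } finally {
            common.shutdown();
        }
    }
}
```

With this shape the component no longer shuts the pool down when it stops; whoever owns the shared pool does.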
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ *
+ * <p>Low watermark is the node's local time, which ensures that read-only
transactions have completed by this time, and new read-only
+ * transactions will only be created after this time, and we can safely delete
obsolete/garbage data such as: obsolete versions of table
+ * rows, remote indexes, remote tables, etc.
+ *
+ * @see <a
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol">IEP-91</a>
+ */
+public class LowWatermarkImpl implements IgniteComponent, LowWatermark {
+ private static final IgniteLogger LOG =
Loggers.forClass(LowWatermarkImpl.class);
+
+ static final ByteArray LOW_WATERMARK_VAULT_KEY = new
ByteArray("low-watermark");
+
+ private final LowWatermarkConfiguration lowWatermarkConfig;
+
+ private final HybridClock clock;
+
+ private final TxManager txManager;
+
+ private final VaultManager vaultManager;
+
+ private final List<LowWatermarkChangedListener> updateListeners = new
CopyOnWriteArrayList<>();
+
+ private final ScheduledExecutorService scheduledThreadPool;
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ private volatile @Nullable HybridTimestamp lowWatermark;
+
+ private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture
= new AtomicReference<>();
+
+ private final FailureProcessor failureProcessor;
+
+ /**
+ * Constructor.
+ *
+ * @param nodeName Node name.
+ * @param lowWatermarkConfig Low watermark configuration.
+ * @param clock A hybrid logical clock.
+ * @param txManager Transaction manager.
+ * @param vaultManager Vault manager.
+ * @param failureProcessor Failure processor tha is used to handle
critical errors.
+ */
+ public LowWatermarkImpl(
+ String nodeName,
+ LowWatermarkConfiguration lowWatermarkConfig,
+ HybridClock clock,
+ TxManager txManager,
+ VaultManager vaultManager,
+ FailureProcessor failureProcessor
+ ) {
+ this.lowWatermarkConfig = lowWatermarkConfig;
+ this.clock = clock;
+ this.txManager = txManager;
+ this.vaultManager = vaultManager;
+ this.failureProcessor = failureProcessor;
+
+ scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+ NamedThreadFactory.create(nodeName, "low-watermark-updater",
LOG)
+ );
+ }
+
+ /**
+ * Starts the watermark manager.
+ */
+ @Override
+ public CompletableFuture<Void> start() {
+ inBusyLock(busyLock, () -> {
+ lowWatermark = readLowWatermarkFromVault();
+ });
+
+ return nullCompletedFuture();
+ }
+
+ /**
+ * Schedule watermark updates.
+ */
+ public void scheduleUpdates() {
+ inBusyLock(busyLock, () -> {
+ HybridTimestamp lowWatermarkCandidate = lowWatermark;
+
+ if (lowWatermarkCandidate == null) {
+ LOG.info("Previous value of the low watermark was not found,
will schedule to update it");
+
+ scheduleUpdateLowWatermarkBusy();
+
+ return;
+ }
+
+ LOG.info("Low watermark has been scheduled to be updated: {}",
lowWatermarkCandidate);
+
+ txManager.updateLowWatermark(lowWatermarkCandidate)
Review Comment:
Could we avoid duplicating code?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkImpl.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ *
+ * <p>Low watermark is the node's local time, which ensures that read-only
transactions have completed by this time, and new read-only
+ * transactions will only be created after this time, and we can safely delete
obsolete/garbage data such as: obsolete versions of table
+ * rows, remote indexes, remote tables, etc.
+ *
+ * @see <a
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol">IEP-91</a>
+ */
+public class LowWatermarkImpl implements IgniteComponent, LowWatermark {
+ private static final IgniteLogger LOG =
Loggers.forClass(LowWatermarkImpl.class);
+
+ static final ByteArray LOW_WATERMARK_VAULT_KEY = new
ByteArray("low-watermark");
+
+ private final LowWatermarkConfiguration lowWatermarkConfig;
+
+ private final HybridClock clock;
+
+ private final TxManager txManager;
+
+ private final VaultManager vaultManager;
+
+ private final List<LowWatermarkChangedListener> updateListeners = new
CopyOnWriteArrayList<>();
+
+ private final ScheduledExecutorService scheduledThreadPool;
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ private volatile @Nullable HybridTimestamp lowWatermark;
+
+ private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture
= new AtomicReference<>();
+
+ private final FailureProcessor failureProcessor;
+
+ /**
+ * Constructor.
+ *
+ * @param nodeName Node name.
+ * @param lowWatermarkConfig Low watermark configuration.
+ * @param clock A hybrid logical clock.
+ * @param txManager Transaction manager.
+ * @param vaultManager Vault manager.
+ * @param failureProcessor Failure processor tha is used to handle
critical errors.
+ */
+ public LowWatermarkImpl(
+ String nodeName,
+ LowWatermarkConfiguration lowWatermarkConfig,
+ HybridClock clock,
+ TxManager txManager,
+ VaultManager vaultManager,
+ FailureProcessor failureProcessor
+ ) {
+ this.lowWatermarkConfig = lowWatermarkConfig;
+ this.clock = clock;
+ this.txManager = txManager;
+ this.vaultManager = vaultManager;
+ this.failureProcessor = failureProcessor;
+
+ scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+ NamedThreadFactory.create(nodeName, "low-watermark-updater",
LOG)
+ );
+ }
+
+ /**
+ * Starts the watermark manager.
+ */
+ @Override
+ public CompletableFuture<Void> start() {
+ inBusyLock(busyLock, () -> {
+ lowWatermark = readLowWatermarkFromVault();
+ });
+
+ return nullCompletedFuture();
+ }
+
+ /**
+ * Schedule watermark updates.
Review Comment:
```suggestion
* Schedule low watermark updates.
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/DeferredEventsQueue.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.ToLongFunction;
+
+/**
+ * A queue for deferred events, which provide method to drain events up to
given watermark. An implementation is a thread-safe wrapper over
+ * {@link java.util.PriorityQueue}.
+ *
+ * @param <T> Event type.
+ */
+public class DeferredEventsQueue<T> {
+ private final PriorityQueue<T> queue;
+ private final ToLongFunction<T> mapper;
+
+ /**
+ * Creates a queue.
+ *
+ * @param mapper Event timestamp extractor.
+ */
+ public DeferredEventsQueue(ToLongFunction<T> mapper) {
+ this.mapper = mapper;
+ this.queue = new
PriorityQueue<>(Comparator.comparingLong(this.mapper));
+ }
+
+ /**
+ * Offers a new event to the queue.
+ *
+ * @param event New deferred event.
+ */
+ public boolean enqueue(T event) {
+ synchronized (queue) {
+ return queue.offer(event);
+ }
+ }
+
+ /**
+ * Drain queue up to given watermark and return dequeued events.
+ *
+ * @param watermark Timestamp to drain up to.
+ * @return Dequeued events.
+ */
+ public List<T> drainUpTo(long watermark) {
+ synchronized (queue) {
+ if (!hasExpired0(watermark)) {
+ return List.of();
+ }
+
+ List<T> events = new ArrayList<>();
+ do {
+ T event = queue.poll();
+
+ events.add(event);
+ } while (hasExpired0(watermark));
+
+ return events;
+ }
+ }
+
+ /**
+ * Returns queue size.
+ */
+ public int size() {
+ synchronized (queue) {
+ return queue.size();
+ }
+ }
+
+ /**
+ * Returns {@code true} if queue is empty, {@code false} otherwise.
+ */
+ public boolean isEmpty() {
+ synchronized (queue) {
+ return queue.isEmpty();
+ }
+ }
+
+ /**
+ * Returns {@code true} if found events below watermark, {@code false}
otherwise.
+ */
+ public boolean hasExpiredEvents(long watermark) {
+ synchronized (queue) {
+ return hasExpired0(watermark);
+ }
+ }
+
+ /**
+ * Removes all events from the queue.
+ */
+ public void clear() {
+ synchronized (queue) {
+ queue.clear();
+ }
+ }
+
+ private boolean hasExpired0(long watermark) {
Review Comment:
What is a low watermark here: a timestamp, a catalog version, or something else?
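To make the ambiguity concrete, here is a minimal sketch, assuming the truncated `hasExpired0` compares the key of the queue head with `watermark` using `<=` (the method body is not visible in this hunk, so the inclusive bound is a guess). The queue only ever sees a `long`, so the same code drains by timestamp or by catalog version depending on which `mapper` the caller supplies. `DrainSketch` is a hypothetical stand-in, not the PR's class:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.ToLongFunction;

/** Hypothetical stand-in that mirrors the drain-up-to-watermark idea. */
public class DrainSketch {
    /** Removes and returns every element whose key is <= watermark (inclusive bound is an assumption). */
    public static <T> List<T> drainUpTo(PriorityQueue<T> queue, ToLongFunction<T> key, long watermark) {
        List<T> drained = new ArrayList<>();

        // The head of the priority queue always has the smallest key, so stop at the first key above the watermark.
        while (!queue.isEmpty() && key.applyAsLong(queue.peek()) <= watermark) {
            drained.add(queue.poll());
        }

        return drained;
    }

    public static void main(String[] args) {
        // Keys could equally be catalog versions or physical timestamps; the queue cannot tell.
        PriorityQueue<Long> queue = new PriorityQueue<>(Comparator.comparingLong(Long::longValue));
        queue.add(5L);
        queue.add(1L);
        queue.add(3L);

        System.out.println(drainUpTo(queue, Long::longValue, 3L));
        System.out.println(queue.size());
    }
}
```

If the unit is part of the contract, it would help to state it in the javadoc of `drainUpTo` and `hasExpiredEvents`.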
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -292,6 +293,9 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
/** Started tables. */
private final Map<Integer, TableImpl> startedTables = new
ConcurrentHashMap<>();
+ /** Deferred destruction queue. */
+ private final DeferredEventsQueue<DestroyTableEvent> deferredQueue = new
DeferredEventsQueue<>(DestroyTableEvent::catalogVersion);
Review Comment:
Again, the same pain point.
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static
org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
+import static org.apache.ignite.internal.table.TableTestUtils.createHashIndex;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
+import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
+import static
org.apache.ignite.internal.util.CompletableFutures.emptySetCompletedFuture;
+import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
+import static org.apache.ignite.sql.ColumnType.INT64;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.LongFunction;
+import org.apache.ignite.internal.affinity.AffinityUtils;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
+import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import
org.apache.ignite.internal.metastorage.server.TestRocksDbKeyValueStorage;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
+import org.apache.ignite.internal.replicator.ReplicaManager;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaUtils;
+import org.apache.ignite.internal.schema.configuration.GcConfiguration;
+import
org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
+import org.apache.ignite.internal.storage.DataStorageManager;
+import org.apache.ignite.internal.storage.DataStorageModules;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryDataStorageModule;
+import
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.TestLowWatermark;
+import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
+import org.apache.ignite.internal.tx.HybridTimestampTracker;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
+import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.sql.IgniteSql;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+/**
+ * Table manager recovery scenarios.
+ */
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class,
WorkDirectoryExtension.class})
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class TableManagerRecoveryTest extends IgniteAbstractTest {
Review Comment:
The test is difficult to read; I will trust the author on this one.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexUtils.java:
##########
@@ -84,32 +87,34 @@ public static void registerIndexToTable(
* @param catalogService Catalog service.
* @param partitionSet Partitions for which index storages will need to be
created if they are missing.
* @param schemaRegistry Table schema register.
+ * @param lwm Low watermark.
*/
public static void registerIndexesToTable(
TableViewInternal table,
CatalogService catalogService,
PartitionSet partitionSet,
- SchemaRegistry schemaRegistry
+ SchemaRegistry schemaRegistry,
+ @Nullable HybridTimestamp lwm
Review Comment:
Please describe in the javadoc why this argument is used and how it will be
used.
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java:
##########
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
Review Comment:
Three license headers are overkill; please keep only one.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2468,4 +2468,23 @@ private <T> CompletableFuture<T> orStopManagerFuture(CompletableFuture<T> future
return anyOf(future, stopManagerFuture).thenApply(o -> (T) o);
}
+
+ /** Internal event. */
+ private static class DestroyTableEvent {
Review Comment:
Please make this class package-private.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -725,11 +726,29 @@ private CompletableFuture<List<Assignments>> writeTableAssignmentsToMetastore(
});
}
- private CompletableFuture<Void> onTableDestroy(DestroyTableEventParameters parameters) {
+ private CompletableFuture<Boolean> onTableDrop(DropTableEventParameters parameters) {
return inBusyLockAsync(busyLock, () -> {
- dropTableLocally(parameters.causalityToken(), parameters);
+ deferredQueue.enqueue(new DestroyTableEvent(parameters.catalogVersion(), parameters.tableId()));
- return nullCompletedFuture();
+ return falseCompletedFuture();
+ });
+ }
+
+ private CompletableFuture<Void> onLwmChanged(HybridTimestamp ts) {
+ return inBusyLockAsync(busyLock, () -> {
+ int earliestVersion = catalogService.earliestCatalogVersion(HybridTimestamp.hybridTimestampToLong(ts));
Review Comment:
Please rename this variable to `lwmCatalogVersion`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]