This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f4458ab8c IGNITE-19326 Close partition safe time trackers (#1960)
6f4458ab8c is described below

commit 6f4458ab8c99a10ef9669be48e88379bb656bfd4
Author: Kirill Tkalenko <tkalkir...@yandex.ru>
AuthorDate: Thu Apr 20 19:36:01 2023 +0300

    IGNITE-19326 Close partition safe time trackers (#1960)
---
 .../ignite/client/fakes/FakeInternalTable.java     | 11 +++
 .../util/PendingComparableValuesTracker.java       | 95 ++++++++++++++++-----
 .../internal/util/TrackerClosedException.java      | 25 ++++++
 .../util/PendingComparableValuesTrackerTest.java   | 17 ++++
 .../ignite/internal/table/InternalTable.java       | 15 ++++
 .../internal/table/distributed/TableManager.java   | 64 ++++++++++----
 .../table/distributed/raft/PartitionListener.java  | 16 +++-
 .../distributed/storage/InternalTableImpl.java     | 98 +++++++++++++++++-----
 .../distributed/storage/InternalTableImplTest.java | 85 +++++++++++++++++++
 .../table/impl/DummyInternalTableImpl.java         |  4 +-
 10 files changed, 370 insertions(+), 60 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index ba232b6580..393ce89cfb 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -39,6 +39,7 @@ import 
org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterNode;
@@ -466,4 +467,14 @@ public class FakeInternalTable implements InternalTable {
             dataAccessListener.accept(operation, arg);
         }
     }
+
+    @Override
+    public @Nullable PendingComparableValuesTracker<HybridTimestamp> 
getPartitionSafeTimeTracker(int partitionId) {
+        return null;
+    }
+
+    @Override
+    public @Nullable PendingComparableValuesTracker<Long> 
getPartitionStorageIndexTracker(int partitionId) {
+        return null;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
index 545e8d0d09..8e4fa4a55e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
@@ -18,23 +18,28 @@
 package org.apache.ignite.internal.util;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.ignite.internal.close.ManuallyCloseable;
 
 /**
  * Tracker that stores comparable value internally, this value can grow when 
{@link #update(Comparable)} method is called. The tracker gives
  * ability to wait for certain value, see {@link #waitFor(Comparable)}.
  */
-public class PendingComparableValuesTracker<T extends Comparable<T>> {
+public class PendingComparableValuesTracker<T extends Comparable<T>> 
implements ManuallyCloseable {
     private static final VarHandle CURRENT;
 
+    private static final VarHandle CLOSE_GUARD;
+
     static {
         try {
             CURRENT = 
MethodHandles.lookup().findVarHandle(PendingComparableValuesTracker.class, 
"current", Comparable.class);
+            CLOSE_GUARD = 
MethodHandles.lookup().findVarHandle(PendingComparableValuesTracker.class, 
"closeGuard", boolean.class);
         } catch (ReflectiveOperationException e) {
             throw new ExceptionInInitializerError(e);
         }
@@ -46,6 +51,13 @@ public class PendingComparableValuesTracker<T extends 
Comparable<T>> {
     /** Current value. */
     private volatile T current;
 
+    /** Prevents double closing. */
+    @SuppressWarnings("unused")
+    private volatile boolean closeGuard;
+
+    /** Busy lock to close synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
     /**
      * Constructor with initial value.
      *
@@ -60,23 +72,32 @@ public class PendingComparableValuesTracker<T extends 
Comparable<T>> {
      * that had been created for corresponding values that are lower than the 
given one.
      *
      * @param newValue New value.
+     * @throws TrackerClosedException if the tracker is closed.
      */
     public void update(T newValue) {
         while (true) {
-            T current = this.current;
-
-            if (newValue.compareTo(current) <= 0) {
-                break;
+            if (!busyLock.enterBusy()) {
+                throw new TrackerClosedException();
             }
 
-            if (CURRENT.compareAndSet(this, current, newValue)) {
-                ConcurrentNavigableMap<T, CompletableFuture<Void>> 
smallerFutures = valueFutures.headMap(newValue, true);
+            try {
+                T current = this.current;
+
+                if (newValue.compareTo(current) <= 0) {
+                    break;
+                }
+
+                if (CURRENT.compareAndSet(this, current, newValue)) {
+                    ConcurrentNavigableMap<T, CompletableFuture<Void>> 
smallerFutures = valueFutures.headMap(newValue, true);
 
-                smallerFutures.forEach((k, f) -> f.complete(null));
+                    smallerFutures.forEach((k, f) -> f.complete(null));
 
-                smallerFutures.clear();
+                    smallerFutures.clear();
 
-                break;
+                    break;
+                }
+            } finally {
+                busyLock.leaveBusy();
             }
         }
     }
@@ -85,31 +106,63 @@ public class PendingComparableValuesTracker<T extends 
Comparable<T>> {
      * Provides the future that is completed when this tracker's internal 
value reaches given one. If the internal value is greater or equal
      * then the given one, returns completed future.
      *
+     * <p>When the tracker is closed, the future will complete with an {@link 
TrackerClosedException}.
+     *
      * @param valueToWait Value to wait.
-     * @return Future.
      */
     public CompletableFuture<Void> waitFor(T valueToWait) {
-        if (current.compareTo(valueToWait) >= 0) {
-            return completedFuture(null);
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new TrackerClosedException());
         }
 
-        CompletableFuture<Void> future = 
valueFutures.computeIfAbsent(valueToWait, k -> new CompletableFuture<>());
+        try {
+            if (current.compareTo(valueToWait) >= 0) {
+                return completedFuture(null);
+            }
+
+            CompletableFuture<Void> future = 
valueFutures.computeIfAbsent(valueToWait, k -> new CompletableFuture<>());
 
-        if (current.compareTo(valueToWait) >= 0) {
-            future.complete(null);
+            if (current.compareTo(valueToWait) >= 0) {
+                future.complete(null);
 
-            valueFutures.remove(valueToWait);
-        }
+                valueFutures.remove(valueToWait);
+            }
 
-        return future;
+            return future;
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * Returns current internal value.
      *
-     * @return Current value.
+     * @throws TrackerClosedException if the tracker is closed.
      */
     public T current() {
-        return current;
+        if (!busyLock.enterBusy()) {
+            throw new TrackerClosedException();
+        }
+
+        try {
+            return current;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    @Override
+    public void close() {
+        if (!CLOSE_GUARD.compareAndSet(this, false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        TrackerClosedException trackerClosedException = new 
TrackerClosedException();
+
+        valueFutures.values().forEach(future -> 
future.completeExceptionally(trackerClosedException));
+
+        valueFutures.clear();
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/TrackerClosedException.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/TrackerClosedException.java
new file mode 100644
index 0000000000..2ab0b0e2d0
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/TrackerClosedException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+/**
+ * Exception that will be thrown when the {@link 
PendingComparableValuesTracker} is closed.
+ */
+public class TrackerClosedException extends RuntimeException {
+    private static final long serialVersionUID = -3685913884384983930L;
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
index d30aeeb1ef..7dc8952746 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
@@ -18,9 +18,11 @@
 package org.apache.ignite.internal.util;
 
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreaded;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -151,4 +153,19 @@ public class PendingComparableValuesTrackerTest {
         assertThat(writerFuture, willCompleteSuccessfully());
         assertThat(readerFuture, willCompleteSuccessfully());
     }
+
+    @Test
+    void testClose() {
+        var tracker = new PendingComparableValuesTracker<>(1);
+
+        CompletableFuture<Void> future0 = tracker.waitFor(2);
+
+        tracker.close();
+
+        assertThrows(TrackerClosedException.class, tracker::current);
+        assertThrows(TrackerClosedException.class, () -> tracker.update(2));
+
+        assertThat(future0, willThrowFast(TrackerClosedException.class));
+        assertThat(tracker.waitFor(2), 
willThrowFast(TrackerClosedException.class));
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 8b97f5d05a..ffd324d4dd 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -34,6 +34,7 @@ import 
org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.LockException;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.tx.TransactionException;
@@ -447,4 +448,18 @@ public interface InternalTable extends ManuallyCloseable {
      */
     @Override
     void close();
+
+    /**
+     * Returns the partition safe time tracker, {@code null} means not added.
+     *
+     * @param partitionId Partition ID.
+     */
+    @Nullable PendingComparableValuesTracker<HybridTimestamp> 
getPartitionSafeTimeTracker(int partitionId);
+
+    /**
+     * Returns the partition storage index tracker, {@code null} means not 
added.
+     *
+     * @param partitionId Partition ID.
+     */
+    @Nullable PendingComparableValuesTracker<Long> 
getPartitionStorageIndexTracker(int partitionId);
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 8ea8ea8c3d..e5480730c3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -725,8 +725,10 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                 placementDriver.updateAssignment(replicaGrpId, 
newConfiguration.peers().stream().map(Peer::consistentId).collect(toList()));
 
-                PendingComparableValuesTracker<HybridTimestamp> safeTime = new 
PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
-                PendingComparableValuesTracker<Long> storageIndexTracker = new 
PendingComparableValuesTracker<>(0L);
+                var safeTimeTracker = new PendingComparableValuesTracker<>(new 
HybridTimestamp(1, 0));
+                var storageIndexTracker = new 
PendingComparableValuesTracker<>(0L);
+
+                ((InternalTableImpl) 
internalTbl).updatePartitionTrackers(partId, safeTimeTracker, 
storageIndexTracker);
 
                 CompletableFuture<PartitionStorages> partitionStoragesFut = 
getOrCreatePartitionStorages(table, partId);
 
@@ -814,7 +816,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                                             
partitionDataStorage,
                                                             
storageUpdateHandler,
                                                             
txStatePartitionStorage,
-                                                            safeTime,
+                                                            safeTimeTracker,
                                                             storageIndexTracker
                                                     ),
                                                     new 
RebalanceRaftGroupEventsListener(
@@ -878,7 +880,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                                             new Lazy<>(() -> 
table.indexStorageAdapters(partId).get().get(table.pkId())),
                                                             () -> 
table.indexStorageAdapters(partId).get(),
                                                             clock,
-                                                            safeTime,
+                                                            safeTimeTracker,
                                                             txStateStorage,
                                                             placementDriver,
                                                             
storageUpdateHandler,
@@ -1033,7 +1035,11 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
             AtomicBoolean nodeStoppingEx = new AtomicBoolean();
 
-            for (int p = 0; p < table.internalTable().partitions(); p++) {
+            InternalTable internalTable = table.internalTable();
+
+            for (int p = 0; p < internalTable.partitions(); p++) {
+                int partitionId = p;
+
                 TablePartitionId replicationGroupId = new 
TablePartitionId(table.tableId(), p);
 
                 stopping.add(() -> {
@@ -1054,6 +1060,14 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                 CompletableFuture<Void> removeFromGcFuture = 
mvGc.removeStorage(replicationGroupId);
 
+                stopping.add(() -> {
+                    try {
+                        closePartitionTrackers(internalTable, partitionId);
+                    } catch (Throwable t) {
+                        handleExceptionOnCleanUpTablesResources(t, throwable, 
nodeStoppingEx);
+                    }
+                });
+
                 stopping.add(() -> {
                     try {
                         // Should be done fairly quickly.
@@ -1068,9 +1082,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
             try {
                 IgniteUtils.closeAllManually(
-                        table.internalTable().storage(),
-                        table.internalTable().txStateStorage(),
-                        table.internalTable()
+                        internalTable.storage(),
+                        internalTable.txStateStorage(),
+                        internalTable
                 );
             } catch (Throwable t) {
                 handleExceptionOnCleanUpTablesResources(t, throwable, 
nodeStoppingEx);
@@ -1277,12 +1291,18 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 assert table != null : IgniteStringFormatter.format("There is 
no table with the name specified [name={}, id={}]",
                         name, tblId);
 
+                InternalTable internalTable = table.internalTable();
+
+                for (int partitionId = 0; partitionId < partitions; 
partitionId++) {
+                    closePartitionTrackers(internalTable, partitionId);
+                }
+
                 // TODO: IGNITE-18703 Destroy raft log and meta
 
                 CompletableFuture<Void> destroyTableStoragesFuture = 
allOf(removeStorageFromGcFutures)
                         .thenCompose(unused -> allOf(
-                                table.internalTable().storage().destroy(),
-                                runAsync(() -> 
table.internalTable().txStateStorage().destroy(), ioExecutor))
+                                internalTable.storage().destroy(),
+                                runAsync(() -> 
internalTable.txStateStorage().destroy(), ioExecutor))
                         );
 
                 CompletableFuture<?> dropSchemaRegistryFuture = 
schemaManager.dropRegistry(causalityToken, table.tableId());
@@ -2019,11 +2039,13 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 .filter(assignment -> 
localMember.name().equals(assignment.consistentId()))
                 .anyMatch(assignment -> 
!stableAssignments.contains(assignment));
 
-        var safeTime = new PendingComparableValuesTracker<>(new 
HybridTimestamp(1, 0));
-        PendingComparableValuesTracker<Long> storageIndexTracker = new 
PendingComparableValuesTracker<>(0L);
+        var safeTimeTracker = new PendingComparableValuesTracker<>(new 
HybridTimestamp(1, 0));
+        var storageIndexTracker = new PendingComparableValuesTracker<>(0L);
 
         InternalTable internalTable = tbl.internalTable();
 
+        ((InternalTableImpl) internalTable).updatePartitionTrackers(partId, 
safeTimeTracker, storageIndexTracker);
+
         LOG.info("Received update on pending assignments. Check if new raft 
group should be started"
                         + " [key={}, partition={}, table={}, 
localMemberAddress={}]",
                 pendingAssignmentsEntry.key(), partId, tbl.name(), 
localMember.address());
@@ -2055,7 +2077,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                 partitionDataStorage,
                                 storageUpdateHandler,
                                 txStatePartitionStorage,
-                                safeTime,
+                                safeTimeTracker,
                                 storageIndexTracker
                         );
 
@@ -2104,7 +2126,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                             new Lazy<>(() -> 
tbl.indexStorageAdapters(partId).get().get(tbl.pkId())),
                                             () -> 
tbl.indexStorageAdapters(partId).get(),
                                             clock,
-                                            safeTime,
+                                            safeTimeTracker,
                                             txStatePartitionStorage,
                                             placementDriver,
                                             storageUpdateHandler,
@@ -2345,6 +2367,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                             .thenCombine(mvGc.removeStorage(tablePartitionId), 
(tables, unused) -> {
                                 InternalTable internalTable = 
tables.get(tableId).internalTable();
 
+                                closePartitionTrackers(internalTable, 
partitionId);
+
                                 return allOf(
                                         
internalTable.storage().destroyPartition(partitionId),
                                         runAsync(() -> 
internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
@@ -2389,4 +2413,16 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
         return indexIds;
     }
+
+    private static void closePartitionTrackers(InternalTable internalTable, 
int partitionId) {
+        closeTracker(internalTable.getPartitionSafeTimeTracker(partitionId));
+
+        
closeTracker(internalTable.getPartitionStorageIndexTracker(partitionId));
+    }
+
+    private static void closeTracker(@Nullable 
PendingComparableValuesTracker<?> tracker) {
+        if (tracker != null) {
+            tracker.close();
+        }
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 57762c063a..d9285fa634 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.jetbrains.annotations.TestOnly;
 
@@ -195,10 +196,10 @@ public class PartitionListener implements 
RaftGroupListener {
 
                 assert safeTimePropagatingCommand.safeTime() != null;
 
-                
safeTime.update(safeTimePropagatingCommand.safeTime().asHybridTimestamp());
+                updateTrackerIgnoringTrackerClosedException(safeTime, 
safeTimePropagatingCommand.safeTime().asHybridTimestamp());
             }
 
-            storageIndexTracker.update(commandIndex);
+            updateTrackerIgnoringTrackerClosedException(storageIndexTracker, 
commandIndex);
         });
     }
 
@@ -465,4 +466,15 @@ public class PartitionListener implements 
RaftGroupListener {
             );
         }
     }
+
+    private static <T extends Comparable<T>> void 
updateTrackerIgnoringTrackerClosedException(
+            PendingComparableValuesTracker<T> tracker,
+            T newValue
+    ) {
+        try {
+            tracker.update(newValue);
+        } catch (TrackerClosedException ignored) {
+            // No-op.
+        }
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index deb430c1e8..8e1532ac3b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.storage;
 
+import static it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
@@ -78,6 +79,7 @@ import org.apache.ignite.internal.tx.LockException;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteException;
@@ -101,8 +103,8 @@ public class InternalTableImpl implements InternalTable {
     /** Number of attempts. */
     private static final int ATTEMPTS_TO_ENLIST_PARTITION = 5;
 
-    /** Partition map. */
-    protected volatile Int2ObjectMap<RaftGroupService> partitionMap;
+    /** Map update guarded by {@link #updatePartitionMapsMux}. */
+    protected volatile Int2ObjectMap<RaftGroupService> 
raftGroupServiceByPartitionId;
 
     /** Partitions. */
     private final int partitions;
@@ -128,8 +130,8 @@ public class InternalTableImpl implements InternalTable {
     /** Replica service. */
     private final ReplicaService replicaSvc;
 
-    /** Mutex for the partition map update. */
-    private final Object updatePartMapMux = new Object();
+    /** Mutex for the partition maps update. */
+    private final Object updatePartitionMapsMux = new Object();
 
     /** Table messages factory. */
     private final TableMessagesFactory tableMessagesFactory;
@@ -137,6 +139,12 @@ public class InternalTableImpl implements InternalTable {
     /** A hybrid logical clock. */
     private final HybridClock clock;
 
+    /** Map update guarded by {@link #updatePartitionMapsMux}. */
+    private volatile 
Int2ObjectMap<PendingComparableValuesTracker<HybridTimestamp>> 
safeTimeTrackerByPartitionId = emptyMap();
+
+    /** Map update guarded by {@link #updatePartitionMapsMux}. */
+    private volatile Int2ObjectMap<PendingComparableValuesTracker<Long>> 
storageIndexTrackerByPartitionId = emptyMap();
+
     /**
      * Constructor.
      *
@@ -164,7 +172,7 @@ public class InternalTableImpl implements InternalTable {
     ) {
         this.tableName = tableName;
         this.tableId = tableId;
-        this.partitionMap = partMap;
+        this.raftGroupServiceByPartitionId = partMap;
         this.partitions = partitions;
         this.clusterNodeResolver = clusterNodeResolver;
         this.txManager = txManager;
@@ -524,7 +532,7 @@ public class InternalTableImpl implements InternalTable {
             @NotNull ClusterNode recipientNode
     ) {
         int partId = partitionId(keyRow);
-        ReplicationGroupId partGroupId = partitionMap.get(partId).groupId();
+        ReplicationGroupId partGroupId = 
raftGroupServiceByPartitionId.get(partId).groupId();
 
         return replicaSvc.invoke(recipientNode, 
tableMessagesFactory.readOnlySingleRowReplicaRequest()
                 .groupId(partGroupId)
@@ -577,7 +585,7 @@ public class InternalTableImpl implements InternalTable {
         int batchNum = 0;
 
         for (Int2ObjectOpenHashMap.Entry<List<BinaryRow>> partToRows : 
keyRowsByPartition.int2ObjectEntrySet()) {
-            ReplicationGroupId partGroupId = 
partitionMap.get(partToRows.getIntKey()).groupId();
+            ReplicationGroupId partGroupId = 
raftGroupServiceByPartitionId.get(partToRows.getIntKey()).groupId();
 
             CompletableFuture<Object> fut = replicaSvc.invoke(recipientNode, 
tableMessagesFactory.readOnlyMultiRowReplicaRequest()
                     .groupId(partGroupId)
@@ -888,7 +896,7 @@ public class InternalTableImpl implements InternalTable {
 
         return new PartitionScanPublisher(
                 (scanId, batchSize) -> {
-                    ReplicationGroupId partGroupId = 
partitionMap.get(partId).groupId();
+                    ReplicationGroupId partGroupId = 
raftGroupServiceByPartitionId.get(partId).groupId();
 
                     ReadOnlyScanRetrieveBatchReplicaRequest request = 
tableMessagesFactory.readOnlyScanRetrieveBatchReplicaRequest()
                             .groupId(partGroupId)
@@ -999,7 +1007,7 @@ public class InternalTableImpl implements InternalTable {
     ) {
         return new PartitionScanPublisher(
                 (scanId, batchSize) -> {
-                    ReplicationGroupId partGroupId = 
partitionMap.get(partId).groupId();
+                    ReplicationGroupId partGroupId = 
raftGroupServiceByPartitionId.get(partId).groupId();
 
                     ReadWriteScanRetrieveBatchReplicaRequest request = 
tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest()
                             .groupId(partGroupId)
@@ -1062,7 +1070,7 @@ public class InternalTableImpl implements InternalTable {
     public List<String> assignments() {
         awaitLeaderInitialization();
 
-        return partitionMap.int2ObjectEntrySet().stream()
+        return raftGroupServiceByPartitionId.int2ObjectEntrySet().stream()
                 
.sorted(Comparator.comparingInt(Int2ObjectOpenHashMap.Entry::getIntKey))
                 .map(Map.Entry::getValue)
                 .map(service -> service.leader().consistentId())
@@ -1072,7 +1080,7 @@ public class InternalTableImpl implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public List<PrimaryReplica> primaryReplicas() {
-        List<Entry<RaftGroupService>> entries = new 
ArrayList<>(partitionMap.int2ObjectEntrySet());
+        List<Entry<RaftGroupService>> entries = new 
ArrayList<>(raftGroupServiceByPartitionId.int2ObjectEntrySet());
         List<CompletableFuture<LeaderWithTerm>> futs = new ArrayList<>();
 
         entries.sort(Comparator.comparingInt(Entry::getIntKey));
@@ -1097,7 +1105,7 @@ public class InternalTableImpl implements InternalTable {
     public ClusterNode leaderAssignment(int partition) {
         awaitLeaderInitialization();
 
-        RaftGroupService raftGroupService = partitionMap.get(partition);
+        RaftGroupService raftGroupService = 
raftGroupServiceByPartitionId.get(partition);
         if (raftGroupService == null) {
             throw new IgniteInternalException("No such partition " + partition 
+ " in table " + tableName);
         }
@@ -1108,7 +1116,7 @@ public class InternalTableImpl implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public RaftGroupService partitionRaftGroupService(int partition) {
-        RaftGroupService raftGroupService = partitionMap.get(partition);
+        RaftGroupService raftGroupService = 
raftGroupServiceByPartitionId.get(partition);
         if (raftGroupService == null) {
             throw new IgniteInternalException("No such partition " + partition 
+ " in table " + tableName);
         }
@@ -1129,7 +1137,7 @@ public class InternalTableImpl implements InternalTable {
     private void awaitLeaderInitialization() {
         List<CompletableFuture<Void>> futs = new ArrayList<>();
 
-        for (RaftGroupService raftSvc : partitionMap.values()) {
+        for (RaftGroupService raftSvc : 
raftGroupServiceByPartitionId.values()) {
             if (raftSvc.leader() == null) {
                 futs.add(raftSvc.refreshLeader());
             }
@@ -1152,7 +1160,7 @@ public class InternalTableImpl implements InternalTable {
     public Map<Integer, List<String>> peersAndLearners() {
         awaitLeaderInitialization();
 
-        return partitionMap.int2ObjectEntrySet().stream()
+        return raftGroupServiceByPartitionId.int2ObjectEntrySet().stream()
                 .collect(Collectors.toMap(Entry::getIntKey, e -> {
                     RaftGroupService service = e.getValue();
                     return Stream.of(service.peers(), service.learners())
@@ -1201,14 +1209,14 @@ public class InternalTableImpl implements InternalTable 
{
     public void updateInternalTableRaftGroupService(int p, RaftGroupService 
raftGrpSvc) {
         RaftGroupService oldSrvc;
 
-        synchronized (updatePartMapMux) {
+        synchronized (updatePartitionMapsMux) {
             Int2ObjectMap<RaftGroupService> newPartitionMap = new 
Int2ObjectOpenHashMap<>(partitions);
 
-            newPartitionMap.putAll(partitionMap);
+            newPartitionMap.putAll(raftGroupServiceByPartitionId);
 
             oldSrvc = newPartitionMap.put(p, raftGrpSvc);
 
-            partitionMap = newPartitionMap;
+            raftGroupServiceByPartitionId = newPartitionMap;
         }
 
         if (oldSrvc != null) {
@@ -1224,7 +1232,7 @@ public class InternalTableImpl implements InternalTable {
      * @return The enlist future (then will a leader become known).
      */
     protected CompletableFuture<IgniteBiTuple<ClusterNode, Long>> enlist(int 
partId, InternalTransaction tx) {
-        RaftGroupService svc = partitionMap.get(partId);
+        RaftGroupService svc = raftGroupServiceByPartitionId.get(partId);
         tx.assignCommitPartition(new TablePartitionId(tableId, partId));
 
         // TODO: IGNITE-17256 Use a placement driver for getting a primary 
replica.
@@ -1415,7 +1423,7 @@ public class InternalTableImpl implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public void close() {
-        for (RaftGroupService srv : partitionMap.values()) {
+        for (RaftGroupService srv : raftGroupServiceByPartitionId.values()) {
             srv.shutdown();
         }
     }
@@ -1428,7 +1436,7 @@ public class InternalTableImpl implements InternalTable {
      * @return Cluster node to evalute read-only request.
      */
     protected CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int 
partId) {
-        RaftGroupService svc = partitionMap.get(partId);
+        RaftGroupService svc = raftGroupServiceByPartitionId.get(partId);
 
         return svc.refreshAndGetLeaderWithTerm().handle((res, e) -> {
             if (e != null) {
@@ -1470,4 +1478,52 @@ public class InternalTableImpl implements InternalTable {
 
         return e0;
     }
+
+    @Override
+    public @Nullable PendingComparableValuesTracker<HybridTimestamp> 
getPartitionSafeTimeTracker(int partitionId) {
+        return safeTimeTrackerByPartitionId.get(partitionId);
+    }
+
+    @Override
+    public @Nullable PendingComparableValuesTracker<Long> 
getPartitionStorageIndexTracker(int partitionId) {
+        return storageIndexTrackerByPartitionId.get(partitionId);
+    }
+
+    /**
+     * Updates the partition trackers, if there were previous ones, it closes 
them.
+     *
+     * @param partitionId Partition ID.
+     * @param newSafeTimeTracker New partition safe time tracker.
+     * @param newStorageIndexTracker New partition storage index tracker.
+     */
+    public void updatePartitionTrackers(
+            int partitionId,
+            PendingComparableValuesTracker<HybridTimestamp> newSafeTimeTracker,
+            PendingComparableValuesTracker<Long> newStorageIndexTracker
+    ) {
+        PendingComparableValuesTracker<HybridTimestamp> 
previousSafeTimeTracker;
+        PendingComparableValuesTracker<Long> previousStorageIndexTracker;
+
+        synchronized (updatePartitionMapsMux) {
+            Int2ObjectMap<PendingComparableValuesTracker<HybridTimestamp>> 
newSafeTimeTrackerMap = new Int2ObjectOpenHashMap<>(partitions);
+            Int2ObjectMap<PendingComparableValuesTracker<Long>> 
newStorageIndexTrackerMap = new Int2ObjectOpenHashMap<>(partitions);
+
+            newSafeTimeTrackerMap.putAll(safeTimeTrackerByPartitionId);
+            newStorageIndexTrackerMap.putAll(storageIndexTrackerByPartitionId);
+
+            previousSafeTimeTracker = newSafeTimeTrackerMap.put(partitionId, 
newSafeTimeTracker);
+            previousStorageIndexTracker = 
newStorageIndexTrackerMap.put(partitionId, newStorageIndexTracker);
+
+            safeTimeTrackerByPartitionId = newSafeTimeTrackerMap;
+            storageIndexTrackerByPartitionId = newStorageIndexTrackerMap;
+        }
+
+        if (previousSafeTimeTracker != null) {
+            previousSafeTimeTracker.close();
+        }
+
+        if (previousStorageIndexTracker != null) {
+            previousStorageIndexTracker.close();
+        }
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
new file mode 100644
index 0000000000..e709ab3c04
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.storage;
+
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.network.ClusterNode;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link InternalTableImpl} testing.
+ */
+public class InternalTableImplTest {
+    @Test
+    void testUpdatePartitionTrackers() {
+        InternalTableImpl internalTable = new InternalTableImpl(
+                "test",
+                UUID.randomUUID(),
+                Int2ObjectMaps.emptyMap(),
+                1,
+                s -> mock(ClusterNode.class),
+                mock(TxManager.class),
+                mock(MvTableStorage.class),
+                mock(TxStateTableStorage.class),
+                mock(ReplicaService.class),
+                mock(HybridClock.class)
+        );
+
+        // Let's check the empty table.
+        assertNull(internalTable.getPartitionSafeTimeTracker(0));
+        assertNull(internalTable.getPartitionStorageIndexTracker(0));
+
+        // Let's check the first insert.
+        PendingComparableValuesTracker<HybridTimestamp> safeTime0 = 
mock(PendingComparableValuesTracker.class);
+        PendingComparableValuesTracker<Long> storageIndex0 = 
mock(PendingComparableValuesTracker.class);
+
+        internalTable.updatePartitionTrackers(0, safeTime0, storageIndex0);
+
+        assertSame(safeTime0, internalTable.getPartitionSafeTimeTracker(0));
+        assertSame(storageIndex0, 
internalTable.getPartitionStorageIndexTracker(0));
+
+        verify(safeTime0, never()).close();
+        verify(storageIndex0, never()).close();
+
+        // Let's check the new insert.
+        PendingComparableValuesTracker<HybridTimestamp> safeTime1 = 
mock(PendingComparableValuesTracker.class);
+        PendingComparableValuesTracker<Long> storageIndex1 = 
mock(PendingComparableValuesTracker.class);
+
+        internalTable.updatePartitionTrackers(0, safeTime1, storageIndex1);
+
+        assertSame(safeTime1, internalTable.getPartitionSafeTimeTracker(0));
+        assertSame(storageIndex1, 
internalTable.getPartitionStorageIndexTracker(0));
+
+        verify(safeTime0).close();
+        verify(storageIndex0).close();
+    }
+}
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 0d607f4f65..6c482ade7b 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -188,7 +188,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 replicaSvc,
                 CLOCK
         );
-        RaftGroupService svc = partitionMap.get(0);
+        RaftGroupService svc = raftGroupServiceByPartitionId.get(0);
 
         groupId = crossTableUsage ? new TablePartitionId(tableId(), PART_ID) : 
crossTableGroupId;
 
@@ -277,7 +277,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
 
         replicaListener = new PartitionReplicaListener(
                 mvPartStorage,
-                partitionMap.get(PART_ID),
+                raftGroupServiceByPartitionId.get(PART_ID),
                 this.txManager,
                 this.txManager.lockManager(),
                 Runnable::run,


Reply via email to