This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 6f4458ab8c IGNITE-19326 Close partition safe time trackers (#1960) 6f4458ab8c is described below commit 6f4458ab8c99a10ef9669be48e88379bb656bfd4 Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Thu Apr 20 19:36:01 2023 +0300 IGNITE-19326 Close partition safe time trackers (#1960) --- .../ignite/client/fakes/FakeInternalTable.java | 11 +++ .../util/PendingComparableValuesTracker.java | 95 ++++++++++++++++----- .../internal/util/TrackerClosedException.java | 25 ++++++ .../util/PendingComparableValuesTrackerTest.java | 17 ++++ .../ignite/internal/table/InternalTable.java | 15 ++++ .../internal/table/distributed/TableManager.java | 64 ++++++++++---- .../table/distributed/raft/PartitionListener.java | 16 +++- .../distributed/storage/InternalTableImpl.java | 98 +++++++++++++++++----- .../distributed/storage/InternalTableImplTest.java | 85 +++++++++++++++++++ .../table/impl/DummyInternalTableImpl.java | 4 +- 10 files changed, 370 insertions(+), 60 deletions(-) diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java index ba232b6580..393ce89cfb 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.network.ClusterNode; @@ -466,4 +467,14 @@ public class FakeInternalTable implements InternalTable { dataAccessListener.accept(operation, arg); } } + + @Override + public @Nullable PendingComparableValuesTracker<HybridTimestamp> getPartitionSafeTimeTracker(int partitionId) { + return null; + } + + @Override + public @Nullable PendingComparableValuesTracker<Long> getPartitionStorageIndexTracker(int partitionId) { + return null; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java index 545e8d0d09..8e4fa4a55e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java @@ -18,23 +18,28 @@ package org.apache.ignite.internal.util; import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; +import org.apache.ignite.internal.close.ManuallyCloseable; /** * Tracker that stores comparable value internally, this value can grow when {@link #update(Comparable)} method is called. The tracker gives * ability to wait for certain value, see {@link #waitFor(Comparable)}. */ -public class PendingComparableValuesTracker<T extends Comparable<T>> { +public class PendingComparableValuesTracker<T extends Comparable<T>> implements ManuallyCloseable { private static final VarHandle CURRENT; + private static final VarHandle CLOSE_GUARD; + static { try { CURRENT = MethodHandles.lookup().findVarHandle(PendingComparableValuesTracker.class, "current", Comparable.class); + CLOSE_GUARD = MethodHandles.lookup().findVarHandle(PendingComparableValuesTracker.class, "closeGuard", boolean.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } @@ -46,6 +51,13 @@ public class PendingComparableValuesTracker<T extends Comparable<T>> { /** Current value. */ private volatile T current; + /** Prevents double closing. */ + @SuppressWarnings("unused") + private volatile boolean closeGuard; + + /** Busy lock to close synchronously. */ + private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); + /** * Constructor with initial value. * @@ -60,23 +72,32 @@ public class PendingComparableValuesTracker<T extends Comparable<T>> { * that had been created for corresponding values that are lower than the given one. * * @param newValue New value. + * @throws TrackerClosedException if the tracker is closed. */ public void update(T newValue) { while (true) { - T current = this.current; - - if (newValue.compareTo(current) <= 0) { - break; + if (!busyLock.enterBusy()) { + throw new TrackerClosedException(); } - if (CURRENT.compareAndSet(this, current, newValue)) { - ConcurrentNavigableMap<T, CompletableFuture<Void>> smallerFutures = valueFutures.headMap(newValue, true); + try { + T current = this.current; + + if (newValue.compareTo(current) <= 0) { + break; + } + + if (CURRENT.compareAndSet(this, current, newValue)) { + ConcurrentNavigableMap<T, CompletableFuture<Void>> smallerFutures = valueFutures.headMap(newValue, true); - smallerFutures.forEach((k, f) -> f.complete(null)); + smallerFutures.forEach((k, f) -> f.complete(null)); - smallerFutures.clear(); + smallerFutures.clear(); - break; + break; + } + } finally { + busyLock.leaveBusy(); } } } @@ -85,31 +106,63 @@ public class PendingComparableValuesTracker<T extends Comparable<T>> { * Provides the future that is completed when this tracker's internal value reaches given one. If the internal value is greater or equal * then the given one, returns completed future. * + * <p>When the tracker is closed, the future will complete with an {@link TrackerClosedException}. + * * @param valueToWait Value to wait. - * @return Future. */ public CompletableFuture<Void> waitFor(T valueToWait) { - if (current.compareTo(valueToWait) >= 0) { - return completedFuture(null); + if (!busyLock.enterBusy()) { + return failedFuture(new TrackerClosedException()); } - CompletableFuture<Void> future = valueFutures.computeIfAbsent(valueToWait, k -> new CompletableFuture<>()); + try { + if (current.compareTo(valueToWait) >= 0) { + return completedFuture(null); + } + + CompletableFuture<Void> future = valueFutures.computeIfAbsent(valueToWait, k -> new CompletableFuture<>()); - if (current.compareTo(valueToWait) >= 0) { - future.complete(null); + if (current.compareTo(valueToWait) >= 0) { + future.complete(null); - valueFutures.remove(valueToWait); - } + valueFutures.remove(valueToWait); + } - return future; + return future; + } finally { + busyLock.leaveBusy(); + } } /** * Returns current internal value. * - * @return Current value. + * @throws TrackerClosedException if the tracker is closed. */ public T current() { - return current; + if (!busyLock.enterBusy()) { + throw new TrackerClosedException(); + } + + try { + return current; + } finally { + busyLock.leaveBusy(); + } + } + + @Override + public void close() { + if (!CLOSE_GUARD.compareAndSet(this, false, true)) { + return; + } + + busyLock.block(); + + TrackerClosedException trackerClosedException = new TrackerClosedException(); + + valueFutures.values().forEach(future -> future.completeExceptionally(trackerClosedException)); + + valueFutures.clear(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/TrackerClosedException.java b/modules/core/src/main/java/org/apache/ignite/internal/util/TrackerClosedException.java new file mode 100644 index 0000000000..2ab0b0e2d0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/TrackerClosedException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +/** + * Exception that will be thrown when the {@link PendingComparableValuesTracker} is closed. + */ +public class TrackerClosedException extends RuntimeException { + private static final long serialVersionUID = -3685913884384983930L; +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java index d30aeeb1ef..7dc8952746 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java @@ -18,9 +18,11 @@ package org.apache.ignite.internal.util; import static org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreaded; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.ArrayList; import java.util.Collections; @@ -151,4 +153,19 @@ public class PendingComparableValuesTrackerTest { assertThat(writerFuture, willCompleteSuccessfully()); assertThat(readerFuture, willCompleteSuccessfully()); } + + @Test + void testClose() { + var tracker = new PendingComparableValuesTracker<>(1); + + CompletableFuture<Void> future0 = tracker.waitFor(2); + + tracker.close(); + + assertThrows(TrackerClosedException.class, tracker::current); + assertThrows(TrackerClosedException.class, () -> tracker.update(2)); + + assertThat(future0, willThrowFast(TrackerClosedException.class)); + assertThat(tracker.waitFor(2), willThrowFast(TrackerClosedException.class)); + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java index 8b97f5d05a..ffd324d4dd 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.LockException; import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.tx.TransactionException; @@ -447,4 +448,18 @@ public interface InternalTable extends ManuallyCloseable { */ @Override void close(); + + /** + * Returns the partition safe time tracker, {@code null} means not added. + * + * @param partitionId Partition ID. + */ + @Nullable PendingComparableValuesTracker<HybridTimestamp> getPartitionSafeTimeTracker(int partitionId); + + /** + * Returns the partition storage index tracker, {@code null} means not added. + * + * @param partitionId Partition ID. + */ + @Nullable PendingComparableValuesTracker<Long> getPartitionStorageIndexTracker(int partitionId); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 8ea8ea8c3d..e5480730c3 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -725,8 +725,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp placementDriver.updateAssignment(replicaGrpId, newConfiguration.peers().stream().map(Peer::consistentId).collect(toList())); - PendingComparableValuesTracker<HybridTimestamp> safeTime = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0)); - PendingComparableValuesTracker<Long> storageIndexTracker = new PendingComparableValuesTracker<>(0L); + var safeTimeTracker = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0)); + var storageIndexTracker = new PendingComparableValuesTracker<>(0L); + + ((InternalTableImpl) internalTbl).updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker); CompletableFuture<PartitionStorages> partitionStoragesFut = getOrCreatePartitionStorages(table, partId); @@ -814,7 +816,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp partitionDataStorage, storageUpdateHandler, txStatePartitionStorage, - safeTime, + safeTimeTracker, storageIndexTracker ), new RebalanceRaftGroupEventsListener( @@ -878,7 +880,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp new Lazy<>(() -> table.indexStorageAdapters(partId).get().get(table.pkId())), () -> table.indexStorageAdapters(partId).get(), clock, - safeTime, + safeTimeTracker, txStateStorage, placementDriver, storageUpdateHandler, @@ -1033,7 +1035,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp AtomicBoolean nodeStoppingEx = new AtomicBoolean(); - for (int p = 0; p < table.internalTable().partitions(); p++) { + InternalTable internalTable = table.internalTable(); + + for (int p = 0; p < internalTable.partitions(); p++) { + int partitionId = p; + TablePartitionId replicationGroupId = new TablePartitionId(table.tableId(), p); stopping.add(() -> { @@ -1054,6 +1060,14 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp CompletableFuture<Void> removeFromGcFuture = mvGc.removeStorage(replicationGroupId); + stopping.add(() -> { + try { + closePartitionTrackers(internalTable, partitionId); + } catch (Throwable t) { + handleExceptionOnCleanUpTablesResources(t, throwable, nodeStoppingEx); + } + }); + stopping.add(() -> { try { // Should be done fairly quickly. @@ -1068,9 +1082,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp try { IgniteUtils.closeAllManually( - table.internalTable().storage(), - table.internalTable().txStateStorage(), - table.internalTable() + internalTable.storage(), + internalTable.txStateStorage(), + internalTable ); } catch (Throwable t) { handleExceptionOnCleanUpTablesResources(t, throwable, nodeStoppingEx); @@ -1277,12 +1291,18 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp assert table != null : IgniteStringFormatter.format("There is no table with the name specified [name={}, id={}]", name, tblId); + InternalTable internalTable = table.internalTable(); + + for (int partitionId = 0; partitionId < partitions; partitionId++) { + closePartitionTrackers(internalTable, partitionId); + } + // TODO: IGNITE-18703 Destroy raft log and meta CompletableFuture<Void> destroyTableStoragesFuture = allOf(removeStorageFromGcFutures) .thenCompose(unused -> allOf( - table.internalTable().storage().destroy(), - runAsync(() -> table.internalTable().txStateStorage().destroy(), ioExecutor)) + internalTable.storage().destroy(), + runAsync(() -> internalTable.txStateStorage().destroy(), ioExecutor)) ); CompletableFuture<?> dropSchemaRegistryFuture = schemaManager.dropRegistry(causalityToken, table.tableId()); @@ -2019,11 +2039,13 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp .filter(assignment -> localMember.name().equals(assignment.consistentId())) .anyMatch(assignment -> !stableAssignments.contains(assignment)); - var safeTime = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0)); - PendingComparableValuesTracker<Long> storageIndexTracker = new PendingComparableValuesTracker<>(0L); + var safeTimeTracker = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0)); + var storageIndexTracker = new PendingComparableValuesTracker<>(0L); InternalTable internalTable = tbl.internalTable(); + ((InternalTableImpl) internalTable).updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker); + LOG.info("Received update on pending assignments. Check if new raft group should be started" + " [key={}, partition={}, table={}, localMemberAddress={}]", pendingAssignmentsEntry.key(), partId, tbl.name(), localMember.address()); @@ -2055,7 +2077,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp partitionDataStorage, storageUpdateHandler, txStatePartitionStorage, - safeTime, + safeTimeTracker, storageIndexTracker ); @@ -2104,7 +2126,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp new Lazy<>(() -> tbl.indexStorageAdapters(partId).get().get(tbl.pkId())), () -> tbl.indexStorageAdapters(partId).get(), clock, - safeTime, + safeTimeTracker, txStatePartitionStorage, placementDriver, storageUpdateHandler, @@ -2345,6 +2367,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp .thenCombine(mvGc.removeStorage(tablePartitionId), (tables, unused) -> { InternalTable internalTable = tables.get(tableId).internalTable(); + closePartitionTrackers(internalTable, partitionId); + return allOf( internalTable.storage().destroyPartition(partitionId), runAsync(() -> internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor) @@ -2389,4 +2413,16 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp return indexIds; } + + private static void closePartitionTrackers(InternalTable internalTable, int partitionId) { + closeTracker(internalTable.getPartitionSafeTimeTracker(partitionId)); + + closeTracker(internalTable.getPartitionStorageIndexTracker(partitionId)); + } + + private static void closeTracker(@Nullable PendingComparableValuesTracker<?> tracker) { + if (tracker != null) { + tracker.close(); + } + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 57762c063a..d9285fa634 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -59,6 +59,7 @@ import org.apache.ignite.internal.tx.TxMeta; import org.apache.ignite.internal.tx.TxState; import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import org.apache.ignite.internal.util.PendingComparableValuesTracker; +import org.apache.ignite.internal.util.TrackerClosedException; import org.apache.ignite.lang.IgniteInternalException; import org.jetbrains.annotations.TestOnly; @@ -195,10 +196,10 @@ public class PartitionListener implements RaftGroupListener { assert safeTimePropagatingCommand.safeTime() != null; - safeTime.update(safeTimePropagatingCommand.safeTime().asHybridTimestamp()); + updateTrackerIgnoringTrackerClosedException(safeTime, safeTimePropagatingCommand.safeTime().asHybridTimestamp()); } - storageIndexTracker.update(commandIndex); + updateTrackerIgnoringTrackerClosedException(storageIndexTracker, commandIndex); }); } @@ -465,4 +466,15 @@ public class PartitionListener implements RaftGroupListener { ); } } + + private static <T extends Comparable<T>> void updateTrackerIgnoringTrackerClosedException( + PendingComparableValuesTracker<T> tracker, + T newValue + ) { + try { + tracker.update(newValue); + } catch (TrackerClosedException ignored) { + // No-op. + } + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index deb430c1e8..8e1532ac3b 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.table.distributed.storage; +import static it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.util.ExceptionUtils.withCause; @@ -78,6 +79,7 @@ import org.apache.ignite.internal.tx.LockException; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteException; @@ -101,8 +103,8 @@ public class InternalTableImpl implements InternalTable { /** Number of attempts. */ private static final int ATTEMPTS_TO_ENLIST_PARTITION = 5; - /** Partition map. */ - protected volatile Int2ObjectMap<RaftGroupService> partitionMap; + /** Map update guarded by {@link #updatePartitionMapsMux}. */ + protected volatile Int2ObjectMap<RaftGroupService> raftGroupServiceByPartitionId; /** Partitions. */ private final int partitions; @@ -128,8 +130,8 @@ public class InternalTableImpl implements InternalTable { /** Replica service. */ private final ReplicaService replicaSvc; - /** Mutex for the partition map update. */ - private final Object updatePartMapMux = new Object(); + /** Mutex for the partition maps update. */ + private final Object updatePartitionMapsMux = new Object(); /** Table messages factory. */ private final TableMessagesFactory tableMessagesFactory; @@ -137,6 +139,12 @@ public class InternalTableImpl implements InternalTable { /** A hybrid logical clock. */ private final HybridClock clock; + /** Map update guarded by {@link #updatePartitionMapsMux}. */ + private volatile Int2ObjectMap<PendingComparableValuesTracker<HybridTimestamp>> safeTimeTrackerByPartitionId = emptyMap(); + + /** Map update guarded by {@link #updatePartitionMapsMux}. */ + private volatile Int2ObjectMap<PendingComparableValuesTracker<Long>> storageIndexTrackerByPartitionId = emptyMap(); + /** * Constructor. * @@ -164,7 +172,7 @@ public class InternalTableImpl implements InternalTable { ) { this.tableName = tableName; this.tableId = tableId; - this.partitionMap = partMap; + this.raftGroupServiceByPartitionId = partMap; this.partitions = partitions; this.clusterNodeResolver = clusterNodeResolver; this.txManager = txManager; @@ -524,7 +532,7 @@ public class InternalTableImpl implements InternalTable { @NotNull ClusterNode recipientNode ) { int partId = partitionId(keyRow); - ReplicationGroupId partGroupId = partitionMap.get(partId).groupId(); + ReplicationGroupId partGroupId = raftGroupServiceByPartitionId.get(partId).groupId(); return replicaSvc.invoke(recipientNode, tableMessagesFactory.readOnlySingleRowReplicaRequest() .groupId(partGroupId) @@ -577,7 +585,7 @@ public class InternalTableImpl implements InternalTable { int batchNum = 0; for (Int2ObjectOpenHashMap.Entry<List<BinaryRow>> partToRows : keyRowsByPartition.int2ObjectEntrySet()) { - ReplicationGroupId partGroupId = partitionMap.get(partToRows.getIntKey()).groupId(); + ReplicationGroupId partGroupId = raftGroupServiceByPartitionId.get(partToRows.getIntKey()).groupId(); CompletableFuture<Object> fut = replicaSvc.invoke(recipientNode, tableMessagesFactory.readOnlyMultiRowReplicaRequest() .groupId(partGroupId) @@ -888,7 +896,7 @@ public class InternalTableImpl implements InternalTable { return new PartitionScanPublisher( (scanId, batchSize) -> { - ReplicationGroupId partGroupId = partitionMap.get(partId).groupId(); + ReplicationGroupId partGroupId = raftGroupServiceByPartitionId.get(partId).groupId(); ReadOnlyScanRetrieveBatchReplicaRequest request = tableMessagesFactory.readOnlyScanRetrieveBatchReplicaRequest() .groupId(partGroupId) @@ -999,7 +1007,7 @@ public class InternalTableImpl implements InternalTable { ) { return new PartitionScanPublisher( (scanId, batchSize) -> { - ReplicationGroupId partGroupId = partitionMap.get(partId).groupId(); + ReplicationGroupId partGroupId = raftGroupServiceByPartitionId.get(partId).groupId(); ReadWriteScanRetrieveBatchReplicaRequest request = tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest() .groupId(partGroupId) @@ -1062,7 +1070,7 @@ public class InternalTableImpl implements InternalTable { public List<String> assignments() { awaitLeaderInitialization(); - return partitionMap.int2ObjectEntrySet().stream() + return raftGroupServiceByPartitionId.int2ObjectEntrySet().stream() .sorted(Comparator.comparingInt(Int2ObjectOpenHashMap.Entry::getIntKey)) .map(Map.Entry::getValue) .map(service -> service.leader().consistentId()) @@ -1072,7 +1080,7 @@ public class InternalTableImpl implements InternalTable { /** {@inheritDoc} */ @Override public List<PrimaryReplica> primaryReplicas() { - List<Entry<RaftGroupService>> entries = new ArrayList<>(partitionMap.int2ObjectEntrySet()); + List<Entry<RaftGroupService>> entries = new ArrayList<>(raftGroupServiceByPartitionId.int2ObjectEntrySet()); List<CompletableFuture<LeaderWithTerm>> futs = new ArrayList<>(); entries.sort(Comparator.comparingInt(Entry::getIntKey)); @@ -1097,7 +1105,7 @@ public class InternalTableImpl implements InternalTable { public ClusterNode leaderAssignment(int partition) { awaitLeaderInitialization(); - RaftGroupService raftGroupService = partitionMap.get(partition); + RaftGroupService raftGroupService = raftGroupServiceByPartitionId.get(partition); if (raftGroupService == null) { throw new IgniteInternalException("No such partition " + partition + " in table " + tableName); } @@ -1108,7 +1116,7 @@ public class InternalTableImpl implements InternalTable { /** {@inheritDoc} */ @Override public RaftGroupService partitionRaftGroupService(int partition) { - RaftGroupService raftGroupService = partitionMap.get(partition); + RaftGroupService raftGroupService = raftGroupServiceByPartitionId.get(partition); if (raftGroupService == null) { throw new IgniteInternalException("No such partition " + partition + " in table " + tableName); } @@ -1129,7 +1137,7 @@ public class InternalTableImpl implements InternalTable { private void awaitLeaderInitialization() { List<CompletableFuture<Void>> futs = new ArrayList<>(); - for (RaftGroupService raftSvc : partitionMap.values()) { + for (RaftGroupService raftSvc : raftGroupServiceByPartitionId.values()) { if (raftSvc.leader() == null) { futs.add(raftSvc.refreshLeader()); } @@ -1152,7 +1160,7 @@ public class InternalTableImpl implements InternalTable { public Map<Integer, List<String>> peersAndLearners() { awaitLeaderInitialization(); - return partitionMap.int2ObjectEntrySet().stream() + return raftGroupServiceByPartitionId.int2ObjectEntrySet().stream() .collect(Collectors.toMap(Entry::getIntKey, e -> { RaftGroupService service = e.getValue(); return Stream.of(service.peers(), service.learners()) @@ -1201,14 +1209,14 @@ public class InternalTableImpl implements InternalTable { public void updateInternalTableRaftGroupService(int p, RaftGroupService raftGrpSvc) { RaftGroupService oldSrvc; - synchronized (updatePartMapMux) { + synchronized (updatePartitionMapsMux) { Int2ObjectMap<RaftGroupService> newPartitionMap = new Int2ObjectOpenHashMap<>(partitions); - newPartitionMap.putAll(partitionMap); + newPartitionMap.putAll(raftGroupServiceByPartitionId); oldSrvc = newPartitionMap.put(p, raftGrpSvc); - partitionMap = newPartitionMap; + raftGroupServiceByPartitionId = newPartitionMap; } if (oldSrvc != null) { @@ -1224,7 +1232,7 @@ public class InternalTableImpl implements InternalTable { * @return The enlist future (then will a leader become known). */ protected CompletableFuture<IgniteBiTuple<ClusterNode, Long>> enlist(int partId, InternalTransaction tx) { - RaftGroupService svc = partitionMap.get(partId); + RaftGroupService svc = raftGroupServiceByPartitionId.get(partId); tx.assignCommitPartition(new TablePartitionId(tableId, partId)); // TODO: IGNITE-17256 Use a placement driver for getting a primary replica. @@ -1415,7 +1423,7 @@ public class InternalTableImpl implements InternalTable { /** {@inheritDoc} */ @Override public void close() { - for (RaftGroupService srv : partitionMap.values()) { + for (RaftGroupService srv : raftGroupServiceByPartitionId.values()) { srv.shutdown(); } } @@ -1428,7 +1436,7 @@ public class InternalTableImpl implements InternalTable { * @return Cluster node to evalute read-only request. */ protected CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int partId) { - RaftGroupService svc = partitionMap.get(partId); + RaftGroupService svc = raftGroupServiceByPartitionId.get(partId); return svc.refreshAndGetLeaderWithTerm().handle((res, e) -> { if (e != null) { @@ -1470,4 +1478,52 @@ public class InternalTableImpl implements InternalTable { return e0; } + + @Override + public @Nullable PendingComparableValuesTracker<HybridTimestamp> getPartitionSafeTimeTracker(int partitionId) { + return safeTimeTrackerByPartitionId.get(partitionId); + } + + @Override + public @Nullable PendingComparableValuesTracker<Long> getPartitionStorageIndexTracker(int partitionId) { + return storageIndexTrackerByPartitionId.get(partitionId); + } + + /** + * Updates the partition trackers, if there were previous ones, it closes them. + * + * @param partitionId Partition ID. + * @param newSafeTimeTracker New partition safe time tracker. + * @param newStorageIndexTracker New partition storage index tracker. + */ + public void updatePartitionTrackers( + int partitionId, + PendingComparableValuesTracker<HybridTimestamp> newSafeTimeTracker, + PendingComparableValuesTracker<Long> newStorageIndexTracker + ) { + PendingComparableValuesTracker<HybridTimestamp> previousSafeTimeTracker; + PendingComparableValuesTracker<Long> previousStorageIndexTracker; + + synchronized (updatePartitionMapsMux) { + Int2ObjectMap<PendingComparableValuesTracker<HybridTimestamp>> newSafeTimeTrackerMap = new Int2ObjectOpenHashMap<>(partitions); + Int2ObjectMap<PendingComparableValuesTracker<Long>> newStorageIndexTrackerMap = new Int2ObjectOpenHashMap<>(partitions); + + newSafeTimeTrackerMap.putAll(safeTimeTrackerByPartitionId); + newStorageIndexTrackerMap.putAll(storageIndexTrackerByPartitionId); + + previousSafeTimeTracker = newSafeTimeTrackerMap.put(partitionId, newSafeTimeTracker); + previousStorageIndexTracker = newStorageIndexTrackerMap.put(partitionId, newStorageIndexTracker); + + safeTimeTrackerByPartitionId = newSafeTimeTrackerMap; + storageIndexTrackerByPartitionId = newStorageIndexTrackerMap; + } + + if (previousSafeTimeTracker != null) { + previousSafeTimeTracker.close(); + } + + if (previousStorageIndexTracker != null) { + previousStorageIndexTracker.close(); + } + } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java new file mode 100644 index 0000000000..e709ab3c04 --- /dev/null +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.storage; + +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; +import java.util.UUID; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.storage.engine.MvTableStorage; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; +import org.apache.ignite.network.ClusterNode; +import org.junit.jupiter.api.Test; + +/** + * For {@link InternalTableImpl} testing. + */ +public class InternalTableImplTest { + @Test + void testUpdatePartitionTrackers() { + InternalTableImpl internalTable = new InternalTableImpl( + "test", + UUID.randomUUID(), + Int2ObjectMaps.emptyMap(), + 1, + s -> mock(ClusterNode.class), + mock(TxManager.class), + mock(MvTableStorage.class), + mock(TxStateTableStorage.class), + mock(ReplicaService.class), + mock(HybridClock.class) + ); + + // Let's check the empty table. + assertNull(internalTable.getPartitionSafeTimeTracker(0)); + assertNull(internalTable.getPartitionStorageIndexTracker(0)); + + // Let's check the first insert. + PendingComparableValuesTracker<HybridTimestamp> safeTime0 = mock(PendingComparableValuesTracker.class); + PendingComparableValuesTracker<Long> storageIndex0 = mock(PendingComparableValuesTracker.class); + + internalTable.updatePartitionTrackers(0, safeTime0, storageIndex0); + + assertSame(safeTime0, internalTable.getPartitionSafeTimeTracker(0)); + assertSame(storageIndex0, internalTable.getPartitionStorageIndexTracker(0)); + + verify(safeTime0, never()).close(); + verify(storageIndex0, never()).close(); + + // Let's check the new insert. + PendingComparableValuesTracker<HybridTimestamp> safeTime1 = mock(PendingComparableValuesTracker.class); + PendingComparableValuesTracker<Long> storageIndex1 = mock(PendingComparableValuesTracker.class); + + internalTable.updatePartitionTrackers(0, safeTime1, storageIndex1); + + assertSame(safeTime1, internalTable.getPartitionSafeTimeTracker(0)); + assertSame(storageIndex1, internalTable.getPartitionStorageIndexTracker(0)); + + verify(safeTime0).close(); + verify(storageIndex0).close(); + } +} diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index 0d607f4f65..6c482ade7b 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -188,7 +188,7 @@ public class DummyInternalTableImpl extends InternalTableImpl { replicaSvc, CLOCK ); - RaftGroupService svc = partitionMap.get(0); + RaftGroupService svc = raftGroupServiceByPartitionId.get(0); groupId = crossTableUsage ? new TablePartitionId(tableId(), PART_ID) : crossTableGroupId; @@ -277,7 +277,7 @@ public class DummyInternalTableImpl extends InternalTableImpl { replicaListener = new PartitionReplicaListener( mvPartStorage, - partitionMap.get(PART_ID), + raftGroupServiceByPartitionId.get(PART_ID), this.txManager, this.txManager.lockManager(), Runnable::run,