This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 16c9ae2ca0e IGNITE-28008 Extract TableZoneCoordinator out of TableManager (#7805)
16c9ae2ca0e is described below
commit 16c9ae2ca0ec6f9d03c94f08f727cc4b5bba3dba
Author: Viacheslav Blinov <[email protected]>
AuthorDate: Mon Mar 23 12:22:41 2026 +0300
IGNITE-28008 Extract TableZoneCoordinator out of TableManager (#7805)
---
.../internal/table/distributed/TableManager.java | 761 ++-----------------
.../internal/table/distributed/TableRegistry.java | 54 ++
.../table/distributed/TableZoneCoordinator.java | 818 +++++++++++++++++++++
3 files changed, 924 insertions(+), 709 deletions(-)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index df6bee33931..104b7b1f09f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -17,23 +17,15 @@
package org.apache.ignite.internal.table.distributed;
-import static java.util.Collections.emptySet;
import static java.util.Collections.unmodifiableMap;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.anyOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.concurrent.CompletableFuture.runAsync;
-import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.function.Function.identity;
-import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
import static org.apache.ignite.internal.event.EventListener.fromConsumer;
-import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED;
-import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED;
-import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED;
import static
org.apache.ignite.internal.table.distributed.TableUtils.aliveTables;
-import static
org.apache.ignite.internal.table.distributed.index.IndexUtils.registerIndexesToTable;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
import static org.apache.ignite.internal.util.CollectionUtils.difference;
@@ -43,33 +35,26 @@ import static
org.apache.ignite.internal.util.CompletableFutures.emptyListComple
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
-import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
-import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
-import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
@@ -85,7 +70,6 @@ import
org.apache.ignite.internal.catalog.events.DropTableEventParameters;
import org.apache.ignite.internal.catalog.events.RenameTableEventParameters;
import org.apache.ignite.internal.causality.CompletionListener;
import org.apache.ignite.internal.causality.IncrementalVersionedValue;
-import org.apache.ignite.internal.causality.OutdatedTokenException;
import org.apache.ignite.internal.causality.RevisionListenerRegistry;
import org.apache.ignite.internal.configuration.SystemDistributedConfiguration;
import
org.apache.ignite.internal.configuration.utils.SystemDistributedConfigurationPropertyHolder;
@@ -111,32 +95,19 @@ import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.TopologyService;
-import
org.apache.ignite.internal.partition.replicator.LocalBeforeReplicaStartEventParameters;
-import
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEventParameters;
-import org.apache.ignite.internal.partition.replicator.NaiveAsyncReadWriteLock;
import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
-import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager.TablePartitionReplicaProcessorFactory;
-import
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
-import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
-import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
-import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import
org.apache.ignite.internal.partition.replicator.schema.ExecutorInclinedSchemaSyncService;
import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import
org.apache.ignite.internal.placementdriver.wrappers.ExecutorInclinedPlacementDriver;
import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
import org.apache.ignite.internal.storage.DataStorageManager;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.StorageClosedException;
-import org.apache.ignite.internal.storage.StorageDestroyedException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.StorageEngine;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
@@ -149,7 +120,6 @@ import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.gc.MvGc;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
@@ -163,7 +133,6 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
-import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.LongPriorityQueue;
@@ -221,19 +190,12 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
*/
private final IncrementalVersionedValue<Void> assignmentsUpdatedVv;
- /** Registered tables. */
- private final Map<Integer, TableViewInternal> tables = new
ConcurrentHashMap<>();
-
- /** Started tables. */
- private final Map<Integer, TableViewInternal> startedTables = new
ConcurrentHashMap<>();
+ private final TableRegistry tableRegistry = new TableRegistry();
/** A queue for deferred table destruction events. */
private final LongPriorityQueue<DestroyTableEvent> destructionEventsQueue =
new LongPriorityQueue<>(DestroyTableEvent::catalogVersion);
- /** Local partitions. */
- private final Map<Integer, PartitionSet> localPartsByTableId = new
ConcurrentHashMap<>();
-
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -261,8 +223,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final FailureProcessor failureProcessor;
- private final MvGc mvGc;
-
private final LowWatermark lowWatermark;
private final HybridTimestampTracker observableTimestampTracker;
@@ -283,9 +243,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
/** Marshallers provider. */
private final ReflectionMarshallersProvider marshallers = new
ReflectionMarshallersProvider();
- /** Index chooser for full state transfer. */
- private final FullStateTransferIndexChooser fullStateTransferIndexChooser;
-
private final TransactionInflights transactionInflights;
private final String nodeName;
@@ -295,26 +252,14 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
@Nullable
private ScheduledExecutorService streamerFlushExecutor;
- private final MinimumRequiredTimeCollectorService minTimeCollectorService;
-
@Nullable
private StreamerReceiverRunner streamerReceiverRunner;
private final CompletableFuture<Void> readyToProcessReplicaStarts = new
CompletableFuture<>();
- // TODO https://issues.apache.org/jira/browse/IGNITE-25347
- /** Mapping zone identifier to a collection of tables related to the zone.
*/
- private final Map<Integer, Set<TableViewInternal>> tablesPerZone = new
HashMap<>();
- /** Locks to synchronize an access to the {@link #tablesPerZone}. */
- private final Map<Integer, NaiveAsyncReadWriteLock> tablesPerZoneLocks =
new ConcurrentHashMap<>();
-
/** Configuration of rebalance retries delay. */
private final SystemDistributedConfigurationPropertyHolder<Integer>
rebalanceRetryDelayConfiguration;
- private final EventListener<LocalBeforeReplicaStartEventParameters>
onBeforeZoneReplicaStartedListener = this::beforeZoneReplicaStarted;
- private final EventListener<LocalPartitionReplicaEventParameters>
onZoneReplicaStoppedListener = this::onZoneReplicaStopped;
- private final EventListener<LocalPartitionReplicaEventParameters>
onZoneReplicaDestroyedListener = this::onZoneReplicaDestroyed;
-
private final EventListener<CreateTableEventParameters>
onTableCreateListener = this::loadTableToZoneOnTableCreate;
private final EventListener<DropTableEventParameters> onTableDropListener
= fromConsumer(this::onTableDrop);
private final EventListener<CatalogEventParameters> onTableAlterListener =
this::onTableAlter;
@@ -323,11 +268,10 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
private final MetricManager metricManager;
- private final Map<TablePartitionId, PartitionTableStatsMetricSource>
partModCounterMetricSources = new ConcurrentHashMap<>();
- private final Map<TablePartitionId, LongSupplier>
pendingWriteIntentsSuppliers = new ConcurrentHashMap<>();
-
private final TablePartitionResourcesFactory partitionResourcesFactory;
+ private final TableZoneCoordinator zoneCoordinator;
+
/**
* Creates a new table manager.
*
@@ -406,7 +350,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
this.transactionInflights = transactionInflights;
this.nodeName = nodeName;
this.partitionReplicaLifecycleManager =
partitionReplicaLifecycleManager;
- this.minTimeCollectorService = minTimeCollectorService;
this.metricManager = metricManager;
this.executorInclinedSchemaSyncService = new
ExecutorInclinedSchemaSyncService(schemaSyncService,
partitionOperationsExecutor);
@@ -423,7 +366,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
scanRequestExecutor = Executors.newSingleThreadExecutor(
IgniteThreadFactory.create(nodeName, "scan-query-executor",
LOG, STORAGE_READ));
- mvGc = new MvGc(nodeName, gcConfig, lowWatermark, failureProcessor);
+ MvGc mvGc = new MvGc(nodeName, gcConfig, lowWatermark,
failureProcessor);
partitionReplicatorNodeRecovery = new PartitionReplicatorNodeRecovery(
messagingService,
@@ -431,7 +374,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
tableId -> tablesById().get(tableId)
);
- fullStateTransferIndexChooser = new
FullStateTransferIndexChooser(catalogService, lowWatermark, indexMetaStorage);
+ FullStateTransferIndexChooser fullStateTransferIndexChooser =
+ new FullStateTransferIndexChooser(catalogService,
lowWatermark, indexMetaStorage);
partitionResourcesFactory = new TablePartitionResourcesFactory(
txManager,
@@ -465,22 +409,32 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
Integer::parseInt
);
- // Register event listeners in the constructor to avoid races with
"partitionReplicaLifecycleManager"'s recovery.
- // We rely on the "readyToProcessReplicaStarts" future to block event
handling until "startAsync" is completed.
- partitionReplicaLifecycleManager.listen(BEFORE_REPLICA_STARTED,
onBeforeZoneReplicaStartedListener);
- partitionReplicaLifecycleManager.listen(AFTER_REPLICA_STOPPED,
onZoneReplicaStoppedListener);
- partitionReplicaLifecycleManager.listen(AFTER_REPLICA_DESTROYED,
onZoneReplicaDestroyedListener);
+ zoneCoordinator = new TableZoneCoordinator(
+ mvGc,
+ fullStateTransferIndexChooser,
+ partitionReplicaLifecycleManager,
+ partitionResourcesFactory,
+ minTimeCollectorService,
+ metricManager,
+ catalogService,
+ lowWatermark,
+ ioExecutor,
+ busyLock,
+ tableRegistry,
+ tablesVv,
+ localPartitionsVv,
+ assignmentsUpdatedVv,
+ readyToProcessReplicaStarts
+ );
}
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
return inBusyLockAsync(busyLock, () -> {
TransactionMetricsSource transactionMetricsSource =
txManager.transactionMetricsSource();
-
transactionMetricsSource.setPendingWriteIntentsSupplier(this::totalPendingWriteIntents);
-
- mvGc.start();
+
transactionMetricsSource.setPendingWriteIntentsSupplier(zoneCoordinator::totalPendingWriteIntents);
- fullStateTransferIndexChooser.start();
+ zoneCoordinator.start();
rebalanceRetryDelayConfiguration.init();
@@ -504,187 +458,10 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
});
}
- private CompletableFuture<Boolean>
beforeZoneReplicaStarted(LocalBeforeReplicaStartEventParameters parameters) {
- return inBusyLockAsync(busyLock, () -> readyToProcessReplicaStarts
- .thenCompose(v -> beforeZoneReplicaStartedImpl(parameters))
- .thenApply(unused -> false)
- );
- }
-
- private CompletableFuture<Void>
beforeZoneReplicaStartedImpl(LocalBeforeReplicaStartEventParameters parameters)
{
- return inBusyLockAsync(busyLock, () -> {
- ZonePartitionId zonePartitionId = parameters.zonePartitionId();
-
- NaiveAsyncReadWriteLock zoneLock =
tablesPerZoneLocks.computeIfAbsent(
- zonePartitionId.zoneId(),
- id -> new NaiveAsyncReadWriteLock());
-
- CompletableFuture<Long> readLockAcquisitionFuture =
zoneLock.readLock();
-
- try {
- return readLockAcquisitionFuture.thenCompose(stamp -> {
- Set<TableViewInternal> zoneTables =
zoneTablesRawSet(zonePartitionId.zoneId());
-
- return
createPartitionsAndLoadResourcesToZoneReplica(zonePartitionId, zoneTables,
parameters);
- }).whenComplete((unused, t) ->
readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead));
- } catch (Throwable t) {
- readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
-
- return failedFuture(t);
- }
- });
- }
-
- private CompletableFuture<Void>
createPartitionsAndLoadResourcesToZoneReplica(
- ZonePartitionId zonePartitionId,
- Set<TableViewInternal> zoneTables,
- LocalBeforeReplicaStartEventParameters event
- ) {
- int partitionIndex = zonePartitionId.partitionId();
-
- PartitionSet singlePartitionIdSet = PartitionSet.of(partitionIndex);
-
- List<CompletableFuture<?>> storageCreationFutures = zoneTables.stream()
- .map(tbl -> inBusyLockAsync(busyLock, () -> {
- return createPartitionStoragesIfAbsent(tbl,
singlePartitionIdSet)
- // If the table is already closed, it's not a
problem (probably the node is stopping).
- .exceptionally(ignoreTableClosedException());
- }))
- .collect(toList());
-
- return CompletableFutures.allOf(storageCreationFutures)
- .thenRunAsync(() ->
scheduleMvPartitionsCleanupIfNeeded(zoneTables, partitionIndex, event),
ioExecutor)
- // If a table is already closed, it's not a problem (probably
the node is stopping).
- .exceptionally(ignoreTableClosedException())
- .thenCompose(unused -> {
- CompletableFuture<?>[] futures = zoneTables.stream()
- .map(tbl -> inBusyLockAsync(busyLock, () -> {
- return runAsync(() -> inBusyLock(busyLock, ()
-> {
- localPartsByTableId.compute(
- tbl.tableId(),
- (tableId, oldPartitionSet) ->
extendPartitionSet(oldPartitionSet, partitionIndex)
- );
-
- lowWatermark.getLowWatermarkSafe(lwm ->
- registerIndexesToTable(
- tbl,
- catalogService,
- singlePartitionIdSet,
- tbl.schemaView(),
- lwm
- )
- );
-
-
preparePartitionResourcesAndLoadToZoneReplicaBusy(
- tbl,
- zonePartitionId,
- event.resources(),
- event.onRecovery()
- );
- }), ioExecutor)
- // If the table is already closed, it's not a
problem (probably the node is stopping).
- .exceptionally(ignoreTableClosedException());
- }))
- .toArray(CompletableFuture[]::new);
-
- return allOf(futures);
- });
- }
-
- private static void scheduleMvPartitionsCleanupIfNeeded(
- Set<TableViewInternal> zoneTables,
- int partitionIndex,
- LocalBeforeReplicaStartEventParameters event
- ) {
- boolean anyMvPartitionStorageIsInRebalanceState = zoneTables.stream()
- .map(table ->
table.internalTable().storage().getMvPartition(partitionIndex))
- .filter(Objects::nonNull)
- .anyMatch(partitionStorage ->
partitionStorage.lastAppliedIndex() ==
MvPartitionStorage.REBALANCE_IN_PROGRESS);
-
- if (anyMvPartitionStorageIsInRebalanceState) {
- event.registerStorageInRebalanceState();
- }
-
- // Adding the cleanup action even if no MV partition storage is in
rebalance state as it might be that the TX state storage is.
- event.addCleanupAction(() -> {
- CompletableFuture<?>[] clearFutures = zoneTables.stream()
- .map(table ->
table.internalTable().storage().clearPartition(partitionIndex))
- .toArray(CompletableFuture[]::new);
- return allOf(clearFutures);
- });
- }
-
- private static <T> Function<Throwable, T> ignoreTableClosedException() {
- return ex -> {
- if (hasCause(ex, TableClosedException.class)) {
- return null;
- }
- throw sneakyThrow(ex);
- };
- }
-
- private CompletableFuture<Boolean>
onZoneReplicaStopped(LocalPartitionReplicaEventParameters parameters) {
- ZonePartitionId zonePartitionId = parameters.zonePartitionId();
-
- NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(
- zonePartitionId.zoneId(),
- id -> new NaiveAsyncReadWriteLock());
-
- CompletableFuture<Long> readLockAcquisitionFuture =
zoneLock.readLock();
-
- try {
- return readLockAcquisitionFuture.thenCompose(stamp ->
inBusyLockAsync(busyLock, () -> {
- CompletableFuture<?>[] futures =
zoneTablesRawSet(zonePartitionId.zoneId()).stream()
- .map(this::stopTablePartitions)
- .toArray(CompletableFuture[]::new);
-
- return allOf(futures);
- })).whenComplete((v, t) ->
readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead)).thenApply(v ->
false);
- } catch (Throwable t) {
- readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
-
- return failedFuture(t);
- }
- }
-
- private CompletableFuture<Boolean>
onZoneReplicaDestroyed(LocalPartitionReplicaEventParameters parameters) {
- ZonePartitionId zonePartitionId = parameters.zonePartitionId();
-
- NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(
- zonePartitionId.zoneId(),
- id -> new NaiveAsyncReadWriteLock());
-
- CompletableFuture<Long> readLockAcquisitionFuture =
zoneLock.readLock();
-
- try {
- return readLockAcquisitionFuture.thenCompose(stamp -> {
- return inBusyLockAsync(busyLock, () -> {
- CompletableFuture<?>[] futures =
zoneTablesRawSet(zonePartitionId.zoneId()).stream()
- .map(table -> supplyAsync(
- () -> inBusyLockAsync(
- busyLock,
- () -> stopAndDestroyTablePartition(
- new
TablePartitionId(table.tableId(), zonePartitionId.partitionId()),
- parameters.causalityToken()
- )
- ),
- ioExecutor).thenCompose(identity()))
- .toArray(CompletableFuture[]::new);
-
- return allOf(futures);
- });
- }).whenComplete((v, t) ->
readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead)).thenApply((unused)
-> false);
- } catch (Throwable t) {
- readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
-
- return failedFuture(t);
- }
- }
-
/**
* During node recovery pre-populates required internal table structures
before zone replicas are started.
*
- * <p>The created resources will then be loaded during replica startup in
{@link #beforeZoneReplicaStarted}.
+ * <p>The created resources will then be loaded during replica startup by
{@link TableZoneCoordinator}.
*/
private CompletableFuture<Void> prepareTableResourcesOnRecovery(
long causalityToken,
@@ -706,11 +483,11 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
schemaRegistry
);
- tables.put(tableId, table);
+ tableRegistry.tables().put(tableId, table);
- addTableToZone(zoneDescriptor.id(), table);
+ zoneCoordinator.addTableToZone(zoneDescriptor.id(),
table);
- startedTables.put(tableId, table);
+ tableRegistry.startedTables().put(tableId, table);
}));
});
}
@@ -741,7 +518,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
return schemaManager.schemaRegistry(causalityToken,
tableId).thenAccept(schemaRegistry -> {
TableImpl table = createTableImpl(causalityToken,
tableDescriptor, zoneDescriptor, schemaDescriptor, schemaRegistry);
- tables.put(tableId, table);
+ tableRegistry.tables().put(tableId, table);
});
}));
@@ -749,7 +526,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
// will call update() on VVs and inside those updates it will chain on
the lock acquisition future.
CompletableFuture<Long> acquisitionFuture =
partitionReplicaLifecycleManager.lockZoneForRead(zoneDescriptor.id());
try {
- return
loadTableToZoneOnTableCreateHavingZoneReadLock(acquisitionFuture,
causalityToken, zoneDescriptor, tableId)
+ return zoneCoordinator.loadTableToZone(acquisitionFuture,
causalityToken, zoneDescriptor, tableId)
.whenComplete((res, ex) ->
unlockZoneForRead(zoneDescriptor, acquisitionFuture));
} catch (Throwable e) {
unlockZoneForRead(zoneDescriptor, acquisitionFuture);
@@ -758,151 +535,15 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
}
- private CompletableFuture<Void>
loadTableToZoneOnTableCreateHavingZoneReadLock(
- CompletableFuture<Long> readLockAcquisitionFuture,
- long causalityToken,
- CatalogZoneDescriptor zoneDescriptor,
- int tableId
- ) {
- CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
- CompletableFuture<Void> readLockFutureEx =
allOf(readLockAcquisitionFuture, tablesByIdFuture);
-
- // NB: all vv.update() calls must be made from the synchronous part of
the method (not in thenCompose()/etc!).
- CompletableFuture<?> localPartsUpdateFuture =
localPartitionsVv.update(causalityToken,
- (ignore, throwable) -> inBusyLock(busyLock, () ->
readLockFutureEx.thenComposeAsync(unused -> {
- PartitionSet parts = new BitSetPartitionSet();
-
- for (int i = 0; i < zoneDescriptor.partitions(); i++) {
- if
(partitionReplicaLifecycleManager.hasLocalPartition(new
ZonePartitionId(zoneDescriptor.id(), i))) {
- parts.set(i);
- }
- }
-
- var table = (TableImpl) tables.get(tableId);
-
- return createPartitionStoragesIfAbsent(table,
parts).thenRun(() -> localPartsByTableId.put(tableId, parts));
- }, ioExecutor))
- // If the table is already closed, it's not a problem
(probably the node is stopping).
- .exceptionally(ignoreTableClosedException())
- );
-
- CompletableFuture<?> createPartsFut =
assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
- if (e != null) {
- return failedFuture(e);
- }
-
- return localPartsUpdateFuture.thenRunAsync(() ->
inBusyLock(busyLock, () -> {
- var table = (TableImpl) tables.get(tableId);
-
- for (int i = 0; i < zoneDescriptor.partitions(); i++) {
- var zonePartitionId = new
ZonePartitionId(zoneDescriptor.id(), i);
-
- if
(partitionReplicaLifecycleManager.hasLocalPartition(zonePartitionId)) {
- preparePartitionResourcesAndLoadToZoneReplicaBusy(
- table,
- zonePartitionId,
-
partitionReplicaLifecycleManager.zonePartitionResources(zonePartitionId),
- false
- );
- }
- }
- }), ioExecutor);
- });
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible
performance degradation.
- return createPartsFut.thenAccept(ignore -> {
- var table = (TableImpl) tables.get(tableId);
-
- startedTables.put(tableId, table);
-
- addTableToZone(zoneDescriptor.id(), table);
- });
- }
-
private void unlockZoneForRead(CatalogZoneDescriptor zoneDescriptor,
CompletableFuture<Long> readLockAcquiryFuture) {
readLockAcquiryFuture.thenAccept(stamp -> {
partitionReplicaLifecycleManager.unlockZoneForRead(zoneDescriptor.id(), stamp);
});
}
- /**
- * Prepare the table partition resources and load it to the zone-based
replica.
- *
- * @param table Table.
- * @param zonePartitionId Zone Partition ID.
- * @param resources Replica resources.
- */
- private void preparePartitionResourcesAndLoadToZoneReplicaBusy(
- TableViewInternal table,
- ZonePartitionId zonePartitionId,
- ZonePartitionResources resources,
- boolean onNodeRecovery
- ) {
- int partId = zonePartitionId.partitionId();
-
- int tableId = table.tableId();
-
- var tablePartitionId = new TablePartitionId(tableId, partId);
-
- MvPartitionStorage mvPartitionStorage;
- try {
- mvPartitionStorage = getMvPartitionStorage(table, partId);
- } catch (TableClosedException e) {
- // The node is probably stopping while we start the table, let's
just skip it.
- return;
- }
-
- PartitionDataStorage partitionDataStorage =
partitionResourcesFactory.createPartitionDataStorage(
- new PartitionKey(zonePartitionId.zoneId(), partId),
- tableId,
- mvPartitionStorage
- );
-
- PartitionResources partitionResources =
partitionResourcesFactory.createPartitionResources(
- partId,
- partitionDataStorage,
- table,
- resources.safeTimeTracker()
- );
-
- partitionResources.storageUpdateHandler.start(onNodeRecovery);
-
- registerPartitionTableStatsMetrics(table, partId, partitionResources);
-
- mvGc.addStorage(tablePartitionId, partitionResources.gcUpdateHandler);
-
- minTimeCollectorService.addPartition(new TablePartitionId(tableId,
partId));
-
- TablePartitionReplicaProcessorFactory createListener = (raftClient,
transactionStateResolver) ->
- partitionResourcesFactory.createReplicaListener(
- zonePartitionId,
- table,
- resources.safeTimeTracker(),
- mvPartitionStorage,
- partitionResources,
- raftClient,
- transactionStateResolver
- );
-
- TablePartitionProcessor tablePartitionProcessor =
partitionResourcesFactory.createTablePartitionProcessor(
- zonePartitionId, table, partitionDataStorage,
partitionResources);
-
- PartitionMvStorageAccess partitionStorageAccess =
partitionResourcesFactory.createPartitionMvStorageAccess(
- partId, table, partitionResources);
-
- partitionReplicaLifecycleManager.loadTableListenerToZoneReplica(
- zonePartitionId,
- tableId,
- createListener,
- tablePartitionProcessor,
- partitionStorageAccess,
- onNodeRecovery
- );
- }
-
private void onTableDrop(DropTableEventParameters parameters) {
inBusyLock(busyLock, () -> {
- unregisterMetricsSource(startedTables.get(parameters.tableId()));
+
unregisterMetricsSource(tableRegistry.startedTables().get(parameters.tableId()));
destructionEventsQueue.enqueue(new
DestroyTableEvent(parameters.catalogVersion(), parameters.tableId()));
});
@@ -947,7 +588,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
return failedFuture(e);
}
- TableViewInternal table = tables.get(parameters.tableId());
+ TableViewInternal table =
tableRegistry.tables().get(parameters.tableId());
table.updateStalenessConfiguration(parameters.staleRowsFraction(),
parameters.minStaleRowsCount());
@@ -964,7 +605,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
return failedFuture(e);
}
- TableViewInternal table = tables.get(parameters.tableId());
+ TableViewInternal table =
tableRegistry.tables().get(parameters.tableId());
// TODO: revisit this approach, see
https://issues.apache.org/jira/browse/IGNITE-21235.
((TableImpl) table).name(parameters.newTableName());
@@ -999,22 +640,18 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
if (!stopGuard.compareAndSet(false, true)) {
return nullCompletedFuture();
}
+
TransactionMetricsSource transactionMetricsSource =
txManager.transactionMetricsSource();
if (transactionMetricsSource != null) {
transactionMetricsSource.setPendingWriteIntentsSupplier(null);
}
-
partitionReplicaLifecycleManager.removeListener(AFTER_REPLICA_DESTROYED,
onZoneReplicaDestroyedListener);
- partitionReplicaLifecycleManager.removeListener(AFTER_REPLICA_STOPPED,
onZoneReplicaStoppedListener);
-
partitionReplicaLifecycleManager.removeListener(BEFORE_REPLICA_STARTED,
onBeforeZoneReplicaStartedListener);
-
int shutdownTimeoutSeconds = 10;
try {
closeAllManually(
- () -> closeAllManually(tables.values().stream().map(table
-> () -> closeTable(table))),
- mvGc,
- fullStateTransferIndexChooser,
+ zoneCoordinator::stop,
+ () ->
closeAllManually(tableRegistry.tables().values().stream().map(table -> () ->
closeTable(table))),
() -> shutdownAndAwaitTermination(scanRequestExecutor,
shutdownTimeoutSeconds, TimeUnit.SECONDS),
() -> {
ScheduledExecutorService streamerFlushExecutor;
@@ -1033,22 +670,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return nullCompletedFuture();
}
- private CompletableFuture<Void> stopTablePartitions(TableViewInternal
table) {
- return supplyAsync(() -> {
- InternalTable internalTable = table.internalTable();
-
- var stopReplicaFutures = new
CompletableFuture<?>[internalTable.partitions()];
-
- for (int p = 0; p < internalTable.partitions(); p++) {
- TablePartitionId replicationGroupId = new
TablePartitionId(table.tableId(), p);
-
- stopReplicaFutures[p] = stopTablePartition(replicationGroupId,
table);
- }
-
- return allOf(stopReplicaFutures);
- }, ioExecutor).thenCompose(identity());
- }
-
private static void closeTable(TableViewInternal table) throws Exception {
InternalTable internalTable = table.internalTable();
@@ -1142,18 +763,18 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
* @param tableId Table id to destroy.
*/
private CompletableFuture<Void> destroyTableLocally(int tableId) {
- TableViewInternal table = startedTables.remove(tableId);
+ TableViewInternal table =
tableRegistry.startedTables().remove(tableId);
- localPartsByTableId.remove(tableId);
+ tableRegistry.localPartsByTableId().remove(tableId);
assert table != null : tableId;
InternalTable internalTable = table.internalTable();
- return stopAndDestroyTableProcessors(table)
+ return zoneCoordinator.stopAndDestroyTableProcessors(table)
.thenComposeAsync(unused -> inBusyLockAsync(busyLock, () ->
internalTable.storage().destroy()), ioExecutor)
.thenAccept(unused -> inBusyLock(busyLock, () -> {
- tables.remove(tableId);
+ tableRegistry.tables().remove(tableId);
schemaManager.dropRegistry(tableId);
}))
.whenComplete((v, e) -> {
@@ -1205,14 +826,14 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
* @see #assignmentsUpdatedVv
*/
private CompletableFuture<Map<Integer, TableViewInternal>> tablesById(long
causalityToken) {
- return assignmentsUpdatedVv.get(causalityToken).thenApply(v ->
unmodifiableMap(startedTables));
+ return assignmentsUpdatedVv.get(causalityToken).thenApply(v ->
unmodifiableMap(tableRegistry.startedTables()));
}
/**
* Returns an internal map, which contains all managed tables by their ID.
*/
private Map<Integer, TableViewInternal> tablesById() {
- return unmodifiableMap(tables);
+ return unmodifiableMap(tableRegistry.tables());
}
/**
@@ -1220,7 +841,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
*/
@TestOnly
public Map<Integer, TableViewInternal> startedTables() {
- return unmodifiableMap(startedTables);
+ return unmodifiableMap(tableRegistry.startedTables());
}
@Override
@@ -1282,7 +903,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
try {
return localPartitionsVv.get(causalityToken)
- .thenApply(unused ->
localPartsByTableId.getOrDefault(tableId, PartitionSet.EMPTY_SET));
+ .thenApply(unused ->
tableRegistry.localPartsByTableId().getOrDefault(tableId,
PartitionSet.EMPTY_SET));
} finally {
busyLock.leaveBusy();
}
@@ -1324,7 +945,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
}
private CompletableFuture<TableViewInternal> tableAsyncInternalBusy(int
tableId) {
- TableViewInternal tableImpl = startedTables.get(tableId);
+ TableViewInternal tableImpl =
tableRegistry.startedTables().get(tableId);
if (tableImpl != null) {
return completedFuture(tableImpl);
@@ -1336,11 +957,11 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
if (th == null) {
CompletableFuture<?> tablesFuture = tablesVv.get(token);
- tablesFuture.whenComplete((tables, e) -> {
+ tablesFuture.whenComplete((tbls, e) -> {
if (e != null) {
getLatestTableFuture.completeExceptionally(e);
} else {
-
getLatestTableFuture.complete(startedTables.get(tableId));
+
getLatestTableFuture.complete(tableRegistry.startedTables().get(tableId));
}
});
} else {
@@ -1354,7 +975,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
// This check is needed for the case when we have registered
tablesListener,
// but tablesVv has already been completed, so listener would be
triggered only for the next versioned value update.
- tableImpl = startedTables.get(tableId);
+ tableImpl = tableRegistry.startedTables().get(tableId);
if (tableImpl != null) {
assignmentsUpdatedVv.removeWhenComplete(tablesListener);
@@ -1370,209 +991,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return IgniteUtils.getInterruptibly(future);
}
- private static PartitionSet extendPartitionSet(@Nullable PartitionSet
oldPartitionSet, int partitionId) {
- PartitionSet newPartitionSet =
Objects.requireNonNullElseGet(oldPartitionSet, BitSetPartitionSet::new);
- newPartitionSet.set(partitionId);
- return newPartitionSet;
- }
-
- /**
- * Gets MV partition storage.
- *
- * @param table Table.
- * @param partitionId Partition ID.
- * @return MvPartitionStorage.
- */
- private static MvPartitionStorage getMvPartitionStorage(TableViewInternal
table, int partitionId) {
- InternalTable internalTable = table.internalTable();
-
- MvPartitionStorage mvPartition;
- try {
- mvPartition = internalTable.storage().getMvPartition(partitionId);
- } catch (StorageClosedException e) {
- throw new TableClosedException(table.tableId(), e);
- }
-
- assert mvPartition != null : "tableId=" + table.tableId() + ",
partitionId=" + partitionId;
-
- return mvPartition;
- }
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19739 Create
storages only once.
- private CompletableFuture<Void>
createPartitionStoragesIfAbsent(TableViewInternal table, PartitionSet
partitions) {
- InternalTable internalTable = table.internalTable();
-
- List<CompletableFuture<MvPartitionStorage>> storageFuts =
partitions.stream().mapToObj(partitionId -> {
- MvPartitionStorage mvPartition;
- try {
- mvPartition =
internalTable.storage().getMvPartition(partitionId);
- } catch (StorageClosedException e) {
- return CompletableFuture.<MvPartitionStorage>failedFuture(new
TableClosedException(table.tableId(), e));
- }
-
- return mvPartition != null ? completedFuture(mvPartition) :
internalTable.storage().createMvPartition(partitionId);
- }).collect(toList());
-
- return CompletableFutures.allOf(storageFuts);
- }
-
- private CompletableFuture<Void>
stopAndDestroyTableProcessors(TableViewInternal table) {
- InternalTable internalTable = table.internalTable();
-
- int partitions = internalTable.partitions();
-
- NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(
- internalTable.zoneId(),
- id -> new NaiveAsyncReadWriteLock());
-
- CompletableFuture<Long> writeLockAcquisitionFuture =
zoneLock.writeLock();
-
- try {
- return writeLockAcquisitionFuture.thenCompose(stamp -> {
- CompletableFuture<?>[] stopReplicaAndDestroyFutures = new
CompletableFuture<?>[partitions];
-
- for (int partitionId = 0; partitionId < partitions;
partitionId++) {
- CompletableFuture<Void> resourcesUnloadFuture;
-
- resourcesUnloadFuture =
partitionReplicaLifecycleManager.unloadTableResourcesFromZoneReplica(
- new ZonePartitionId(internalTable.zoneId(),
partitionId),
- internalTable.tableId()
- );
-
- var tablePartitionId = new
TablePartitionId(internalTable.tableId(), partitionId);
-
- stopReplicaAndDestroyFutures[partitionId] =
resourcesUnloadFuture
- .thenCompose(v ->
stopAndDestroyTablePartition(tablePartitionId, table));
- }
-
- return allOf(stopReplicaAndDestroyFutures).whenComplete((res,
th) -> {
- tablesPerZone.getOrDefault(internalTable.zoneId(),
emptySet()).remove(table);
- });
- }).whenComplete((unused, t) ->
writeLockAcquisitionFuture.thenAccept(zoneLock::unlockWrite));
- } catch (Throwable t) {
- writeLockAcquisitionFuture.thenAccept(zoneLock::unlockWrite);
-
- throw t;
- }
- }
-
- private CompletableFuture<Void>
stopAndDestroyTablePartition(TablePartitionId tablePartitionId, long
causalityToken) {
- CompletableFuture<?> tokenFuture;
-
- try {
- tokenFuture = tablesVv.get(causalityToken);
- } catch (OutdatedTokenException e) {
- // Here we need only to ensure that the token has been seen.
- // TODO https://issues.apache.org/jira/browse/IGNITE-25742
- tokenFuture = nullCompletedFuture();
- }
-
- return tokenFuture
- .thenCompose(ignore -> {
- TableViewInternal table =
tables.get(tablePartitionId.tableId());
- assert table != null : tablePartitionId;
-
- return stopAndDestroyTablePartition(tablePartitionId,
table);
- });
- }
-
- private CompletableFuture<Void> stopAndDestroyTablePartition(
- TablePartitionId tablePartitionId,
- TableViewInternal table
- ) {
- return stopTablePartition(tablePartitionId, table)
- .thenComposeAsync(v ->
destroyPartitionStorages(tablePartitionId, table), ioExecutor);
- }
-
- /**
- * Stops all resources associated with a given partition, like replicas
and partition trackers.
- *
- * @param tablePartitionId Partition ID.
- * @param table Table which this partition belongs to.
- * @return Future that will be completed after all resources have been
closed.
- */
- private CompletableFuture<Void> stopTablePartition(TablePartitionId
tablePartitionId, TableViewInternal table) {
- // In case of colocation there shouldn't be any table replica and thus
it shouldn't be stopped.
- minTimeCollectorService.removePartition(tablePartitionId);
-
- unregisterPartitionMetrics(tablePartitionId, table.name());
-
- return mvGc.removeStorage(tablePartitionId);
- }
-
- private void registerPartitionTableStatsMetrics(
- TableViewInternal table,
- int partitionId,
- PartitionResources partitionResources
- ) {
- PartitionTableStatsMetricSource metricSource =
- new PartitionTableStatsMetricSource(table.tableId(),
partitionId, partitionResources.modificationCounter);
-
- try {
- // Only register this Metrics Source and do not enable it by
default
- // as it is intended for online troubleshooting purposes only (see
IGNITE-27813).
- metricManager.registerSource(metricSource);
-
- TablePartitionId tablePartitionId = new
TablePartitionId(table.tableId(), partitionId);
-
- partModCounterMetricSources.put(tablePartitionId, metricSource);
- } catch (Exception e) {
- LOG.warn("Failed to register metrics source for table [name={},
partitionId={}].", e, table.name(), partitionId);
- }
-
- pendingWriteIntentsSuppliers.put(
- new TablePartitionId(table.tableId(), partitionId),
- partitionResources.storageUpdateHandler::getPendingRowCount
- );
- }
-
- private void unregisterPartitionMetrics(TablePartitionId tablePartitionId,
String tableName) {
- PartitionTableStatsMetricSource metricSource =
partModCounterMetricSources.remove(tablePartitionId);
- pendingWriteIntentsSuppliers.remove(tablePartitionId);
- if (metricSource != null) {
- try {
- metricManager.unregisterSource(metricSource);
- } catch (Exception e) {
- String message = "Failed to unregister metrics source for
table [name={}, partitionId={}].";
- LOG.warn(message, e, tableName,
tablePartitionId.partitionId());
- }
- }
- }
-
- private long totalPendingWriteIntents() {
- long sum = 0;
-
- for (LongSupplier supplier : pendingWriteIntentsSuppliers.values()) {
- sum += supplier.getAsLong();
- }
-
- return sum;
- }
-
- private CompletableFuture<Void> destroyPartitionStorages(
- TablePartitionId tablePartitionId,
- TableViewInternal table
- ) {
- InternalTable internalTable = table.internalTable();
-
- int partitionId = tablePartitionId.partitionId();
-
- List<CompletableFuture<?>> destroyFutures = new ArrayList<>();
-
- try {
- if (internalTable.storage().getMvPartition(partitionId) != null) {
-
destroyFutures.add(internalTable.storage().destroyPartition(partitionId));
- }
- } catch (StorageDestroyedException ignored) {
- // Ignore as the storage is already destroyed, no need to destroy
it again.
- } catch (StorageClosedException ignored) {
- // The storage is closed, so the node is being stopped. We'll
destroy the partition on node recovery.
- }
-
- // TODO: IGNITE-24926 - reduce set in localPartsByTableId after
storages destruction.
- return allOf(destroyFutures.toArray(new CompletableFuture[]{}));
- }
-
/**
* Returns a cached table instance if it exists, {@code null} otherwise.
Can return a table that is being stopped.
*
@@ -1580,7 +998,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
*/
@Override
public @Nullable TableViewInternal cachedTable(int tableId) {
- return tables.get(tableId);
+ return tableRegistry.tables().get(tableId);
}
/**
@@ -1590,7 +1008,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
*/
@TestOnly
public @Nullable TableViewInternal cachedTable(String name) {
- return findTableImplByName(tables.values(), name);
+ return findTableImplByName(tableRegistry.tables().values(), name);
}
private CatalogZoneDescriptor getZoneDescriptor(CatalogTableDescriptor
tableDescriptor, int catalogVersion) {
@@ -1764,75 +1182,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
* @throws IgniteInternalException If failed to acquire a read lock for
the zone or current thread was interrupted while waiting.
*/
public Set<TableViewInternal> zoneTables(int zoneId) throws
IgniteInternalException {
- NaiveAsyncReadWriteLock zoneLock =
tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
-
- CompletableFuture<Long> readLockAcquisitionFuture =
zoneLock.readLock();
-
- try {
- return readLockAcquisitionFuture.thenApply(stamp -> {
- Set<TableViewInternal> res =
Set.copyOf(zoneTablesRawSet(zoneId));
-
- zoneLock.unlockRead(stamp);
-
- return res;
- }).get();
- } catch (Throwable t) {
- readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
-
- if (t instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
-
- throw new IgniteInternalException(INTERNAL_ERR, "Failed to acquire
a read lock for zone [zoneId=" + zoneId + ']', t);
- }
- }
-
- /**
- * Returns a set of tables that belong to the specified zone.
- * Note that this method call should be properly synchronized using {@link
#tablesPerZoneLocks}.
- *
- * @param zoneId Zone identifier.
- * @return Set of tables.
- */
- private Set<TableViewInternal> zoneTablesRawSet(int zoneId) {
- return tablesPerZone.getOrDefault(zoneId, Set.of());
- }
-
- /**
- * Adds a table to the specified zone.
- *
- * @param zoneId Zone identifier.
- * @param table Table to add.
- * @throws IgniteInternalException If failed to acquire a write lock for
the zone or current thread was interrupted while waiting.
- */
- private void addTableToZone(int zoneId, TableImpl table) throws
IgniteInternalException {
- NaiveAsyncReadWriteLock zoneLock =
tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
-
- CompletableFuture<Long> writeLockAcquisitionFuture =
zoneLock.writeLock();
-
- try {
- writeLockAcquisitionFuture.thenAccept(stamp -> {
- tablesPerZone.compute(zoneId, (id, tables) -> {
- if (tables == null) {
- tables = new HashSet<>();
- }
-
- tables.add(table);
-
- return tables;
- });
-
- zoneLock.unlockWrite(stamp);
- }).get();
- } catch (Throwable t) {
- writeLockAcquisitionFuture.thenAccept(zoneLock::unlockWrite);
-
- if (t instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
-
- throw new IgniteInternalException(INTERNAL_ERR, "Failed to acquire
a write lock for zone [zoneId=" + zoneId + ']', t);
- }
+ return zoneCoordinator.zoneTables(zoneId);
}
private ReadWriteMetricSource
createAndRegisterMetricsSource(MvTableStorage tableStorage, QualifiedName
tableName) {
@@ -1901,11 +1251,4 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
}
- private static class TableClosedException extends IgniteInternalException {
- private static final long serialVersionUID = 1L;
-
- private TableClosedException(int tableId, @Nullable Throwable cause) {
- super(INTERNAL_ERR, "Table is closed [tableId=" + tableId + "]",
cause);
- }
- }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableRegistry.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableRegistry.java
new file mode 100644
index 00000000000..5194a951678
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableRegistry.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.table.TableViewInternal;
+
+/**
+ * Holds shared mutable state for table tracking, shared between {@link TableManager} and {@link TableZoneCoordinator}.
+ *
+ * <ul>
+ * <li>{@link #tables} — all registered tables by ID.</li>
+ * <li>{@link #startedTables} — tables that are fully started (partition resources prepared).</li>
+ * <li>{@link #localPartsByTableId} — local partition sets per table.</li>
+ * </ul>
+ *
+ * <p>Every accessor returns the backing {@link ConcurrentHashMap} itself, not a copy and not a read-only view, so callers
+ * register and remove entries directly through the returned map. Each map is individually thread-safe; this class adds no
+ * synchronization of its own across the three maps.
+ */
+class TableRegistry {
+ /** All registered tables by ID. */
+ private final Map<Integer, TableViewInternal> tables = new
ConcurrentHashMap<>();
+
+ /** Tables that are fully started (partition resources prepared). */
+ private final Map<Integer, TableViewInternal> startedTables = new
ConcurrentHashMap<>();
+
+ /** Local partitions by table ID. */
+ private final Map<Integer, PartitionSet> localPartsByTableId = new
ConcurrentHashMap<>();
+
+ Map<Integer, TableViewInternal> tables() {
+ return tables;
+ }
+
+ Map<Integer, TableViewInternal> startedTables() {
+ return startedTables;
+ }
+
+ Map<Integer, PartitionSet> localPartsByTableId() {
+ return localPartsByTableId;
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java
new file mode 100644
index 00000000000..9a2822d9be6
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java
@@ -0,0 +1,818 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED;
+import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED;
+import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED;
+import static
org.apache.ignite.internal.table.distributed.index.IndexUtils.registerIndexesToTable;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.causality.IncrementalVersionedValue;
+import org.apache.ignite.internal.causality.OutdatedTokenException;
+import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
+import org.apache.ignite.internal.metrics.MetricManager;
+import
org.apache.ignite.internal.partition.replicator.LocalBeforeReplicaStartEventParameters;
+import
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEventParameters;
+import org.apache.ignite.internal.partition.replicator.NaiveAsyncReadWriteLock;
+import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
+import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager.TablePartitionReplicaProcessorFactory;
+import
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageDestroyedException;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
+import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
+import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
+import
org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Coordinates table partition lifecycle within distribution zones.
+ *
+ * <p>Ensures that when zone replicas start/stop/destroy, the correct table
partition resources
+ * are created, loaded, stopped, and destroyed. Maintains the zone-to-table
mapping that enables this.
+ */
+class TableZoneCoordinator {
+ private static final IgniteLogger LOG =
Loggers.forClass(TableZoneCoordinator.class);
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-25347 Clean up
entries on zone drop to avoid memory leak.
+ /** Mapping zone identifier to a collection of tables related to the zone.
*/
+ private final Map<Integer, Set<TableViewInternal>> tablesPerZone = new
ConcurrentHashMap<>();
+
+ /** Locks to synchronize access to the {@link #tablesPerZone}. */
+ private final Map<Integer, NaiveAsyncReadWriteLock> tablesPerZoneLocks =
new ConcurrentHashMap<>();
+
+ private final Map<TablePartitionId, PartitionTableStatsMetricSource>
partModCounterMetricSources = new ConcurrentHashMap<>();
+ private final Map<TablePartitionId, LongSupplier>
pendingWriteIntentsSuppliers = new ConcurrentHashMap<>();
+
+ private final MvGc mvGc;
+ private final FullStateTransferIndexChooser fullStateTransferIndexChooser;
+
+ private final PartitionReplicaLifecycleManager
partitionReplicaLifecycleManager;
+ private final TablePartitionResourcesFactory partitionResourcesFactory;
+ private final MinimumRequiredTimeCollectorService minTimeCollectorService;
+ private final MetricManager metricManager;
+ private final CatalogService catalogService;
+ private final LowWatermark lowWatermark;
+ private final ExecutorService ioExecutor;
+ private final IgniteSpinBusyLock busyLock;
+
+ private final TableRegistry tableRegistry;
+ private final IncrementalVersionedValue<Void> tablesVv;
+ private final IncrementalVersionedValue<Void> localPartitionsVv;
+ private final IncrementalVersionedValue<Void> assignmentsUpdatedVv;
+ private final CompletableFuture<Void> readyToProcessReplicaStarts;
+
+ private final EventListener<LocalBeforeReplicaStartEventParameters>
onBeforeZoneReplicaStartedListener =
+ this::beforeZoneReplicaStarted;
+ private final EventListener<LocalPartitionReplicaEventParameters>
onZoneReplicaStoppedListener =
+ this::onZoneReplicaStopped;
+ private final EventListener<LocalPartitionReplicaEventParameters>
onZoneReplicaDestroyedListener =
+ this::onZoneReplicaDestroyed;
+
+ /**
+ * Constructor.
+ *
+ * <p>Besides storing its arguments, the constructor subscribes this coordinator to the {@code BEFORE_REPLICA_STARTED},
+ * {@code AFTER_REPLICA_STOPPED} and {@code AFTER_REPLICA_DESTROYED} events of the given
+ * {@code partitionReplicaLifecycleManager}, so the new instance is reachable from that manager before the constructor
+ * returns. The subscriptions are removed again by {@link #stop()}.
+ *
+ * @param mvGc MV garbage collector.
+ * @param fullStateTransferIndexChooser Index chooser for full state transfer.
+ * @param partitionReplicaLifecycleManager Partition replica lifecycle manager.
+ * @param partitionResourcesFactory Factory for creating table partition resources.
+ * @param minTimeCollectorService Minimum required time collector service.
+ * @param metricManager Metric manager.
+ * @param catalogService Catalog service.
+ * @param lowWatermark Low watermark.
+ * @param ioExecutor Executor for IO operations.
+ * @param busyLock Busy lock shared with TableManager.
+ * @param tableRegistry Shared table registry.
+ * @param tablesVv Versioned value for linearizing table changing events.
+ * @param localPartitionsVv Versioned value for linearizing table partitions changing events.
+ * @param assignmentsUpdatedVv Versioned value for tracking assignments updates.
+ * @param readyToProcessReplicaStarts Future that completes after table recovery.
+ */
+ TableZoneCoordinator(
+ MvGc mvGc,
+ FullStateTransferIndexChooser fullStateTransferIndexChooser,
+ PartitionReplicaLifecycleManager partitionReplicaLifecycleManager,
+ TablePartitionResourcesFactory partitionResourcesFactory,
+ MinimumRequiredTimeCollectorService minTimeCollectorService,
+ MetricManager metricManager,
+ CatalogService catalogService,
+ LowWatermark lowWatermark,
+ ExecutorService ioExecutor,
+ IgniteSpinBusyLock busyLock,
+ TableRegistry tableRegistry,
+ IncrementalVersionedValue<Void> tablesVv,
+ IncrementalVersionedValue<Void> localPartitionsVv,
+ IncrementalVersionedValue<Void> assignmentsUpdatedVv,
+ CompletableFuture<Void> readyToProcessReplicaStarts
+ ) {
+ this.mvGc = mvGc;
+ this.fullStateTransferIndexChooser = fullStateTransferIndexChooser;
+ this.partitionReplicaLifecycleManager =
partitionReplicaLifecycleManager;
+ this.partitionResourcesFactory = partitionResourcesFactory;
+ this.minTimeCollectorService = minTimeCollectorService;
+ this.metricManager = metricManager;
+ this.catalogService = catalogService;
+ this.lowWatermark = lowWatermark;
+ this.ioExecutor = ioExecutor;
+ this.busyLock = busyLock;
+ this.tableRegistry = tableRegistry;
+ this.tablesVv = tablesVv;
+ this.localPartitionsVv = localPartitionsVv;
+ this.assignmentsUpdatedVv = assignmentsUpdatedVv;
+ this.readyToProcessReplicaStarts = readyToProcessReplicaStarts;
+
+ // Register event listeners in the constructor to avoid races with
"partitionReplicaLifecycleManager"'s recovery.
+ // We rely on the "readyToProcessReplicaStarts" future to block event
handling until table recovery is completed.
+ partitionReplicaLifecycleManager.listen(BEFORE_REPLICA_STARTED,
onBeforeZoneReplicaStartedListener);
+ partitionReplicaLifecycleManager.listen(AFTER_REPLICA_STOPPED,
onZoneReplicaStoppedListener);
+ partitionReplicaLifecycleManager.listen(AFTER_REPLICA_DESTROYED,
onZoneReplicaDestroyedListener);
+ }
+
+ /**
+ * Starts the coordinator: starts MvGc and FullStateTransferIndexChooser.
+ *
+ * <p>Replica lifecycle listeners are not registered here; the constructor has already subscribed them.
+ */
+ void start() {
+ mvGc.start();
+ fullStateTransferIndexChooser.start();
+ }
+
+ /**
+ * Stops the coordinator: removes replica listeners, closes MvGc and FullStateTransferIndexChooser.
+ *
+ * <p>The listeners removed are the three subscribed by the constructor, taken off in the reverse order of their
+ * registration ({@code AFTER_REPLICA_DESTROYED}, {@code AFTER_REPLICA_STOPPED}, {@code BEFORE_REPLICA_STARTED}).
+ * Only after that are {@code mvGc} and {@code fullStateTransferIndexChooser} handed to {@code closeAllManually}.
+ *
+ * @throws Exception If an error occurs during close.
+ */
+ void stop() throws Exception {
+
partitionReplicaLifecycleManager.removeListener(AFTER_REPLICA_DESTROYED,
onZoneReplicaDestroyedListener);
+ partitionReplicaLifecycleManager.removeListener(AFTER_REPLICA_STOPPED,
onZoneReplicaStoppedListener);
+
partitionReplicaLifecycleManager.removeListener(BEFORE_REPLICA_STARTED,
onBeforeZoneReplicaStartedListener);
+
+ closeAllManually(mvGc, fullStateTransferIndexChooser);
+ }
+
+ /**
+ * Returns total pending write intents count across all partitions.
+ *
+ * <p>The value is computed on every call by querying each supplier currently held in {@code pendingWriteIntentsSuppliers}
+ * (one entry per {@code TablePartitionId}) and adding the results up.
+ *
+ * @return Sum of the values reported by all registered suppliers; {@code 0} when no supplier is registered.
+ */
+ long totalPendingWriteIntents() {
+ long sum = 0;
+
+ for (LongSupplier supplier : pendingWriteIntentsSuppliers.values()) {
+ sum += supplier.getAsLong();
+ }
+
+ return sum;
+ }
+
+    /**
+     * Returns a copy of tables that belong to the specified zone.
+     *
+     * <p>Blocks the calling thread until the zone's read lock has been granted. The copy (an unmodifiable {@link Set}) is taken
+     * under that lock, and the lock is released exactly once, in the same stage that received the stamp.
+     *
+     * <p>Nothing is released from the error path. If lock acquisition failed there is no stamp to release; if the caller was
+     * interrupted while waiting, the stage still runs once the lock is granted and releases it there. Scheduling a second
+     * release from the error path would return a stamp this call no longer owns.
+     *
+     * @param zoneId Zone identifier.
+     * @return Set of tables.
+     * @throws IgniteInternalException If failed to acquire a read lock for the zone or current thread was interrupted while waiting.
+     */
+    Set<TableViewInternal> zoneTables(int zoneId) throws IgniteInternalException {
+        NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
+
+        CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock();
+
+        try {
+            return readLockAcquisitionFuture.thenApply(stamp -> {
+                try {
+                    return Set.copyOf(zoneTablesRawSet(zoneId));
+                } finally {
+                    // Single release point: runs whether or not the copy succeeded, and also when the waiter has already given up.
+                    zoneLock.unlockRead(stamp);
+                }
+            }).get();
+        } catch (Throwable t) {
+            // Deliberately no unlock here, see the method Javadoc.
+            if (t instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+
+            throw new IgniteInternalException(INTERNAL_ERR, "Failed to acquire a read lock for zone [zoneId=" + zoneId + ']', t);
+        }
+    }
+
+    /**
+     * Adds a table to the specified zone.
+     *
+     * <p>Blocks the calling thread until the zone's write lock has been granted. The table is added to the zone's set under that
+     * lock, and the lock is released exactly once, in the same stage that received the stamp.
+     *
+     * <p>Nothing is released from the error path. If lock acquisition failed there is no stamp to release; if the caller was
+     * interrupted while waiting, the stage still runs once the lock is granted (the table is still added) and releases the lock
+     * there. Scheduling a second release from the error path would return a stamp this call no longer owns.
+     *
+     * @param zoneId Zone identifier.
+     * @param table Table to add.
+     * @throws IgniteInternalException If failed to acquire a write lock for the zone or current thread was interrupted while waiting.
+     */
+    void addTableToZone(int zoneId, TableImpl table) throws IgniteInternalException {
+        NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
+
+        CompletableFuture<Long> writeLockAcquisitionFuture = zoneLock.writeLock();
+
+        try {
+            writeLockAcquisitionFuture.thenAccept(stamp -> {
+                try {
+                    tablesPerZone.compute(zoneId, (id, zoneTbls) -> {
+                        if (zoneTbls == null) {
+                            zoneTbls = new HashSet<>();
+                        }
+
+                        zoneTbls.add(table);
+
+                        return zoneTbls;
+                    });
+                } finally {
+                    // Single release point: runs whether or not the update succeeded, and also when the waiter has already given up.
+                    zoneLock.unlockWrite(stamp);
+                }
+            }).get();
+        } catch (Throwable t) {
+            // Deliberately no unlock here, see the method Javadoc.
+            if (t instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+
+            throw new IgniteInternalException(INTERNAL_ERR, "Failed to acquire a write lock for zone [zoneId=" + zoneId + ']', t);
+        }
+    }
+
+    /**
+     * Loads a newly created table into the zone by creating partition storages and preparing partition resources for local zone replicas.
+     *
+     * <p>The work is arranged as three chained stages:
+     * <ol>
+     *     <li>An update of {@code localPartitionsVv}: once both {@code readLockAcquisitionFuture} and {@code tablesVv} for the given
+     *     token are complete, the set of partitions for which {@code partitionReplicaLifecycleManager.hasLocalPartition(..)} is true
+     *     is computed on {@code ioExecutor}, storages for them are created if absent, and the set is stored in
+     *     {@code tableRegistry.localPartsByTableId()}. A {@code TableClosedException} at this stage is ignored.</li>
+     *     <li>An update of {@code assignmentsUpdatedVv}: after stage one, partition resources are prepared and loaded into the zone
+     *     replica for every local partition (with {@code onNodeRecovery == false}). If the update callback receives an error, the
+     *     stage fails with it.</li>
+     *     <li>Finally the table is put into {@code tableRegistry.startedTables()} and registered in the zone via
+     *     {@link #addTableToZone(int, TableImpl)}.</li>
+     * </ol>
+     *
+     * <p>This method does not release the lock represented by {@code readLockAcquisitionFuture}; it only waits for it.
+     *
+     * <p>NOTE(review): the {@code throwable} argument of the {@code localPartitionsVv} update callback is not inspected, unlike the
+     * {@code assignmentsUpdatedVv} one - confirm that this is intended.
+     *
+     * @param readLockAcquisitionFuture Future for read lock on the zone in PartitionReplicaLifecycleManager.
+     * @param causalityToken Causality token.
+     * @param zoneDescriptor Zone descriptor.
+     * @param tableId Table ID.
+     * @return Completion future.
+     */
+    CompletableFuture<Void> loadTableToZone(
+            CompletableFuture<Long> readLockAcquisitionFuture,
+            long causalityToken,
+            CatalogZoneDescriptor zoneDescriptor,
+            int tableId
+    ) {
+        CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
+        CompletableFuture<Void> readLockFutureEx = allOf(readLockAcquisitionFuture, tablesByIdFuture);
+
+        // NB: all vv.update() calls must be made from the synchronous part of the method (not in thenCompose()/etc!).
+        CompletableFuture<?> localPartsUpdateFuture = localPartitionsVv.update(causalityToken,
+                (ignore, throwable) -> inBusyLock(busyLock, () -> readLockFutureEx.thenComposeAsync(unused -> {
+                    PartitionSet parts = new BitSetPartitionSet();
+
+                    for (int i = 0; i < zoneDescriptor.partitions(); i++) {
+                        if (partitionReplicaLifecycleManager.hasLocalPartition(new ZonePartitionId(zoneDescriptor.id(), i))) {
+                            parts.set(i);
+                        }
+                    }
+
+                    var table = (TableImpl) tableRegistry.tables().get(tableId);
+
+                    return createPartitionStoragesIfAbsent(table, parts)
+                            .thenRun(() -> tableRegistry.localPartsByTableId().put(tableId, parts));
+                }, ioExecutor))
+                        // If the table is already closed, it's not a problem (probably the node is stopping).
+                        .exceptionally(ignoreTableClosedException())
+        );
+
+        CompletableFuture<?> createPartsFut = assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
+            if (e != null) {
+                return failedFuture(e);
+            }
+
+            return localPartsUpdateFuture.thenRunAsync(() -> inBusyLock(busyLock, () -> {
+                var table = (TableImpl) tableRegistry.tables().get(tableId);
+
+                for (int i = 0; i < zoneDescriptor.partitions(); i++) {
+                    var zonePartitionId = new ZonePartitionId(zoneDescriptor.id(), i);
+
+                    if (partitionReplicaLifecycleManager.hasLocalPartition(zonePartitionId)) {
+                        preparePartitionResourcesAndLoadToZoneReplicaBusy(
+                                table,
+                                zonePartitionId,
+                                partitionReplicaLifecycleManager.zonePartitionResources(zonePartitionId),
+                                false
+                        );
+                    }
+                }
+            }), ioExecutor);
+        });
+
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible performance degradation.
+        return createPartsFut.thenAccept(ignore -> {
+            var table = (TableImpl) tableRegistry.tables().get(tableId);
+
+            tableRegistry.startedTables().put(tableId, table);
+
+            addTableToZone(zoneDescriptor.id(), table);
+        });
+    }
+
+    /**
+     * Stops and destroys table processors for all partitions of a table.
+     *
+     * <p>Under the write lock of the table's zone, for every partition index of the table this method first unloads the table
+     * resources from the corresponding zone replica and then stops and destroys the table partition. When all partitions have been
+     * processed (successfully or not), the table is removed from the zone's entry in {@code tablesPerZone}. The write lock is
+     * released when the whole chain completes.
+     *
+     * @param table Table to stop and destroy processors for.
+     * @return Completion future.
+     */
+    CompletableFuture<Void> stopAndDestroyTableProcessors(TableViewInternal table) {
+        InternalTable internalTable = table.internalTable();
+
+        int partitions = internalTable.partitions();
+
+        NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(
+                internalTable.zoneId(),
+                id -> new NaiveAsyncReadWriteLock());
+
+        CompletableFuture<Long> writeLockAcquisitionFuture = zoneLock.writeLock();
+
+        try {
+            return writeLockAcquisitionFuture.thenCompose(stamp -> {
+                CompletableFuture<?>[] stopReplicaAndDestroyFutures = new CompletableFuture<?>[partitions];
+
+                for (int partitionId = 0; partitionId < partitions; partitionId++) {
+                    CompletableFuture<Void> resourcesUnloadFuture;
+
+                    resourcesUnloadFuture = partitionReplicaLifecycleManager.unloadTableResourcesFromZoneReplica(
+                            new ZonePartitionId(internalTable.zoneId(), partitionId),
+                            internalTable.tableId()
+                    );
+
+                    var tablePartitionId = new TablePartitionId(internalTable.tableId(), partitionId);
+
+                    stopReplicaAndDestroyFutures[partitionId] = resourcesUnloadFuture
+                            .thenCompose(v -> stopAndDestroyTablePartition(tablePartitionId, table));
+                }
+
+                // The table is dropped from the zone's set regardless of whether the per-partition futures succeeded.
+                return allOf(stopReplicaAndDestroyFutures).whenComplete((res, th) -> {
+                    tablesPerZone.getOrDefault(internalTable.zoneId(), emptySet()).remove(table);
+                });
+            }).whenComplete((unused, t) -> writeLockAcquisitionFuture.thenAccept(zoneLock::unlockWrite));
+        } catch (Throwable t) {
+            // Only reached if building the chain above throws synchronously; the lock is then released here instead.
+            writeLockAcquisitionFuture.thenAccept(zoneLock::unlockWrite);
+
+            throw t;
+        }
+    }
+
+    /**
+     * Listener invoked before a zone replica is started: waits for {@code readyToProcessReplicaStarts}, then prepares the resources of
+     * the zone's tables for the replica.
+     *
+     * @param parameters Event parameters.
+     * @return Future that is completed with {@code false} once the preparation is done.
+     */
+    private CompletableFuture<Boolean> beforeZoneReplicaStarted(LocalBeforeReplicaStartEventParameters parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            CompletableFuture<Void> resourcesPrepared =
+                    readyToProcessReplicaStarts.thenCompose(ready -> beforeZoneReplicaStartedImpl(parameters));
+
+            return resourcesPrepared.thenApply(ignored -> false);
+        });
+    }
+
+    /**
+     * Under the read lock of the zone, creates partition storages and loads partition resources of every table currently registered
+     * in the zone into the zone replica that is about to start.
+     *
+     * <p>The read lock is released when the composed future completes, whether normally or exceptionally.
+     *
+     * @param parameters Event parameters; provide the zone partition ID and are passed on to resource loading.
+     * @return Completion future.
+     */
+    private CompletableFuture<Void> beforeZoneReplicaStartedImpl(LocalBeforeReplicaStartEventParameters parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            ZonePartitionId zonePartitionId = parameters.zonePartitionId();
+
+            NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(
+                    zonePartitionId.zoneId(),
+                    id -> new NaiveAsyncReadWriteLock());
+
+            CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock();
+
+            try {
+                return readLockAcquisitionFuture.thenCompose(stamp -> {
+                    // The raw (non-copied) set is read while the read lock is held.
+                    Set<TableViewInternal> zoneTables = zoneTablesRawSet(zonePartitionId.zoneId());
+
+                    return createPartitionsAndLoadResourcesToZoneReplica(zonePartitionId, zoneTables, parameters);
+                }).whenComplete((unused, t) -> readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead));
+            } catch (Throwable t) {
+                // Only reached if building the chain above throws synchronously.
+                readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
+
+                return failedFuture(t);
+            }
+        });
+    }
+
+    /**
+     * Prepares the given zone partition for all tables of the zone.
+     *
+     * <p>Steps, in order:
+     * <ol>
+     *     <li>For every table, creates the MV partition storage for this partition if it is absent.</li>
+     *     <li>On {@code ioExecutor}, registers the rebalance state and the cleanup action on the event
+     *     (see {@code scheduleMvPartitionsCleanupIfNeeded}).</li>
+     *     <li>For every table, on {@code ioExecutor}: adds the partition to the table's entry in
+     *     {@code tableRegistry.localPartsByTableId()}, registers the table's indexes for the partition, and prepares the partition
+     *     resources and loads them into the zone replica.</li>
+     * </ol>
+     *
+     * <p>A {@code TableClosedException} raised at any of these steps is ignored.
+     *
+     * @param zonePartitionId Zone partition being started.
+     * @param zoneTables Tables of the zone.
+     * @param event Event parameters; supply the zone partition resources and the recovery flag.
+     * @return Completion future.
+     */
+    private CompletableFuture<Void> createPartitionsAndLoadResourcesToZoneReplica(
+            ZonePartitionId zonePartitionId,
+            Set<TableViewInternal> zoneTables,
+            LocalBeforeReplicaStartEventParameters event
+    ) {
+        int partitionIndex = zonePartitionId.partitionId();
+
+        PartitionSet singlePartitionIdSet = PartitionSet.of(partitionIndex);
+
+        List<CompletableFuture<?>> storageCreationFutures = zoneTables.stream()
+                .map(tbl -> inBusyLockAsync(busyLock, () -> createPartitionStoragesIfAbsent(tbl, singlePartitionIdSet)
+                        // If the table is already closed, it's not a problem (probably the node is stopping).
+                        .exceptionally(ignoreTableClosedException())))
+                .collect(toList());
+
+        return CompletableFutures.allOf(storageCreationFutures)
+                .thenRunAsync(() -> scheduleMvPartitionsCleanupIfNeeded(zoneTables, partitionIndex, event), ioExecutor)
+                // If a table is already closed, it's not a problem (probably the node is stopping).
+                .exceptionally(ignoreTableClosedException())
+                .thenCompose(unused -> {
+                    CompletableFuture<?>[] futures = zoneTables.stream()
+                            .map(tbl -> inBusyLockAsync(busyLock, () -> {
+                                return runAsync(() -> inBusyLock(busyLock, () -> {
+                                    tableRegistry.localPartsByTableId().compute(
+                                            tbl.tableId(),
+                                            (tableId, oldPartitionSet) -> extendPartitionSet(oldPartitionSet, partitionIndex)
+                                    );
+
+                                    lowWatermark.getLowWatermarkSafe(lwm ->
+                                            registerIndexesToTable(
+                                                    tbl,
+                                                    catalogService,
+                                                    singlePartitionIdSet,
+                                                    tbl.schemaView(),
+                                                    lwm
+                                            )
+                                    );
+
+                                    preparePartitionResourcesAndLoadToZoneReplicaBusy(
+                                            tbl,
+                                            zonePartitionId,
+                                            event.resources(),
+                                            event.onRecovery()
+                                    );
+                                }), ioExecutor)
+                                        // If the table is already closed, it's not a problem (probably the node is stopping).
+                                        .exceptionally(ignoreTableClosedException());
+                            }))
+                            .toArray(CompletableFuture[]::new);
+
+                    return allOf(futures);
+                });
+    }
+
+    /**
+     * Flags the event if any existing MV partition storage of the zone tables is in the rebalance state, and registers a cleanup
+     * action on the event that clears the partition in every zone table.
+     *
+     * @param zoneTables Tables of the zone.
+     * @param partitionIndex Partition index.
+     * @param event Event to register the rebalance state and the cleanup action on.
+     */
+    private static void scheduleMvPartitionsCleanupIfNeeded(
+            Set<TableViewInternal> zoneTables,
+            int partitionIndex,
+            LocalBeforeReplicaStartEventParameters event
+    ) {
+        boolean rebalanceInProgressDetected = false;
+
+        for (TableViewInternal zoneTable : zoneTables) {
+            MvPartitionStorage storage = zoneTable.internalTable().storage().getMvPartition(partitionIndex);
+
+            if (storage != null && storage.lastAppliedIndex() == MvPartitionStorage.REBALANCE_IN_PROGRESS) {
+                rebalanceInProgressDetected = true;
+
+                // One storage in the rebalance state is enough, no need to look at the remaining tables.
+                break;
+            }
+        }
+
+        if (rebalanceInProgressDetected) {
+            event.registerStorageInRebalanceState();
+        }
+
+        // Adding the cleanup action even if no MV partition storage is in rebalance state as it might be that the TX state storage is.
+        event.addCleanupAction(() -> allOf(
+                zoneTables.stream()
+                        .map(zoneTable -> zoneTable.internalTable().storage().clearPartition(partitionIndex))
+                        .toArray(CompletableFuture[]::new)
+        ));
+    }
+
+    /**
+     * Listener invoked when a zone replica has been stopped: under the read lock of the zone, stops the partitions of every table
+     * registered in the zone. The read lock is released when the chain completes.
+     *
+     * <p>NOTE(review): {@code stopTablePartitions} iterates over all partitions of each table, not only over
+     * {@code zonePartitionId.partitionId()} - confirm that this is intended.
+     *
+     * @param parameters Event parameters.
+     * @return Future that is completed with {@code false} when all tables have been processed.
+     */
+    private CompletableFuture<Boolean> onZoneReplicaStopped(LocalPartitionReplicaEventParameters parameters) {
+        ZonePartitionId zonePartitionId = parameters.zonePartitionId();
+
+        NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(
+                zonePartitionId.zoneId(),
+                id -> new NaiveAsyncReadWriteLock());
+
+        CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock();
+
+        try {
+            return readLockAcquisitionFuture.thenCompose(stamp -> inBusyLockAsync(busyLock, () -> {
+                CompletableFuture<?>[] futures = zoneTablesRawSet(zonePartitionId.zoneId()).stream()
+                        .map(this::stopTablePartitions)
+                        .toArray(CompletableFuture[]::new);
+
+                return allOf(futures);
+            })).whenComplete((v, t) -> readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead)).thenApply(v -> false);
+        } catch (Throwable t) {
+            // Only reached if building the chain above throws synchronously.
+            readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
+
+            return failedFuture(t);
+        }
+    }
+
+    /**
+     * Listener invoked when a zone replica has been destroyed: under the read lock of the zone, for every table registered in the
+     * zone, stops and destroys (on {@code ioExecutor}) the table partition with the same partition ID as the destroyed zone replica.
+     * The read lock is released when the chain completes.
+     *
+     * @param parameters Event parameters; provide the zone partition ID and the causality token.
+     * @return Future that is completed with {@code false} when all tables have been processed.
+     */
+    private CompletableFuture<Boolean> onZoneReplicaDestroyed(LocalPartitionReplicaEventParameters parameters) {
+        ZonePartitionId zonePartitionId = parameters.zonePartitionId();
+
+        NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(
+                zonePartitionId.zoneId(),
+                id -> new NaiveAsyncReadWriteLock());
+
+        CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock();
+
+        try {
+            return readLockAcquisitionFuture.thenCompose(stamp -> {
+                return inBusyLockAsync(busyLock, () -> {
+                    CompletableFuture<?>[] futures = zoneTablesRawSet(zonePartitionId.zoneId()).stream()
+                            // supplyAsync yields a future of a future; thenCompose(identity()) flattens it.
+                            .map(table -> supplyAsync(
+                                    () -> inBusyLockAsync(
+                                            busyLock,
+                                            () -> stopAndDestroyTablePartition(
+                                                    new TablePartitionId(table.tableId(), zonePartitionId.partitionId()),
+                                                    parameters.causalityToken()
+                                            )
+                                    ),
+                                    ioExecutor).thenCompose(identity()))
+                            .toArray(CompletableFuture[]::new);
+
+                    return allOf(futures);
+                });
+            }).whenComplete((v, t) -> readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead)).thenApply(unused -> false);
+        } catch (Throwable t) {
+            // Only reached if building the chain above throws synchronously.
+            readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
+
+            return failedFuture(t);
+        }
+    }
+
+    /**
+     * Creates the per-partition resources of a table and loads the table's listener into the zone replica.
+     *
+     * <p>In order: obtains the MV partition storage (returning silently if the table is already closed), creates the partition data
+     * storage and the partition resources, starts the storage update handler, registers partition metrics, registers the storage
+     * in {@code mvGc} and the partition in {@code minTimeCollectorService}, and finally hands the replica listener factory, the table
+     * partition processor and the storage access over to {@code partitionReplicaLifecycleManager}.
+     *
+     * <p>Both call sites visible in this file invoke the method from within {@code inBusyLock(busyLock, ..)}.
+     *
+     * @param table Table.
+     * @param zonePartitionId Zone partition ID.
+     * @param resources Zone partition resources; the safe time tracker is taken from them.
+     * @param onNodeRecovery Passed to the storage update handler start and to the listener loading.
+     */
+    private void preparePartitionResourcesAndLoadToZoneReplicaBusy(
+            TableViewInternal table,
+            ZonePartitionId zonePartitionId,
+            ZonePartitionResources resources,
+            boolean onNodeRecovery
+    ) {
+        int partId = zonePartitionId.partitionId();
+
+        int tableId = table.tableId();
+
+        var tablePartitionId = new TablePartitionId(tableId, partId);
+
+        MvPartitionStorage mvPartitionStorage;
+        try {
+            mvPartitionStorage = getMvPartitionStorage(table, partId);
+        } catch (TableClosedException e) {
+            // The node is probably stopping while we start the table, let's just skip it.
+            return;
+        }
+
+        PartitionDataStorage partitionDataStorage = partitionResourcesFactory.createPartitionDataStorage(
+                new PartitionKey(zonePartitionId.zoneId(), partId),
+                tableId,
+                mvPartitionStorage
+        );
+
+        PartitionResources partitionResources = partitionResourcesFactory.createPartitionResources(
+                partId,
+                partitionDataStorage,
+                table,
+                resources.safeTimeTracker()
+        );
+
+        partitionResources.storageUpdateHandler.start(onNodeRecovery);
+
+        registerPartitionTableStatsMetrics(table, partId, partitionResources);
+
+        mvGc.addStorage(tablePartitionId, partitionResources.gcUpdateHandler);
+
+        minTimeCollectorService.addPartition(tablePartitionId);
+
+        // The replica listener is created lazily by the factory, once the raft client and the state resolver are supplied.
+        TablePartitionReplicaProcessorFactory createListener = (raftClient, transactionStateResolver) ->
+                partitionResourcesFactory.createReplicaListener(
+                        zonePartitionId,
+                        table,
+                        resources.safeTimeTracker(),
+                        mvPartitionStorage,
+                        partitionResources,
+                        raftClient,
+                        transactionStateResolver
+                );
+
+        TablePartitionProcessor tablePartitionProcessor = partitionResourcesFactory.createTablePartitionProcessor(
+                zonePartitionId, table, partitionDataStorage, partitionResources);
+
+        PartitionMvStorageAccess partitionStorageAccess = partitionResourcesFactory.createPartitionMvStorageAccess(
+                partId, table, partitionResources);
+
+        partitionReplicaLifecycleManager.loadTableListenerToZoneReplica(
+                zonePartitionId,
+                tableId,
+                createListener,
+                tablePartitionProcessor,
+                partitionStorageAccess,
+                onNodeRecovery
+        );
+    }
+
+    /**
+     * Stops every partition of the given table; the per-partition stops are initiated on {@code ioExecutor}.
+     *
+     * @param table Table whose partitions should be stopped.
+     * @return Future that completes when all partitions have been stopped.
+     */
+    private CompletableFuture<Void> stopTablePartitions(TableViewInternal table) {
+        CompletableFuture<CompletableFuture<Void>> stopInitiated = supplyAsync(() -> {
+            InternalTable tableInternals = table.internalTable();
+
+            CompletableFuture<?>[] perPartitionStops = new CompletableFuture<?>[tableInternals.partitions()];
+
+            for (int partId = 0; partId < tableInternals.partitions(); partId++) {
+                perPartitionStops[partId] = stopTablePartition(new TablePartitionId(table.tableId(), partId), table);
+            }
+
+            return allOf(perPartitionStops);
+        }, ioExecutor);
+
+        // Flatten the future of a future.
+        return stopInitiated.thenCompose(identity());
+    }
+
+    /**
+     * Stops and destroys a table partition after the given causality token has been seen by {@code tablesVv}.
+     *
+     * @param tablePartitionId Table partition ID.
+     * @param causalityToken Causality token to wait for.
+     * @return Completion future.
+     */
+    private CompletableFuture<Void> stopAndDestroyTablePartition(TablePartitionId tablePartitionId, long causalityToken) {
+        CompletableFuture<?> tokenSeenFuture;
+
+        try {
+            tokenSeenFuture = tablesVv.get(causalityToken);
+        } catch (OutdatedTokenException e) {
+            // Here we need only to ensure that the token has been seen.
+            // TODO https://issues.apache.org/jira/browse/IGNITE-25742
+            tokenSeenFuture = nullCompletedFuture();
+        }
+
+        return tokenSeenFuture.thenCompose(unused -> {
+            int tableId = tablePartitionId.tableId();
+
+            TableViewInternal registeredTable = tableRegistry.tables().get(tableId);
+
+            assert registeredTable != null : tablePartitionId;
+
+            return stopAndDestroyTablePartition(tablePartitionId, registeredTable);
+        });
+    }
+
+    /**
+     * Stops a table partition and then destroys its storages on {@code ioExecutor}.
+     *
+     * @param tablePartitionId Table partition ID.
+     * @param table Table the partition belongs to.
+     * @return Completion future.
+     */
+    private CompletableFuture<Void> stopAndDestroyTablePartition(
+            TablePartitionId tablePartitionId,
+            TableViewInternal table
+    ) {
+        CompletableFuture<Void> partitionStopped = stopTablePartition(tablePartitionId, table);
+
+        return partitionStopped.thenComposeAsync(unused -> destroyPartitionStorages(tablePartitionId, table), ioExecutor);
+    }
+
+    /**
+     * Releases the per-partition bookkeeping of a table partition: removes it from {@code minTimeCollectorService}, unregisters its
+     * metrics and removes its storage from {@code mvGc}.
+     *
+     * @param tablePartitionId Table partition ID.
+     * @param table Table the partition belongs to.
+     * @return Future returned by the removal of the storage from {@code mvGc}.
+     */
+    private CompletableFuture<Void> stopTablePartition(TablePartitionId tablePartitionId, TableViewInternal table) {
+        // In case of colocation there shouldn't be any table replica and thus it shouldn't be stopped.
+        minTimeCollectorService.removePartition(tablePartitionId);
+
+        String tableName = table.name();
+
+        unregisterPartitionMetrics(tablePartitionId, tableName);
+
+        CompletableFuture<Void> gcStorageRemoved = mvGc.removeStorage(tablePartitionId);
+
+        return gcStorageRemoved;
+    }
+
+    /**
+     * Registers the table statistics metric source of a partition and the supplier of its pending write intents count.
+     *
+     * <p>A failure to register the metric source is logged and does not prevent the supplier from being registered.
+     *
+     * @param table Table.
+     * @param partitionId Partition ID.
+     * @param partitionResources Partition resources providing the modification counter and the storage update handler.
+     */
+    private void registerPartitionTableStatsMetrics(
+            TableViewInternal table,
+            int partitionId,
+            PartitionResources partitionResources
+    ) {
+        var partitionKey = new TablePartitionId(table.tableId(), partitionId);
+
+        var statsSource = new PartitionTableStatsMetricSource(table.tableId(), partitionId, partitionResources.modificationCounter);
+
+        try {
+            // Only register this Metrics Source and do not enable it by default
+            // as it is intended for online troubleshooting purposes only (see IGNITE-27813).
+            metricManager.registerSource(statsSource);
+
+            partModCounterMetricSources.put(partitionKey, statsSource);
+        } catch (Exception e) {
+            LOG.warn("Failed to register metrics source for table [name={}, partitionId={}].", e, table.name(), partitionId);
+        }
+
+        pendingWriteIntentsSuppliers.put(partitionKey, partitionResources.storageUpdateHandler::getPendingRowCount);
+    }
+
+    /**
+     * Forgets the metric source and the pending write intents supplier of a partition and unregisters the metric source, if there
+     * was one. A failure to unregister is only logged.
+     *
+     * @param tablePartitionId Table partition ID.
+     * @param tableName Table name, used for logging only.
+     */
+    private void unregisterPartitionMetrics(TablePartitionId tablePartitionId, String tableName) {
+        PartitionTableStatsMetricSource registeredSource = partModCounterMetricSources.remove(tablePartitionId);
+
+        pendingWriteIntentsSuppliers.remove(tablePartitionId);
+
+        if (registeredSource == null) {
+            return;
+        }
+
+        try {
+            metricManager.unregisterSource(registeredSource);
+        } catch (Exception e) {
+            LOG.warn(
+                    "Failed to unregister metrics source for table [name={}, partitionId={}].",
+                    e,
+                    tableName,
+                    tablePartitionId.partitionId()
+            );
+        }
+    }
+
+    /**
+     * Destroys the MV partition storage of a table partition if it exists.
+     *
+     * @param tablePartitionId Table partition ID.
+     * @param table Table the partition belongs to.
+     * @return Future that completes when the destruction (if any was started) is done.
+     */
+    private static CompletableFuture<Void> destroyPartitionStorages(
+            TablePartitionId tablePartitionId,
+            TableViewInternal table
+    ) {
+        InternalTable tableInternals = table.internalTable();
+
+        int partId = tablePartitionId.partitionId();
+
+        // Stays null when there is nothing to destroy.
+        CompletableFuture<?> destroyFuture = null;
+
+        try {
+            if (tableInternals.storage().getMvPartition(partId) != null) {
+                destroyFuture = tableInternals.storage().destroyPartition(partId);
+            }
+        } catch (StorageDestroyedException ignored) {
+            // Ignore as the storage is already destroyed, no need to destroy it again.
+        } catch (StorageClosedException ignored) {
+            // The storage is closed, so the node is being stopped. We'll destroy the partition on node recovery.
+        }
+
+        // TODO: IGNITE-24926 - reduce set in localPartsByTableId after storages destruction.
+        return destroyFuture == null ? allOf() : allOf(destroyFuture);
+    }
+
+    /**
+     * Makes sure that an MV partition storage exists for each of the given partitions of the table, creating the missing ones.
+     *
+     * <p>If the table storage turns out to be closed, the returned future fails with a {@code TableClosedException}.
+     *
+     * @param table Table.
+     * @param partitions Partitions to create storages for.
+     * @return Future that completes when all the storages are available.
+     */
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-19739 Create storages only once.
+    private static CompletableFuture<Void> createPartitionStoragesIfAbsent(TableViewInternal table, PartitionSet partitions) {
+        InternalTable tableInternals = table.internalTable();
+
+        List<CompletableFuture<MvPartitionStorage>> storageFutures = partitions.stream().mapToObj(partId -> {
+            MvPartitionStorage existingStorage;
+
+            try {
+                existingStorage = tableInternals.storage().getMvPartition(partId);
+            } catch (StorageClosedException e) {
+                return CompletableFuture.<MvPartitionStorage>failedFuture(new TableClosedException(table.tableId(), e));
+            }
+
+            if (existingStorage == null) {
+                return tableInternals.storage().createMvPartition(partId);
+            }
+
+            return CompletableFuture.<MvPartitionStorage>completedFuture(existingStorage);
+        }).collect(toList());
+
+        return CompletableFutures.allOf(storageFutures);
+    }
+
+    /**
+     * Returns the existing MV partition storage of a table partition.
+     *
+     * @param table Table.
+     * @param partitionId Partition ID.
+     * @return MV partition storage; asserted to be non-null.
+     * @throws TableClosedException If the table storage is closed.
+     */
+    private static MvPartitionStorage getMvPartitionStorage(TableViewInternal table, int partitionId) {
+        InternalTable tableInternals = table.internalTable();
+
+        try {
+            MvPartitionStorage storage = tableInternals.storage().getMvPartition(partitionId);
+
+            assert storage != null : "tableId=" + table.tableId() + ", partitionId=" + partitionId;
+
+            return storage;
+        } catch (StorageClosedException e) {
+            throw new TableClosedException(table.tableId(), e);
+        }
+    }
+
+    /**
+     * Returns the set of tables stored for the zone as is (not a copy), or an empty immutable set if the zone has no entry.
+     *
+     * @param zoneId Zone identifier.
+     * @return Tables of the zone.
+     */
+    private Set<TableViewInternal> zoneTablesRawSet(int zoneId) {
+        Set<TableViewInternal> noTables = Set.of();
+
+        return tablesPerZone.getOrDefault(zoneId, noTables);
+    }
+
+    /**
+     * Adds a partition to the given partition set, creating a new set if there is none yet. An existing set is modified in place.
+     *
+     * @param oldPartitionSet Current partition set, or {@code null} if absent.
+     * @param partitionId Partition to add.
+     * @return The partition set containing the partition.
+     */
+    private static PartitionSet extendPartitionSet(@Nullable PartitionSet oldPartitionSet, int partitionId) {
+        PartitionSet extended = oldPartitionSet;
+
+        if (extended == null) {
+            extended = new BitSetPartitionSet();
+        }
+
+        extended.set(partitionId);
+
+        return extended;
+    }
+
+ private static <T> Function<Throwable, T> ignoreTableClosedException() {
+ return ex -> {
+ if (hasCause(ex, TableClosedException.class)) {
+ return null;
+ }
+ throw sneakyThrow(ex);
+ };
+ }
+
+    /**
+     * Thrown (or used to fail a future) in this class when a table's storage is found to be closed while it is being accessed;
+     * wraps the underlying {@code StorageClosedException}. Handlers created by {@code ignoreTableClosedException()} suppress it.
+     */
+    private static class TableClosedException extends IgniteInternalException {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Constructor.
+         *
+         * @param tableId ID of the closed table, included in the message.
+         * @param cause Original failure, if any.
+         */
+        TableClosedException(int tableId, @Nullable Throwable cause) {
+            super(INTERNAL_ERR, "Table is closed [tableId=" + tableId + "]", cause);
+        }
+    }
+}