This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 16c9ae2ca0e IGNITE-28008 Extract TableZoneCoordinator out of TableManager (#7805)
16c9ae2ca0e is described below

commit 16c9ae2ca0ec6f9d03c94f08f727cc4b5bba3dba
Author: Viacheslav Blinov <[email protected]>
AuthorDate: Mon Mar 23 12:22:41 2026 +0300

    IGNITE-28008 Extract TableZoneCoordinator out of TableManager (#7805)
---
 .../internal/table/distributed/TableManager.java   | 761 ++-----------------
 .../internal/table/distributed/TableRegistry.java  |  54 ++
 .../table/distributed/TableZoneCoordinator.java    | 818 +++++++++++++++++++++
 3 files changed, 924 insertions(+), 709 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index df6bee33931..104b7b1f09f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -17,23 +17,15 @@
 
 package org.apache.ignite.internal.table.distributed;
 
-import static java.util.Collections.emptySet;
 import static java.util.Collections.unmodifiableMap;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.anyOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.concurrent.CompletableFuture.runAsync;
-import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.function.Function.identity;
-import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
 import static org.apache.ignite.internal.event.EventListener.fromConsumer;
-import static 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED;
-import static 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED;
-import static 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED;
 import static 
org.apache.ignite.internal.table.distributed.TableUtils.aliveTables;
-import static 
org.apache.ignite.internal.table.distributed.index.IndexUtils.registerIndexesToTable;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
 import static org.apache.ignite.internal.util.CollectionUtils.difference;
@@ -43,33 +35,26 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.emptyListComple
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
-import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
-import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
-import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
@@ -85,7 +70,6 @@ import 
org.apache.ignite.internal.catalog.events.DropTableEventParameters;
 import org.apache.ignite.internal.catalog.events.RenameTableEventParameters;
 import org.apache.ignite.internal.causality.CompletionListener;
 import org.apache.ignite.internal.causality.IncrementalVersionedValue;
-import org.apache.ignite.internal.causality.OutdatedTokenException;
 import org.apache.ignite.internal.causality.RevisionListenerRegistry;
 import org.apache.ignite.internal.configuration.SystemDistributedConfiguration;
 import 
org.apache.ignite.internal.configuration.utils.SystemDistributedConfigurationPropertyHolder;
@@ -111,32 +95,19 @@ import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.TopologyService;
-import 
org.apache.ignite.internal.partition.replicator.LocalBeforeReplicaStartEventParameters;
-import 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEventParameters;
-import org.apache.ignite.internal.partition.replicator.NaiveAsyncReadWriteLock;
 import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
-import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager.TablePartitionReplicaProcessorFactory;
-import 
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
-import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
-import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
-import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import 
org.apache.ignite.internal.partition.replicator.schema.ExecutorInclinedSchemaSyncService;
 import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import 
org.apache.ignite.internal.placementdriver.wrappers.ExecutorInclinedPlacementDriver;
 import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.SchemaSyncService;
 import org.apache.ignite.internal.schema.configuration.GcConfiguration;
 import org.apache.ignite.internal.storage.DataStorageManager;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.StorageClosedException;
-import org.apache.ignite.internal.storage.StorageDestroyedException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageEngine;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
@@ -149,7 +120,6 @@ import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.gc.MvGc;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
-import 
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
@@ -163,7 +133,6 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
-import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.LongPriorityQueue;
@@ -221,19 +190,12 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
      */
     private final IncrementalVersionedValue<Void> assignmentsUpdatedVv;
 
-    /** Registered tables. */
-    private final Map<Integer, TableViewInternal> tables = new 
ConcurrentHashMap<>();
-
-    /** Started tables. */
-    private final Map<Integer, TableViewInternal> startedTables = new 
ConcurrentHashMap<>();
+    private final TableRegistry tableRegistry = new TableRegistry();
 
     /** A queue for deferred table destruction events. */
     private final LongPriorityQueue<DestroyTableEvent> destructionEventsQueue =
             new LongPriorityQueue<>(DestroyTableEvent::catalogVersion);
 
-    /** Local partitions. */
-    private final Map<Integer, PartitionSet> localPartsByTableId = new 
ConcurrentHashMap<>();
-
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
@@ -261,8 +223,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
     private final FailureProcessor failureProcessor;
 
-    private final MvGc mvGc;
-
     private final LowWatermark lowWatermark;
 
     private final HybridTimestampTracker observableTimestampTracker;
@@ -283,9 +243,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     /** Marshallers provider. */
     private final ReflectionMarshallersProvider marshallers = new 
ReflectionMarshallersProvider();
 
-    /** Index chooser for full state transfer. */
-    private final FullStateTransferIndexChooser fullStateTransferIndexChooser;
-
     private final TransactionInflights transactionInflights;
 
     private final String nodeName;
@@ -295,26 +252,14 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
     @Nullable
     private ScheduledExecutorService streamerFlushExecutor;
 
-    private final MinimumRequiredTimeCollectorService minTimeCollectorService;
-
     @Nullable
     private StreamerReceiverRunner streamerReceiverRunner;
 
     private final CompletableFuture<Void> readyToProcessReplicaStarts = new 
CompletableFuture<>();
 
-    // TODO https://issues.apache.org/jira/browse/IGNITE-25347
-    /** Mapping zone identifier to a collection of tables related to the zone. 
*/
-    private final Map<Integer, Set<TableViewInternal>> tablesPerZone = new 
HashMap<>();
-    /** Locks to synchronize an access to the {@link #tablesPerZone}. */
-    private final Map<Integer, NaiveAsyncReadWriteLock> tablesPerZoneLocks = 
new ConcurrentHashMap<>();
-
     /** Configuration of rebalance retries delay. */
     private final SystemDistributedConfigurationPropertyHolder<Integer> 
rebalanceRetryDelayConfiguration;
 
-    private final EventListener<LocalBeforeReplicaStartEventParameters> 
onBeforeZoneReplicaStartedListener = this::beforeZoneReplicaStarted;
-    private final EventListener<LocalPartitionReplicaEventParameters> 
onZoneReplicaStoppedListener = this::onZoneReplicaStopped;
-    private final EventListener<LocalPartitionReplicaEventParameters> 
onZoneReplicaDestroyedListener = this::onZoneReplicaDestroyed;
-
     private final EventListener<CreateTableEventParameters> 
onTableCreateListener = this::loadTableToZoneOnTableCreate;
     private final EventListener<DropTableEventParameters> onTableDropListener 
= fromConsumer(this::onTableDrop);
     private final EventListener<CatalogEventParameters> onTableAlterListener = 
this::onTableAlter;
@@ -323,11 +268,10 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
     private final MetricManager metricManager;
 
-    private final Map<TablePartitionId, PartitionTableStatsMetricSource> 
partModCounterMetricSources = new ConcurrentHashMap<>();
-    private final Map<TablePartitionId, LongSupplier> 
pendingWriteIntentsSuppliers = new ConcurrentHashMap<>();
-
     private final TablePartitionResourcesFactory partitionResourcesFactory;
 
+    private final TableZoneCoordinator zoneCoordinator;
+
     /**
      * Creates a new table manager.
      *
@@ -406,7 +350,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         this.transactionInflights = transactionInflights;
         this.nodeName = nodeName;
         this.partitionReplicaLifecycleManager = 
partitionReplicaLifecycleManager;
-        this.minTimeCollectorService = minTimeCollectorService;
         this.metricManager = metricManager;
 
         this.executorInclinedSchemaSyncService = new 
ExecutorInclinedSchemaSyncService(schemaSyncService, 
partitionOperationsExecutor);
@@ -423,7 +366,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         scanRequestExecutor = Executors.newSingleThreadExecutor(
                 IgniteThreadFactory.create(nodeName, "scan-query-executor", 
LOG, STORAGE_READ));
 
-        mvGc = new MvGc(nodeName, gcConfig, lowWatermark, failureProcessor);
+        MvGc mvGc = new MvGc(nodeName, gcConfig, lowWatermark, 
failureProcessor);
 
         partitionReplicatorNodeRecovery = new PartitionReplicatorNodeRecovery(
                 messagingService,
@@ -431,7 +374,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                 tableId -> tablesById().get(tableId)
         );
 
-        fullStateTransferIndexChooser = new 
FullStateTransferIndexChooser(catalogService, lowWatermark, indexMetaStorage);
+        FullStateTransferIndexChooser fullStateTransferIndexChooser =
+                new FullStateTransferIndexChooser(catalogService, 
lowWatermark, indexMetaStorage);
 
         partitionResourcesFactory = new TablePartitionResourcesFactory(
                 txManager,
@@ -465,22 +409,32 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 Integer::parseInt
         );
 
-        // Register event listeners in the constructor to avoid races with 
"partitionReplicaLifecycleManager"'s recovery.
-        // We rely on the "readyToProcessReplicaStarts" future to block event 
handling until "startAsync" is completed.
-        partitionReplicaLifecycleManager.listen(BEFORE_REPLICA_STARTED, 
onBeforeZoneReplicaStartedListener);
-        partitionReplicaLifecycleManager.listen(AFTER_REPLICA_STOPPED, 
onZoneReplicaStoppedListener);
-        partitionReplicaLifecycleManager.listen(AFTER_REPLICA_DESTROYED, 
onZoneReplicaDestroyedListener);
+        zoneCoordinator = new TableZoneCoordinator(
+                mvGc,
+                fullStateTransferIndexChooser,
+                partitionReplicaLifecycleManager,
+                partitionResourcesFactory,
+                minTimeCollectorService,
+                metricManager,
+                catalogService,
+                lowWatermark,
+                ioExecutor,
+                busyLock,
+                tableRegistry,
+                tablesVv,
+                localPartitionsVv,
+                assignmentsUpdatedVv,
+                readyToProcessReplicaStarts
+        );
     }
 
     @Override
     public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
         return inBusyLockAsync(busyLock, () -> {
             TransactionMetricsSource transactionMetricsSource = 
txManager.transactionMetricsSource();
-            
transactionMetricsSource.setPendingWriteIntentsSupplier(this::totalPendingWriteIntents);
-
-            mvGc.start();
+            
transactionMetricsSource.setPendingWriteIntentsSupplier(zoneCoordinator::totalPendingWriteIntents);
 
-            fullStateTransferIndexChooser.start();
+            zoneCoordinator.start();
 
             rebalanceRetryDelayConfiguration.init();
 
@@ -504,187 +458,10 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         });
     }
 
-    private CompletableFuture<Boolean> 
beforeZoneReplicaStarted(LocalBeforeReplicaStartEventParameters parameters) {
-        return inBusyLockAsync(busyLock, () -> readyToProcessReplicaStarts
-                .thenCompose(v -> beforeZoneReplicaStartedImpl(parameters))
-                .thenApply(unused -> false)
-        );
-    }
-
-    private CompletableFuture<Void> 
beforeZoneReplicaStartedImpl(LocalBeforeReplicaStartEventParameters parameters) 
{
-        return inBusyLockAsync(busyLock, () -> {
-            ZonePartitionId zonePartitionId = parameters.zonePartitionId();
-
-            NaiveAsyncReadWriteLock zoneLock = 
tablesPerZoneLocks.computeIfAbsent(
-                    zonePartitionId.zoneId(),
-                    id -> new NaiveAsyncReadWriteLock());
-
-            CompletableFuture<Long> readLockAcquisitionFuture = 
zoneLock.readLock();
-
-            try {
-                return readLockAcquisitionFuture.thenCompose(stamp -> {
-                    Set<TableViewInternal> zoneTables = 
zoneTablesRawSet(zonePartitionId.zoneId());
-
-                    return 
createPartitionsAndLoadResourcesToZoneReplica(zonePartitionId, zoneTables, 
parameters);
-                }).whenComplete((unused, t) -> 
readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead));
-            } catch (Throwable t) {
-                readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
-
-                return failedFuture(t);
-            }
-        });
-    }
-
-    private CompletableFuture<Void> 
createPartitionsAndLoadResourcesToZoneReplica(
-            ZonePartitionId zonePartitionId,
-            Set<TableViewInternal> zoneTables,
-            LocalBeforeReplicaStartEventParameters event
-    ) {
-        int partitionIndex = zonePartitionId.partitionId();
-
-        PartitionSet singlePartitionIdSet = PartitionSet.of(partitionIndex);
-
-        List<CompletableFuture<?>> storageCreationFutures = zoneTables.stream()
-                .map(tbl -> inBusyLockAsync(busyLock, () -> {
-                    return createPartitionStoragesIfAbsent(tbl, 
singlePartitionIdSet)
-                            // If the table is already closed, it's not a 
problem (probably the node is stopping).
-                            .exceptionally(ignoreTableClosedException());
-                }))
-                .collect(toList());
-
-        return CompletableFutures.allOf(storageCreationFutures)
-                .thenRunAsync(() -> 
scheduleMvPartitionsCleanupIfNeeded(zoneTables, partitionIndex, event), 
ioExecutor)
-                // If a table is already closed, it's not a problem (probably 
the node is stopping).
-                .exceptionally(ignoreTableClosedException())
-                .thenCompose(unused -> {
-                    CompletableFuture<?>[] futures = zoneTables.stream()
-                            .map(tbl -> inBusyLockAsync(busyLock, () -> {
-                                return runAsync(() -> inBusyLock(busyLock, () 
-> {
-                                    localPartsByTableId.compute(
-                                            tbl.tableId(),
-                                            (tableId, oldPartitionSet) -> 
extendPartitionSet(oldPartitionSet, partitionIndex)
-                                    );
-
-                                    lowWatermark.getLowWatermarkSafe(lwm ->
-                                            registerIndexesToTable(
-                                                    tbl,
-                                                    catalogService,
-                                                    singlePartitionIdSet,
-                                                    tbl.schemaView(),
-                                                    lwm
-                                            )
-                                    );
-
-                                    
preparePartitionResourcesAndLoadToZoneReplicaBusy(
-                                            tbl,
-                                            zonePartitionId,
-                                            event.resources(),
-                                            event.onRecovery()
-                                    );
-                                }), ioExecutor)
-                                // If the table is already closed, it's not a 
problem (probably the node is stopping).
-                                .exceptionally(ignoreTableClosedException());
-                            }))
-                            .toArray(CompletableFuture[]::new);
-
-                    return allOf(futures);
-                });
-    }
-
-    private static void scheduleMvPartitionsCleanupIfNeeded(
-            Set<TableViewInternal> zoneTables,
-            int partitionIndex,
-            LocalBeforeReplicaStartEventParameters event
-    ) {
-        boolean anyMvPartitionStorageIsInRebalanceState = zoneTables.stream()
-                .map(table -> 
table.internalTable().storage().getMvPartition(partitionIndex))
-                .filter(Objects::nonNull)
-                .anyMatch(partitionStorage -> 
partitionStorage.lastAppliedIndex() == 
MvPartitionStorage.REBALANCE_IN_PROGRESS);
-
-        if (anyMvPartitionStorageIsInRebalanceState) {
-            event.registerStorageInRebalanceState();
-        }
-
-        // Adding the cleanup action even if no MV partition storage is in 
rebalance state as it might be that the TX state storage is.
-        event.addCleanupAction(() -> {
-            CompletableFuture<?>[] clearFutures = zoneTables.stream()
-                    .map(table -> 
table.internalTable().storage().clearPartition(partitionIndex))
-                    .toArray(CompletableFuture[]::new);
-            return allOf(clearFutures);
-        });
-    }
-
-    private static <T> Function<Throwable, T> ignoreTableClosedException() {
-        return ex -> {
-            if (hasCause(ex, TableClosedException.class)) {
-                return null;
-            }
-            throw sneakyThrow(ex);
-        };
-    }
-
-    private CompletableFuture<Boolean> 
onZoneReplicaStopped(LocalPartitionReplicaEventParameters parameters) {
-        ZonePartitionId zonePartitionId = parameters.zonePartitionId();
-
-        NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(
-                zonePartitionId.zoneId(),
-                id -> new NaiveAsyncReadWriteLock());
-
-        CompletableFuture<Long> readLockAcquisitionFuture = 
zoneLock.readLock();
-
-        try {
-            return readLockAcquisitionFuture.thenCompose(stamp -> 
inBusyLockAsync(busyLock, () -> {
-                CompletableFuture<?>[] futures = 
zoneTablesRawSet(zonePartitionId.zoneId()).stream()
-                        .map(this::stopTablePartitions)
-                        .toArray(CompletableFuture[]::new);
-
-                return allOf(futures);
-            })).whenComplete((v, t) -> 
readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead)).thenApply(v -> 
false);
-        } catch (Throwable t) {
-            readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
-
-            return failedFuture(t);
-        }
-    }
-
-    private CompletableFuture<Boolean> 
onZoneReplicaDestroyed(LocalPartitionReplicaEventParameters parameters) {
-        ZonePartitionId zonePartitionId = parameters.zonePartitionId();
-
-        NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(
-                zonePartitionId.zoneId(),
-                id -> new NaiveAsyncReadWriteLock());
-
-        CompletableFuture<Long> readLockAcquisitionFuture = 
zoneLock.readLock();
-
-        try {
-            return readLockAcquisitionFuture.thenCompose(stamp -> {
-                return inBusyLockAsync(busyLock, () -> {
-                    CompletableFuture<?>[] futures = 
zoneTablesRawSet(zonePartitionId.zoneId()).stream()
-                            .map(table -> supplyAsync(
-                                    () -> inBusyLockAsync(
-                                            busyLock,
-                                            () -> stopAndDestroyTablePartition(
-                                                    new 
TablePartitionId(table.tableId(), zonePartitionId.partitionId()),
-                                                    parameters.causalityToken()
-                                            )
-                                    ),
-                                    ioExecutor).thenCompose(identity()))
-                            .toArray(CompletableFuture[]::new);
-
-                    return allOf(futures);
-                });
-            }).whenComplete((v, t) -> 
readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead)).thenApply((unused) 
-> false);
-        } catch (Throwable t) {
-            readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
-
-            return failedFuture(t);
-        }
-    }
-
     /**
      * During node recovery pre-populates required internal table structures 
before zone replicas are started.
      *
-     * <p>The created resources will then be loaded during replica startup in 
{@link #beforeZoneReplicaStarted}.
+     * <p>The created resources will then be loaded during replica startup by 
{@link TableZoneCoordinator}.
      */
     private CompletableFuture<Void> prepareTableResourcesOnRecovery(
             long causalityToken,
@@ -706,11 +483,11 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                                 schemaRegistry
                         );
 
-                        tables.put(tableId, table);
+                        tableRegistry.tables().put(tableId, table);
 
-                        addTableToZone(zoneDescriptor.id(), table);
+                        zoneCoordinator.addTableToZone(zoneDescriptor.id(), 
table);
 
-                        startedTables.put(tableId, table);
+                        tableRegistry.startedTables().put(tableId, table);
                     }));
         });
     }
@@ -741,7 +518,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
             return schemaManager.schemaRegistry(causalityToken, 
tableId).thenAccept(schemaRegistry -> {
                 TableImpl table = createTableImpl(causalityToken, 
tableDescriptor, zoneDescriptor, schemaDescriptor, schemaRegistry);
 
-                tables.put(tableId, table);
+                tableRegistry.tables().put(tableId, table);
             });
         }));
 
@@ -749,7 +526,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         // will call update() on VVs and inside those updates it will chain on 
the lock acquisition future.
         CompletableFuture<Long> acquisitionFuture = 
partitionReplicaLifecycleManager.lockZoneForRead(zoneDescriptor.id());
         try {
-            return 
loadTableToZoneOnTableCreateHavingZoneReadLock(acquisitionFuture, 
causalityToken, zoneDescriptor, tableId)
+            return zoneCoordinator.loadTableToZone(acquisitionFuture, 
causalityToken, zoneDescriptor, tableId)
                     .whenComplete((res, ex) -> 
unlockZoneForRead(zoneDescriptor, acquisitionFuture));
         } catch (Throwable e) {
             unlockZoneForRead(zoneDescriptor, acquisitionFuture);
@@ -758,151 +535,15 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         }
     }
 
-    private CompletableFuture<Void> 
loadTableToZoneOnTableCreateHavingZoneReadLock(
-            CompletableFuture<Long> readLockAcquisitionFuture,
-            long causalityToken,
-            CatalogZoneDescriptor zoneDescriptor,
-            int tableId
-    ) {
-        CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
-        CompletableFuture<Void> readLockFutureEx = 
allOf(readLockAcquisitionFuture, tablesByIdFuture);
-
-        // NB: all vv.update() calls must be made from the synchronous part of 
the method (not in thenCompose()/etc!).
-        CompletableFuture<?> localPartsUpdateFuture = 
localPartitionsVv.update(causalityToken,
-                (ignore, throwable) -> inBusyLock(busyLock, () -> 
readLockFutureEx.thenComposeAsync(unused -> {
-                    PartitionSet parts = new BitSetPartitionSet();
-
-                    for (int i = 0; i < zoneDescriptor.partitions(); i++) {
-                        if 
(partitionReplicaLifecycleManager.hasLocalPartition(new 
ZonePartitionId(zoneDescriptor.id(), i))) {
-                            parts.set(i);
-                        }
-                    }
-
-                    var table = (TableImpl) tables.get(tableId);
-
-                    return createPartitionStoragesIfAbsent(table, 
parts).thenRun(() -> localPartsByTableId.put(tableId, parts));
-                }, ioExecutor))
-                // If the table is already closed, it's not a problem 
(probably the node is stopping).
-                .exceptionally(ignoreTableClosedException())
-        );
-
-        CompletableFuture<?> createPartsFut = 
assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
-            if (e != null) {
-                return failedFuture(e);
-            }
-
-            return localPartsUpdateFuture.thenRunAsync(() -> 
inBusyLock(busyLock, () -> {
-                var table = (TableImpl) tables.get(tableId);
-
-                for (int i = 0; i < zoneDescriptor.partitions(); i++) {
-                    var zonePartitionId = new 
ZonePartitionId(zoneDescriptor.id(), i);
-
-                    if 
(partitionReplicaLifecycleManager.hasLocalPartition(zonePartitionId)) {
-                        preparePartitionResourcesAndLoadToZoneReplicaBusy(
-                                table,
-                                zonePartitionId,
-                                
partitionReplicaLifecycleManager.zonePartitionResources(zonePartitionId),
-                                false
-                        );
-                    }
-                }
-            }), ioExecutor);
-        });
-
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible 
performance degradation.
-        return createPartsFut.thenAccept(ignore -> {
-            var table = (TableImpl) tables.get(tableId);
-
-            startedTables.put(tableId, table);
-
-            addTableToZone(zoneDescriptor.id(), table);
-        });
-    }
-
     private void unlockZoneForRead(CatalogZoneDescriptor zoneDescriptor, 
CompletableFuture<Long> readLockAcquiryFuture) {
         readLockAcquiryFuture.thenAccept(stamp -> {
             
partitionReplicaLifecycleManager.unlockZoneForRead(zoneDescriptor.id(), stamp);
         });
     }
 
-    /**
-     * Prepare the table partition resources and load it to the zone-based 
replica.
-     *
-     * @param table Table.
-     * @param zonePartitionId Zone Partition ID.
-     * @param resources Replica resources.
-     */
-    private void preparePartitionResourcesAndLoadToZoneReplicaBusy(
-            TableViewInternal table,
-            ZonePartitionId zonePartitionId,
-            ZonePartitionResources resources,
-            boolean onNodeRecovery
-    ) {
-        int partId = zonePartitionId.partitionId();
-
-        int tableId = table.tableId();
-
-        var tablePartitionId = new TablePartitionId(tableId, partId);
-
-        MvPartitionStorage mvPartitionStorage;
-        try {
-            mvPartitionStorage = getMvPartitionStorage(table, partId);
-        } catch (TableClosedException e) {
-            // The node is probably stopping while we start the table, let's 
just skip it.
-            return;
-        }
-
-        PartitionDataStorage partitionDataStorage = 
partitionResourcesFactory.createPartitionDataStorage(
-                new PartitionKey(zonePartitionId.zoneId(), partId),
-                tableId,
-                mvPartitionStorage
-        );
-
-        PartitionResources partitionResources = 
partitionResourcesFactory.createPartitionResources(
-                partId,
-                partitionDataStorage,
-                table,
-                resources.safeTimeTracker()
-        );
-
-        partitionResources.storageUpdateHandler.start(onNodeRecovery);
-
-        registerPartitionTableStatsMetrics(table, partId, partitionResources);
-
-        mvGc.addStorage(tablePartitionId, partitionResources.gcUpdateHandler);
-
-        minTimeCollectorService.addPartition(new TablePartitionId(tableId, 
partId));
-
-        TablePartitionReplicaProcessorFactory createListener = (raftClient, 
transactionStateResolver) ->
-                partitionResourcesFactory.createReplicaListener(
-                        zonePartitionId,
-                        table,
-                        resources.safeTimeTracker(),
-                        mvPartitionStorage,
-                        partitionResources,
-                        raftClient,
-                        transactionStateResolver
-                );
-
-        TablePartitionProcessor tablePartitionProcessor = 
partitionResourcesFactory.createTablePartitionProcessor(
-                zonePartitionId, table, partitionDataStorage, 
partitionResources);
-
-        PartitionMvStorageAccess partitionStorageAccess = 
partitionResourcesFactory.createPartitionMvStorageAccess(
-                partId, table, partitionResources);
-
-        partitionReplicaLifecycleManager.loadTableListenerToZoneReplica(
-                zonePartitionId,
-                tableId,
-                createListener,
-                tablePartitionProcessor,
-                partitionStorageAccess,
-                onNodeRecovery
-        );
-    }
-
     private void onTableDrop(DropTableEventParameters parameters) {
         inBusyLock(busyLock, () -> {
-            unregisterMetricsSource(startedTables.get(parameters.tableId()));
+            
unregisterMetricsSource(tableRegistry.startedTables().get(parameters.tableId()));
 
             destructionEventsQueue.enqueue(new 
DestroyTableEvent(parameters.catalogVersion(), parameters.tableId()));
         });
@@ -947,7 +588,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                         return failedFuture(e);
                     }
 
-                    TableViewInternal table = tables.get(parameters.tableId());
+                    TableViewInternal table = 
tableRegistry.tables().get(parameters.tableId());
 
                     
table.updateStalenessConfiguration(parameters.staleRowsFraction(), 
parameters.minStaleRowsCount());
 
@@ -964,7 +605,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                         return failedFuture(e);
                     }
 
-                    TableViewInternal table = tables.get(parameters.tableId());
+                    TableViewInternal table = 
tableRegistry.tables().get(parameters.tableId());
 
                     // TODO: revisit this approach, see 
https://issues.apache.org/jira/browse/IGNITE-21235.
                     ((TableImpl) table).name(parameters.newTableName());
@@ -999,22 +640,18 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         if (!stopGuard.compareAndSet(false, true)) {
             return nullCompletedFuture();
         }
+
         TransactionMetricsSource transactionMetricsSource = 
txManager.transactionMetricsSource();
         if (transactionMetricsSource != null) {
             transactionMetricsSource.setPendingWriteIntentsSupplier(null);
         }
 
-        
partitionReplicaLifecycleManager.removeListener(AFTER_REPLICA_DESTROYED, 
onZoneReplicaDestroyedListener);
-        partitionReplicaLifecycleManager.removeListener(AFTER_REPLICA_STOPPED, 
onZoneReplicaStoppedListener);
-        
partitionReplicaLifecycleManager.removeListener(BEFORE_REPLICA_STARTED, 
onBeforeZoneReplicaStartedListener);
-
         int shutdownTimeoutSeconds = 10;
 
         try {
             closeAllManually(
-                    () -> closeAllManually(tables.values().stream().map(table 
-> () -> closeTable(table))),
-                    mvGc,
-                    fullStateTransferIndexChooser,
+                    zoneCoordinator::stop,
+                    () -> 
closeAllManually(tableRegistry.tables().values().stream().map(table -> () -> 
closeTable(table))),
                     () -> shutdownAndAwaitTermination(scanRequestExecutor, 
shutdownTimeoutSeconds, TimeUnit.SECONDS),
                     () -> {
                         ScheduledExecutorService streamerFlushExecutor;
@@ -1033,22 +670,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         return nullCompletedFuture();
     }
 
-    private CompletableFuture<Void> stopTablePartitions(TableViewInternal 
table) {
-        return supplyAsync(() -> {
-            InternalTable internalTable = table.internalTable();
-
-            var stopReplicaFutures = new 
CompletableFuture<?>[internalTable.partitions()];
-
-            for (int p = 0; p < internalTable.partitions(); p++) {
-                TablePartitionId replicationGroupId = new 
TablePartitionId(table.tableId(), p);
-
-                stopReplicaFutures[p] = stopTablePartition(replicationGroupId, 
table);
-            }
-
-            return allOf(stopReplicaFutures);
-        }, ioExecutor).thenCompose(identity());
-    }
-
     private static void closeTable(TableViewInternal table) throws Exception {
         InternalTable internalTable = table.internalTable();
 
@@ -1142,18 +763,18 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
      * @param tableId Table id to destroy.
      */
     private CompletableFuture<Void> destroyTableLocally(int tableId) {
-        TableViewInternal table = startedTables.remove(tableId);
+        TableViewInternal table = 
tableRegistry.startedTables().remove(tableId);
 
-        localPartsByTableId.remove(tableId);
+        tableRegistry.localPartsByTableId().remove(tableId);
 
         assert table != null : tableId;
 
         InternalTable internalTable = table.internalTable();
 
-        return stopAndDestroyTableProcessors(table)
+        return zoneCoordinator.stopAndDestroyTableProcessors(table)
                 .thenComposeAsync(unused -> inBusyLockAsync(busyLock, () -> 
internalTable.storage().destroy()), ioExecutor)
                 .thenAccept(unused -> inBusyLock(busyLock, () -> {
-                    tables.remove(tableId);
+                    tableRegistry.tables().remove(tableId);
                     schemaManager.dropRegistry(tableId);
                 }))
                 .whenComplete((v, e) -> {
@@ -1205,14 +826,14 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
      * @see #assignmentsUpdatedVv
      */
     private CompletableFuture<Map<Integer, TableViewInternal>> tablesById(long 
causalityToken) {
-        return assignmentsUpdatedVv.get(causalityToken).thenApply(v -> 
unmodifiableMap(startedTables));
+        return assignmentsUpdatedVv.get(causalityToken).thenApply(v -> 
unmodifiableMap(tableRegistry.startedTables()));
     }
 
     /**
      * Returns an internal map, which contains all managed tables by their ID.
      */
     private Map<Integer, TableViewInternal> tablesById() {
-        return unmodifiableMap(tables);
+        return unmodifiableMap(tableRegistry.tables());
     }
 
     /**
@@ -1220,7 +841,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
      */
     @TestOnly
     public Map<Integer, TableViewInternal> startedTables() {
-        return unmodifiableMap(startedTables);
+        return unmodifiableMap(tableRegistry.startedTables());
     }
 
     @Override
@@ -1282,7 +903,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
         try {
             return localPartitionsVv.get(causalityToken)
-                    .thenApply(unused -> 
localPartsByTableId.getOrDefault(tableId, PartitionSet.EMPTY_SET));
+                    .thenApply(unused -> 
tableRegistry.localPartsByTableId().getOrDefault(tableId, 
PartitionSet.EMPTY_SET));
         } finally {
             busyLock.leaveBusy();
         }
@@ -1324,7 +945,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     }
 
     private CompletableFuture<TableViewInternal> tableAsyncInternalBusy(int 
tableId) {
-        TableViewInternal tableImpl = startedTables.get(tableId);
+        TableViewInternal tableImpl = 
tableRegistry.startedTables().get(tableId);
 
         if (tableImpl != null) {
             return completedFuture(tableImpl);
@@ -1336,11 +957,11 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             if (th == null) {
                 CompletableFuture<?> tablesFuture = tablesVv.get(token);
 
-                tablesFuture.whenComplete((tables, e) -> {
+                tablesFuture.whenComplete((tbls, e) -> {
                     if (e != null) {
                         getLatestTableFuture.completeExceptionally(e);
                     } else {
-                        
getLatestTableFuture.complete(startedTables.get(tableId));
+                        
getLatestTableFuture.complete(tableRegistry.startedTables().get(tableId));
                     }
                 });
             } else {
@@ -1354,7 +975,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
         // This check is needed for the case when we have registered 
tablesListener,
         // but tablesVv has already been completed, so listener would be 
triggered only for the next versioned value update.
-        tableImpl = startedTables.get(tableId);
+        tableImpl = tableRegistry.startedTables().get(tableId);
 
         if (tableImpl != null) {
             assignmentsUpdatedVv.removeWhenComplete(tablesListener);
@@ -1370,209 +991,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         return IgniteUtils.getInterruptibly(future);
     }
 
-    private static PartitionSet extendPartitionSet(@Nullable PartitionSet 
oldPartitionSet, int partitionId) {
-        PartitionSet newPartitionSet = 
Objects.requireNonNullElseGet(oldPartitionSet, BitSetPartitionSet::new);
-        newPartitionSet.set(partitionId);
-        return newPartitionSet;
-    }
-
-    /**
-     * Gets MV partition storage.
-     *
-     * @param table Table.
-     * @param partitionId Partition ID.
-     * @return MvPartitionStorage.
-     */
-    private static MvPartitionStorage getMvPartitionStorage(TableViewInternal 
table, int partitionId) {
-        InternalTable internalTable = table.internalTable();
-
-        MvPartitionStorage mvPartition;
-        try {
-            mvPartition = internalTable.storage().getMvPartition(partitionId);
-        } catch (StorageClosedException e) {
-            throw new TableClosedException(table.tableId(), e);
-        }
-
-        assert mvPartition != null : "tableId=" + table.tableId() + ", 
partitionId=" + partitionId;
-
-        return mvPartition;
-    }
-
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-19739 Create 
storages only once.
-    private CompletableFuture<Void> 
createPartitionStoragesIfAbsent(TableViewInternal table, PartitionSet 
partitions) {
-        InternalTable internalTable = table.internalTable();
-
-        List<CompletableFuture<MvPartitionStorage>> storageFuts = 
partitions.stream().mapToObj(partitionId -> {
-            MvPartitionStorage mvPartition;
-            try {
-                mvPartition = 
internalTable.storage().getMvPartition(partitionId);
-            } catch (StorageClosedException e) {
-                return CompletableFuture.<MvPartitionStorage>failedFuture(new 
TableClosedException(table.tableId(), e));
-            }
-
-            return mvPartition != null ? completedFuture(mvPartition) : 
internalTable.storage().createMvPartition(partitionId);
-        }).collect(toList());
-
-        return CompletableFutures.allOf(storageFuts);
-    }
-
-    private CompletableFuture<Void> 
stopAndDestroyTableProcessors(TableViewInternal table) {
-        InternalTable internalTable = table.internalTable();
-
-        int partitions = internalTable.partitions();
-
-        NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(
-                internalTable.zoneId(),
-                id -> new NaiveAsyncReadWriteLock());
-
-        CompletableFuture<Long> writeLockAcquisitionFuture = 
zoneLock.writeLock();
-
-        try {
-            return writeLockAcquisitionFuture.thenCompose(stamp -> {
-                CompletableFuture<?>[] stopReplicaAndDestroyFutures = new 
CompletableFuture<?>[partitions];
-
-                for (int partitionId = 0; partitionId < partitions; 
partitionId++) {
-                    CompletableFuture<Void> resourcesUnloadFuture;
-
-                    resourcesUnloadFuture = 
partitionReplicaLifecycleManager.unloadTableResourcesFromZoneReplica(
-                            new ZonePartitionId(internalTable.zoneId(), 
partitionId),
-                            internalTable.tableId()
-                    );
-
-                    var tablePartitionId = new 
TablePartitionId(internalTable.tableId(), partitionId);
-
-                    stopReplicaAndDestroyFutures[partitionId] = 
resourcesUnloadFuture
-                            .thenCompose(v -> 
stopAndDestroyTablePartition(tablePartitionId, table));
-                }
-
-                return allOf(stopReplicaAndDestroyFutures).whenComplete((res, 
th) -> {
-                    tablesPerZone.getOrDefault(internalTable.zoneId(), 
emptySet()).remove(table);
-                });
-            }).whenComplete((unused, t) -> 
writeLockAcquisitionFuture.thenAccept(zoneLock::unlockWrite));
-        } catch (Throwable t) {
-            writeLockAcquisitionFuture.thenAccept(zoneLock::unlockWrite);
-
-            throw t;
-        }
-    }
-
-    private CompletableFuture<Void> 
stopAndDestroyTablePartition(TablePartitionId tablePartitionId, long 
causalityToken) {
-        CompletableFuture<?> tokenFuture;
-
-        try {
-            tokenFuture = tablesVv.get(causalityToken);
-        } catch (OutdatedTokenException e) {
-            // Here we need only to ensure that the token has been seen.
-            // TODO https://issues.apache.org/jira/browse/IGNITE-25742
-            tokenFuture = nullCompletedFuture();
-        }
-
-        return tokenFuture
-                .thenCompose(ignore -> {
-                    TableViewInternal table = 
tables.get(tablePartitionId.tableId());
-                    assert table != null : tablePartitionId;
-
-                    return stopAndDestroyTablePartition(tablePartitionId, 
table);
-                });
-    }
-
-    private CompletableFuture<Void> stopAndDestroyTablePartition(
-            TablePartitionId tablePartitionId,
-            TableViewInternal table
-    ) {
-        return stopTablePartition(tablePartitionId, table)
-                .thenComposeAsync(v -> 
destroyPartitionStorages(tablePartitionId, table), ioExecutor);
-    }
-
-    /**
-     * Stops all resources associated with a given partition, like replicas 
and partition trackers.
-     *
-     * @param tablePartitionId Partition ID.
-     * @param table Table which this partition belongs to.
-     * @return Future that will be completed after all resources have been 
closed.
-     */
-    private CompletableFuture<Void> stopTablePartition(TablePartitionId 
tablePartitionId, TableViewInternal table) {
-        // In case of colocation there shouldn't be any table replica and thus 
it shouldn't be stopped.
-        minTimeCollectorService.removePartition(tablePartitionId);
-
-        unregisterPartitionMetrics(tablePartitionId, table.name());
-
-        return mvGc.removeStorage(tablePartitionId);
-    }
-
-    private void registerPartitionTableStatsMetrics(
-            TableViewInternal table,
-            int partitionId,
-            PartitionResources partitionResources
-    ) {
-        PartitionTableStatsMetricSource metricSource =
-                new PartitionTableStatsMetricSource(table.tableId(), 
partitionId, partitionResources.modificationCounter);
-
-        try {
-            // Only register this Metrics Source and do not enable it by 
default
-            // as it is intended for online troubleshooting purposes only (see 
IGNITE-27813).
-            metricManager.registerSource(metricSource);
-
-            TablePartitionId tablePartitionId = new 
TablePartitionId(table.tableId(), partitionId);
-
-            partModCounterMetricSources.put(tablePartitionId, metricSource);
-        } catch (Exception e) {
-            LOG.warn("Failed to register metrics source for table [name={}, 
partitionId={}].", e, table.name(), partitionId);
-        }
-
-        pendingWriteIntentsSuppliers.put(
-                new TablePartitionId(table.tableId(), partitionId),
-                partitionResources.storageUpdateHandler::getPendingRowCount
-        );
-    }
-
-    private void unregisterPartitionMetrics(TablePartitionId tablePartitionId, 
String tableName) {
-        PartitionTableStatsMetricSource metricSource = 
partModCounterMetricSources.remove(tablePartitionId);
-        pendingWriteIntentsSuppliers.remove(tablePartitionId);
-        if (metricSource != null) {
-            try {
-                metricManager.unregisterSource(metricSource);
-            } catch (Exception e) {
-                String message = "Failed to unregister metrics source for 
table [name={}, partitionId={}].";
-                LOG.warn(message, e, tableName, 
tablePartitionId.partitionId());
-            }
-        }
-    }
-
-    private long totalPendingWriteIntents() {
-        long sum = 0;
-
-        for (LongSupplier supplier : pendingWriteIntentsSuppliers.values()) {
-            sum += supplier.getAsLong();
-        }
-
-        return sum;
-    }
-
-    private CompletableFuture<Void> destroyPartitionStorages(
-            TablePartitionId tablePartitionId,
-            TableViewInternal table
-    ) {
-        InternalTable internalTable = table.internalTable();
-
-        int partitionId = tablePartitionId.partitionId();
-
-        List<CompletableFuture<?>> destroyFutures = new ArrayList<>();
-
-        try {
-            if (internalTable.storage().getMvPartition(partitionId) != null) {
-                
destroyFutures.add(internalTable.storage().destroyPartition(partitionId));
-            }
-        } catch (StorageDestroyedException ignored) {
-            // Ignore as the storage is already destroyed, no need to destroy 
it again.
-        } catch (StorageClosedException ignored) {
-            // The storage is closed, so the node is being stopped. We'll 
destroy the partition on node recovery.
-        }
-
-        // TODO: IGNITE-24926 - reduce set in localPartsByTableId after 
storages destruction.
-        return allOf(destroyFutures.toArray(new CompletableFuture[]{}));
-    }
-
     /**
      * Returns a cached table instance if it exists, {@code null} otherwise. 
Can return a table that is being stopped.
      *
@@ -1580,7 +998,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
      */
     @Override
     public @Nullable TableViewInternal cachedTable(int tableId) {
-        return tables.get(tableId);
+        return tableRegistry.tables().get(tableId);
     }
 
     /**
@@ -1590,7 +1008,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
      */
     @TestOnly
     public @Nullable TableViewInternal cachedTable(String name) {
-        return findTableImplByName(tables.values(), name);
+        return findTableImplByName(tableRegistry.tables().values(), name);
     }
 
     private CatalogZoneDescriptor getZoneDescriptor(CatalogTableDescriptor 
tableDescriptor, int catalogVersion) {
@@ -1764,75 +1182,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
      * @throws IgniteInternalException If failed to acquire a read lock for 
the zone or current thread was interrupted while waiting.
      */
     public Set<TableViewInternal> zoneTables(int zoneId) throws 
IgniteInternalException {
-        NaiveAsyncReadWriteLock zoneLock = 
tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
-
-        CompletableFuture<Long> readLockAcquisitionFuture = 
zoneLock.readLock();
-
-        try {
-            return readLockAcquisitionFuture.thenApply(stamp -> {
-                Set<TableViewInternal> res = 
Set.copyOf(zoneTablesRawSet(zoneId));
-
-                zoneLock.unlockRead(stamp);
-
-                return res;
-            }).get();
-        } catch (Throwable t) {
-            readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
-
-            if (t instanceof InterruptedException) {
-                Thread.currentThread().interrupt();
-            }
-
-            throw new IgniteInternalException(INTERNAL_ERR, "Failed to acquire 
a read lock for zone [zoneId=" + zoneId + ']', t);
-        }
-    }
-
-    /**
-     * Returns a set of tables that belong to the specified zone.
-     * Note that this method call should be properly synchronized using {@link 
#tablesPerZoneLocks}.
-     *
-     * @param zoneId Zone identifier.
-     * @return Set of tables.
-     */
-    private Set<TableViewInternal> zoneTablesRawSet(int zoneId) {
-        return tablesPerZone.getOrDefault(zoneId, Set.of());
-    }
-
-    /**
-     * Adds a table to the specified zone.
-     *
-     * @param zoneId Zone identifier.
-     * @param table Table to add.
-     * @throws IgniteInternalException If failed to acquire a write lock for 
the zone or current thread was interrupted while waiting.
-     */
-    private void addTableToZone(int zoneId, TableImpl table) throws 
IgniteInternalException {
-        NaiveAsyncReadWriteLock zoneLock = 
tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
-
-        CompletableFuture<Long> writeLockAcquisitionFuture = 
zoneLock.writeLock();
-
-        try {
-            writeLockAcquisitionFuture.thenAccept(stamp -> {
-                tablesPerZone.compute(zoneId, (id, tables) -> {
-                    if (tables == null) {
-                        tables = new HashSet<>();
-                    }
-
-                    tables.add(table);
-
-                    return tables;
-                });
-
-                zoneLock.unlockWrite(stamp);
-            }).get();
-        } catch (Throwable t) {
-            writeLockAcquisitionFuture.thenAccept(zoneLock::unlockWrite);
-
-            if (t instanceof InterruptedException) {
-                Thread.currentThread().interrupt();
-            }
-
-            throw new IgniteInternalException(INTERNAL_ERR, "Failed to acquire 
a write lock for zone [zoneId=" + zoneId + ']', t);
-        }
+        return zoneCoordinator.zoneTables(zoneId);
     }
 
     private ReadWriteMetricSource 
createAndRegisterMetricsSource(MvTableStorage tableStorage, QualifiedName 
tableName) {
@@ -1901,11 +1251,4 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         }
     }
 
-    private static class TableClosedException extends IgniteInternalException {
-        private static final long serialVersionUID = 1L;
-
-        private TableClosedException(int tableId, @Nullable Throwable cause) {
-            super(INTERNAL_ERR, "Table is closed [tableId=" + tableId + "]", 
cause);
-        }
-    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableRegistry.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableRegistry.java
new file mode 100644
index 00000000000..5194a951678
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableRegistry.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.table.TableViewInternal;
+
+/**
+ * Holds shared mutable state for table tracking, shared between {@link 
TableManager} and {@link TableZoneCoordinator}.
+ *
+ * <ul>
+ *   <li>{@link #tables} — all registered tables by ID.</li>
+ *   <li>{@link #startedTables} — tables that are fully started (partition 
resources prepared).</li>
+ *   <li>{@link #localPartsByTableId} — local partition sets per table.</li>
+ * </ul>
+ */
+class TableRegistry {
+    /** All registered tables by ID. */
+    private final Map<Integer, TableViewInternal> tables = new 
ConcurrentHashMap<>();
+
+    /** Tables that are fully started (partition resources prepared). */
+    private final Map<Integer, TableViewInternal> startedTables = new 
ConcurrentHashMap<>();
+
+    /** Local partitions by table ID. */
+    private final Map<Integer, PartitionSet> localPartsByTableId = new 
ConcurrentHashMap<>();
+
+    Map<Integer, TableViewInternal> tables() {
+        return tables;
+    }
+
+    Map<Integer, TableViewInternal> startedTables() {
+        return startedTables;
+    }
+
+    Map<Integer, PartitionSet> localPartsByTableId() {
+        return localPartsByTableId;
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java
new file mode 100644
index 00000000000..9a2822d9be6
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java
@@ -0,0 +1,818 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED;
+import static 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED;
+import static 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED;
+import static 
org.apache.ignite.internal.table.distributed.index.IndexUtils.registerIndexesToTable;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.causality.IncrementalVersionedValue;
+import org.apache.ignite.internal.causality.OutdatedTokenException;
+import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
+import org.apache.ignite.internal.metrics.MetricManager;
+import 
org.apache.ignite.internal.partition.replicator.LocalBeforeReplicaStartEventParameters;
+import 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEventParameters;
+import org.apache.ignite.internal.partition.replicator.NaiveAsyncReadWriteLock;
+import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
+import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager.TablePartitionReplicaProcessorFactory;
+import 
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageDestroyedException;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
+import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
+import 
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Coordinates table partition lifecycle within distribution zones.
+ *
+ * <p>Ensures that when zone replicas start/stop/destroy, the correct table 
partition resources
+ * are created, loaded, stopped, and destroyed. Maintains the zone-to-table 
mapping that enables this.
+ */
+class TableZoneCoordinator {
+    private static final IgniteLogger LOG = Loggers.forClass(TableZoneCoordinator.class);
+
+    // TODO https://issues.apache.org/jira/browse/IGNITE-25347 Clean up entries on zone drop to avoid memory leak.
+    /**
+     * Maps a zone identifier to the set of tables that belong to the zone. The per-zone sets created by {@code addTableToZone} are
+     * plain {@link HashSet}s, which is why access goes through the per-zone locks in {@link #tablesPerZoneLocks}.
+     */
+    private final Map<Integer, Set<TableViewInternal>> tablesPerZone = new ConcurrentHashMap<>();
+
+    /** Per-zone read/write locks that synchronize access to {@link #tablesPerZone}; created lazily on first use of a zone. */
+    private final Map<Integer, NaiveAsyncReadWriteLock> tablesPerZoneLocks = new ConcurrentHashMap<>();
+
+    // NOTE(review): presumably populated/cleared by the partition metrics (un)registration methods, which are outside this view.
+    private final Map<TablePartitionId, PartitionTableStatsMetricSource> partModCounterMetricSources = new ConcurrentHashMap<>();
+
+    /** Per-partition suppliers whose values are summed by {@link #totalPendingWriteIntents()}. */
+    private final Map<TablePartitionId, LongSupplier> pendingWriteIntentsSuppliers = new ConcurrentHashMap<>();
+
+    // Components started by start() and closed by stop().
+    private final MvGc mvGc;
+    private final FullStateTransferIndexChooser fullStateTransferIndexChooser;
+
+    // Collaborators supplied through the constructor.
+    private final PartitionReplicaLifecycleManager partitionReplicaLifecycleManager;
+    private final TablePartitionResourcesFactory partitionResourcesFactory;
+    private final MinimumRequiredTimeCollectorService minTimeCollectorService;
+    private final MetricManager metricManager;
+    private final CatalogService catalogService;
+    private final LowWatermark lowWatermark;
+    private final ExecutorService ioExecutor;
+    private final IgniteSpinBusyLock busyLock;
+
+    // State and linearization primitives shared with TableManager.
+    private final TableRegistry tableRegistry;
+    private final IncrementalVersionedValue<Void> tablesVv;
+    private final IncrementalVersionedValue<Void> localPartitionsVv;
+    private final IncrementalVersionedValue<Void> assignmentsUpdatedVv;
+
+    /** Replica start events are processed only after this future completes (see {@code beforeZoneReplicaStarted}). */
+    private final CompletableFuture<Void> readyToProcessReplicaStarts;
+
+    // The listeners are kept in fields so that stop() can pass the very same instances to removeListener().
+    private final EventListener<LocalBeforeReplicaStartEventParameters> onBeforeZoneReplicaStartedListener =
+            this::beforeZoneReplicaStarted;
+    private final EventListener<LocalPartitionReplicaEventParameters> onZoneReplicaStoppedListener =
+            this::onZoneReplicaStopped;
+    private final EventListener<LocalPartitionReplicaEventParameters> onZoneReplicaDestroyedListener =
+            this::onZoneReplicaDestroyed;
+
+    /**
+     * Creates the coordinator and immediately subscribes it to the zone replica lifecycle events.
+     *
+     * @param mvGc MV garbage collector.
+     * @param fullStateTransferIndexChooser Index chooser for full state transfer.
+     * @param partitionReplicaLifecycleManager Partition replica lifecycle manager.
+     * @param partitionResourcesFactory Factory for creating table partition resources.
+     * @param minTimeCollectorService Minimum required time collector service.
+     * @param metricManager Metric manager.
+     * @param catalogService Catalog service.
+     * @param lowWatermark Low watermark.
+     * @param ioExecutor Executor for IO operations.
+     * @param busyLock Busy lock shared with TableManager.
+     * @param tableRegistry Shared table registry.
+     * @param tablesVv Versioned value for linearizing table changing events.
+     * @param localPartitionsVv Versioned value for linearizing table partitions changing events.
+     * @param assignmentsUpdatedVv Versioned value for tracking assignments updates.
+     * @param readyToProcessReplicaStarts Future that completes after table recovery.
+     */
+    TableZoneCoordinator(
+            MvGc mvGc,
+            FullStateTransferIndexChooser fullStateTransferIndexChooser,
+            PartitionReplicaLifecycleManager partitionReplicaLifecycleManager,
+            TablePartitionResourcesFactory partitionResourcesFactory,
+            MinimumRequiredTimeCollectorService minTimeCollectorService,
+            MetricManager metricManager,
+            CatalogService catalogService,
+            LowWatermark lowWatermark,
+            ExecutorService ioExecutor,
+            IgniteSpinBusyLock busyLock,
+            TableRegistry tableRegistry,
+            IncrementalVersionedValue<Void> tablesVv,
+            IncrementalVersionedValue<Void> localPartitionsVv,
+            IncrementalVersionedValue<Void> assignmentsUpdatedVv,
+            CompletableFuture<Void> readyToProcessReplicaStarts
+    ) {
+        // State and linearization primitives shared with TableManager.
+        this.tableRegistry = tableRegistry;
+        this.tablesVv = tablesVv;
+        this.localPartitionsVv = localPartitionsVv;
+        this.assignmentsUpdatedVv = assignmentsUpdatedVv;
+        this.readyToProcessReplicaStarts = readyToProcessReplicaStarts;
+        this.busyLock = busyLock;
+
+        // Services and factories.
+        this.partitionReplicaLifecycleManager = partitionReplicaLifecycleManager;
+        this.partitionResourcesFactory = partitionResourcesFactory;
+        this.minTimeCollectorService = minTimeCollectorService;
+        this.metricManager = metricManager;
+        this.catalogService = catalogService;
+        this.lowWatermark = lowWatermark;
+        this.ioExecutor = ioExecutor;
+
+        // Components whose lifecycle is driven by start()/stop().
+        this.mvGc = mvGc;
+        this.fullStateTransferIndexChooser = fullStateTransferIndexChooser;
+
+        // The subscription happens here, not in start(), to avoid races with the recovery performed by
+        // "partitionReplicaLifecycleManager". Event handling itself stays blocked on "readyToProcessReplicaStarts"
+        // until table recovery is completed.
+        this.partitionReplicaLifecycleManager.listen(BEFORE_REPLICA_STARTED, onBeforeZoneReplicaStartedListener);
+        this.partitionReplicaLifecycleManager.listen(AFTER_REPLICA_STOPPED, onZoneReplicaStoppedListener);
+        this.partitionReplicaLifecycleManager.listen(AFTER_REPLICA_DESTROYED, onZoneReplicaDestroyedListener);
+    }
+
+    /**
+     * Starts the components owned by the coordinator: first the MV garbage collector, then the full state transfer index chooser.
+     */
+    void start() {
+        this.mvGc.start();
+
+        this.fullStateTransferIndexChooser.start();
+    }
+
+    /**
+     * Stops the coordinator. The replica lifecycle listeners are unsubscribed in the reverse order of their registration, after which
+     * the MV garbage collector and the full state transfer index chooser are closed.
+     *
+     * @throws Exception If an error occurs during close.
+     */
+    void stop() throws Exception {
+        // Unsubscribe first so that no new replica events reach the components that are about to be closed.
+        partitionReplicaLifecycleManager.removeListener(AFTER_REPLICA_DESTROYED, onZoneReplicaDestroyedListener);
+        partitionReplicaLifecycleManager.removeListener(AFTER_REPLICA_STOPPED, onZoneReplicaStoppedListener);
+        partitionReplicaLifecycleManager.removeListener(BEFORE_REPLICA_STARTED, onBeforeZoneReplicaStartedListener);
+
+        closeAllManually(mvGc, fullStateTransferIndexChooser);
+    }
+
+    /**
+     * Sums the values currently reported by every registered pending write intents supplier.
+     *
+     * @return Total pending write intents count across all partitions.
+     */
+    long totalPendingWriteIntents() {
+        return pendingWriteIntentsSuppliers.values().stream()
+                .mapToLong(LongSupplier::getAsLong)
+                .sum();
+    }
+
+    /**
+     * Returns a copy of tables that belong to the specified zone.
+     *
+     * <p>The copy is taken under the zone's read lock. The calling thread blocks until the lock has been acquired and the copy is made.
+     *
+     * @param zoneId Zone identifier.
+     * @return Immutable snapshot of the zone's tables.
+     * @throws IgniteInternalException If failed to acquire a read lock for the zone or current thread was interrupted while waiting.
+     */
+    Set<TableViewInternal> zoneTables(int zoneId) throws IgniteInternalException {
+        NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
+
+        CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock();
+
+        try {
+            return readLockAcquisitionFuture.thenApply(stamp -> {
+                // The lock is released here and only here. This stage runs whenever the lock is granted, even if the waiting
+                // thread has already given up (e.g. was interrupted), so registering another release in the catch block below
+                // would unlock the same stamp twice.
+                try {
+                    Set<TableViewInternal> res = Set.copyOf(zoneTablesRawSet(zoneId));
+
+                    return res;
+                } finally {
+                    zoneLock.unlockRead(stamp);
+                }
+            }).get();
+        } catch (Throwable t) {
+            if (t instanceof InterruptedException) {
+                // Preserve the interruption status for the caller.
+                Thread.currentThread().interrupt();
+            }
+
+            throw new IgniteInternalException(INTERNAL_ERR, "Failed to acquire a read lock for zone [zoneId=" + zoneId + ']', t);
+        }
+    }
+
+    /**
+     * Adds a table to the specified zone.
+     *
+     * <p>The zone's table set is modified under the zone's write lock. The calling thread blocks until the lock has been acquired and
+     * the table is added.
+     *
+     * @param zoneId Zone identifier.
+     * @param table Table to add.
+     * @throws IgniteInternalException If failed to acquire a write lock for the zone or current thread was interrupted while waiting.
+     */
+    void addTableToZone(int zoneId, TableImpl table) throws IgniteInternalException {
+        NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
+
+        CompletableFuture<Long> writeLockAcquisitionFuture = zoneLock.writeLock();
+
+        try {
+            writeLockAcquisitionFuture.thenAccept(stamp -> {
+                // The lock is released here and only here. This stage runs whenever the lock is granted, even if the waiting
+                // thread has already given up (e.g. was interrupted), so registering another release in the catch block below
+                // would unlock the same stamp twice.
+                try {
+                    tablesPerZone.computeIfAbsent(zoneId, id -> new HashSet<>()).add(table);
+                } finally {
+                    zoneLock.unlockWrite(stamp);
+                }
+            }).get();
+        } catch (Throwable t) {
+            if (t instanceof InterruptedException) {
+                // Preserve the interruption status for the caller.
+                Thread.currentThread().interrupt();
+            }
+
+            throw new IgniteInternalException(INTERNAL_ERR, "Failed to acquire a write lock for zone [zoneId=" + zoneId + ']', t);
+        }
+    }
+
+    /**
+     * Loads a newly created table into the zone by creating partition storages and preparing partition resources for local zone replicas.
+     *
+     * <p>The work is chained onto two versioned values, both updated synchronously from this method:
+     * <ol>
+     *   <li>{@code localPartitionsVv}: determines which zone partitions are local, creates the table's partition storages for them
+     *   and records the resulting partition set in the table registry;</li>
+     *   <li>{@code assignmentsUpdatedVv}: after step 1, prepares partition resources for every local zone partition and loads them
+     *   into the zone replica.</li>
+     * </ol>
+     * Once both are done, the table is registered as started and added to the zone's table set.
+     *
+     * <p>This method only waits for {@code readLockAcquisitionFuture}; it does not release the acquired lock.
+     * NOTE(review): the lock is presumably released by the caller — confirm.
+     *
+     * @param readLockAcquisitionFuture Future for read lock on the zone in PartitionReplicaLifecycleManager.
+     * @param causalityToken Causality token.
+     * @param zoneDescriptor Zone descriptor.
+     * @param tableId Table ID.
+     * @return Completion future.
+     */
+    CompletableFuture<Void> loadTableToZone(
+            CompletableFuture<Long> readLockAcquisitionFuture,
+            long causalityToken,
+            CatalogZoneDescriptor zoneDescriptor,
+            int tableId
+    ) {
+        // Everything below runs only after both the zone read lock is held and "tablesVv" has reached the causality token.
+        CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
+        CompletableFuture<Void> readLockFutureEx = allOf(readLockAcquisitionFuture, tablesByIdFuture);
+
+        // NB: all vv.update() calls must be made from the synchronous part of the method (not in thenCompose()/etc!).
+        // NOTE(review): the "throwable" argument of this update is not inspected, unlike in the second update below — confirm intended.
+        CompletableFuture<?> localPartsUpdateFuture = localPartitionsVv.update(causalityToken,
+                (ignore, throwable) -> inBusyLock(busyLock, () -> readLockFutureEx.thenComposeAsync(unused -> {
+                    PartitionSet parts = new BitSetPartitionSet();
+
+                    // Collect the indexes of the zone partitions that have a local replica.
+                    for (int i = 0; i < zoneDescriptor.partitions(); i++) {
+                        if (partitionReplicaLifecycleManager.hasLocalPartition(new ZonePartitionId(zoneDescriptor.id(), i))) {
+                            parts.set(i);
+                        }
+                    }
+
+                    var table = (TableImpl) tableRegistry.tables().get(tableId);
+
+                    // The partition set is published to the registry only after the storages have been created.
+                    return createPartitionStoragesIfAbsent(table, parts)
+                            .thenRun(() -> tableRegistry.localPartsByTableId().put(tableId, parts));
+                }, ioExecutor))
+                // If the table is already closed, it's not a problem (probably the node is stopping).
+                .exceptionally(ignoreTableClosedException())
+        );
+
+        CompletableFuture<?> createPartsFut = assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
+            // A failure of the previous update of this versioned value is propagated as is.
+            if (e != null) {
+                return failedFuture(e);
+            }
+
+            return localPartsUpdateFuture.thenRunAsync(() -> inBusyLock(busyLock, () -> {
+                var table = (TableImpl) tableRegistry.tables().get(tableId);
+
+                for (int i = 0; i < zoneDescriptor.partitions(); i++) {
+                    var zonePartitionId = new ZonePartitionId(zoneDescriptor.id(), i);
+
+                    if (partitionReplicaLifecycleManager.hasLocalPartition(zonePartitionId)) {
+                        // The last argument is the "onNodeRecovery" flag: false on this path.
+                        preparePartitionResourcesAndLoadToZoneReplicaBusy(
+                                table,
+                                zonePartitionId,
+                                partitionReplicaLifecycleManager.zonePartitionResources(zonePartitionId),
+                                false
+                        );
+                    }
+                }
+            }), ioExecutor);
+        });
+
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible performance degradation.
+        return createPartsFut.thenAccept(ignore -> {
+            var table = (TableImpl) tableRegistry.tables().get(tableId);
+
+            tableRegistry.startedTables().put(tableId, table);
+
+            // Blocks the completing thread until the zone's write lock is acquired (see addTableToZone).
+            addTableToZone(zoneDescriptor.id(), table);
+        });
+    }
+
+    /**
+     * Stops and destroys table processors for all partitions of a table.
+     *
+     * <p>Under the write lock of the table's zone, every partition first has the table resources unloaded from its zone replica and is
+     * then stopped and destroyed. When all partitions are processed (successfully or not), the table is removed from the zone's table
+     * set and the lock is released.
+     *
+     * @param table Table to stop and destroy processors for.
+     * @return Completion future.
+     */
+    CompletableFuture<Void> stopAndDestroyTableProcessors(TableViewInternal table) {
+        InternalTable internalTable = table.internalTable();
+
+        int partitionCount = internalTable.partitions();
+        int zoneId = internalTable.zoneId();
+
+        NaiveAsyncReadWriteLock lock = tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
+
+        CompletableFuture<Long> writeLockFuture = lock.writeLock();
+
+        try {
+            CompletableFuture<Void> processorsDestroyed = writeLockFuture.thenCompose(stamp -> {
+                var perPartitionFutures = new CompletableFuture<?>[partitionCount];
+
+                for (int p = 0; p < partitionCount; p++) {
+                    var zonePartitionId = new ZonePartitionId(zoneId, p);
+
+                    CompletableFuture<Void> unloaded = partitionReplicaLifecycleManager.unloadTableResourcesFromZoneReplica(
+                            zonePartitionId,
+                            internalTable.tableId()
+                    );
+
+                    var tablePartitionId = new TablePartitionId(internalTable.tableId(), p);
+
+                    perPartitionFutures[p] = unloaded.thenCompose(ignored -> stopAndDestroyTablePartition(tablePartitionId, table));
+                }
+
+                // The table leaves the zone's set regardless of the outcome of the per-partition work.
+                return allOf(perPartitionFutures)
+                        .whenComplete((res, th) -> tablesPerZone.getOrDefault(zoneId, emptySet()).remove(table));
+            });
+
+            return processorsDestroyed.whenComplete((unused, t) -> writeLockFuture.thenAccept(lock::unlockWrite));
+        } catch (Throwable t) {
+            writeLockFuture.thenAccept(lock::unlockWrite);
+
+            throw t;
+        }
+    }
+
+    /**
+     * Listener for the "before replica started" event: waits until replica starts may be processed, loads table resources into the
+     * starting zone replica and always answers {@code false}.
+     */
+    private CompletableFuture<Boolean> beforeZoneReplicaStarted(LocalBeforeReplicaStartEventParameters parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            CompletableFuture<Void> resourcesLoaded = readyToProcessReplicaStarts
+                    .thenCompose(ready -> beforeZoneReplicaStartedImpl(parameters));
+
+            return resourcesLoaded.thenApply(ignored -> false);
+        });
+    }
+
+    /**
+     * Under the read lock of the replica's zone, creates partition storages and loads partition resources of every table of the zone
+     * into the starting zone replica. The lock is released once that work completes, successfully or not.
+     */
+    private CompletableFuture<Void> beforeZoneReplicaStartedImpl(LocalBeforeReplicaStartEventParameters parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            ZonePartitionId replicaId = parameters.zonePartitionId();
+            int zoneId = replicaId.zoneId();
+
+            NaiveAsyncReadWriteLock lock = tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
+
+            CompletableFuture<Long> readLockFuture = lock.readLock();
+
+            try {
+                CompletableFuture<Void> resourcesLoaded = readLockFuture.thenCompose(
+                        stamp -> createPartitionsAndLoadResourcesToZoneReplica(replicaId, zoneTablesRawSet(zoneId), parameters)
+                );
+
+                return resourcesLoaded.whenComplete((unused, t) -> readLockFuture.thenAccept(lock::unlockRead));
+            } catch (Throwable t) {
+                readLockFuture.thenAccept(lock::unlockRead);
+
+                return failedFuture(t);
+            }
+        });
+    }
+
+    /**
+     * Prepares every table of a zone for a zone replica that is about to start.
+     *
+     * <p>Stages, in order:
+     * <ol>
+     *   <li>create the partition storage of the replica's partition in every table (if absent);</li>
+     *   <li>on the IO executor, register the rebalance state and the storage cleanup action on the event;</li>
+     *   <li>per table, on the IO executor: add the partition to the table's local partition set, register indexes for it and load
+     *   the table's partition resources into the zone replica.</li>
+     * </ol>
+     * Failures are passed through {@code ignoreTableClosedException()} at each stage; per the inline comments this tolerates tables
+     * that were closed concurrently (the helper itself is outside this view).
+     *
+     * @param zonePartitionId Identifier of the zone replica being started.
+     * @param zoneTables Tables of the zone.
+     * @param event Parameters of the "before replica started" event.
+     * @return Future that completes when all tables have been processed.
+     */
+    private CompletableFuture<Void> createPartitionsAndLoadResourcesToZoneReplica(
+            ZonePartitionId zonePartitionId,
+            Set<TableViewInternal> zoneTables,
+            LocalBeforeReplicaStartEventParameters event
+    ) {
+        int partitionIndex = zonePartitionId.partitionId();
+
+        PartitionSet singlePartitionIdSet = PartitionSet.of(partitionIndex);
+
+        // Stage 1: storages for the partition in every table of the zone.
+        List<CompletableFuture<?>> storageCreationFutures = zoneTables.stream()
+                .map(tbl -> inBusyLockAsync(busyLock, () -> createPartitionStoragesIfAbsent(tbl, singlePartitionIdSet)
+                        // If the table is already closed, it's not a problem (probably the node is stopping).
+                        .exceptionally(ignoreTableClosedException())))
+                .collect(toList());
+
+        return CompletableFutures.allOf(storageCreationFutures)
+                // Stage 2: needs the storages created above, since it inspects their last applied index.
+                .thenRunAsync(() -> scheduleMvPartitionsCleanupIfNeeded(zoneTables, partitionIndex, event), ioExecutor)
+                // If a table is already closed, it's not a problem (probably the node is stopping).
+                .exceptionally(ignoreTableClosedException())
+                // Stage 3: per-table resource preparation.
+                .thenCompose(unused -> {
+                    CompletableFuture<?>[] futures = zoneTables.stream()
+                            .map(tbl -> inBusyLockAsync(busyLock, () -> {
+                                return runAsync(() -> inBusyLock(busyLock, () -> {
+                                    // Record the partition as local for the table.
+                                    tableRegistry.localPartsByTableId().compute(
+                                            tbl.tableId(),
+                                            (tableId, oldPartitionSet) -> extendPartitionSet(oldPartitionSet, partitionIndex)
+                                    );
+
+                                    lowWatermark.getLowWatermarkSafe(lwm ->
+                                            registerIndexesToTable(
+                                                    tbl,
+                                                    catalogService,
+                                                    singlePartitionIdSet,
+                                                    tbl.schemaView(),
+                                                    lwm
+                                            )
+                                    );
+
+                                    preparePartitionResourcesAndLoadToZoneReplicaBusy(
+                                            tbl,
+                                            zonePartitionId,
+                                            event.resources(),
+                                            event.onRecovery()
+                                    );
+                                }), ioExecutor)
+                                // If the table is already closed, it's not a problem (probably the node is stopping).
+                                .exceptionally(ignoreTableClosedException());
+                            }))
+                            .toArray(CompletableFuture[]::new);
+
+                    return allOf(futures);
+                });
+    }
+
+    /**
+     * Marks the event if any table's MV storage of the partition is in the rebalance state and registers an action on the event that
+     * clears the partition in every table of the zone.
+     */
+    private static void scheduleMvPartitionsCleanupIfNeeded(
+            Set<TableViewInternal> zoneTables,
+            int partitionIndex,
+            LocalBeforeReplicaStartEventParameters event
+    ) {
+        boolean rebalanceStateDetected = zoneTables.stream()
+                .map(TableViewInternal::internalTable)
+                .map(internalTable -> internalTable.storage().getMvPartition(partitionIndex))
+                .filter(Objects::nonNull)
+                .anyMatch(mvStorage -> MvPartitionStorage.REBALANCE_IN_PROGRESS == mvStorage.lastAppliedIndex());
+
+        if (rebalanceStateDetected) {
+            event.registerStorageInRebalanceState();
+        }
+
+        // The cleanup action is registered unconditionally: even when no MV partition storage is in the rebalance state,
+        // the TX state storage might be.
+        event.addCleanupAction(() -> allOf(
+                zoneTables.stream()
+                        .map(TableViewInternal::internalTable)
+                        .map(internalTable -> internalTable.storage().clearPartition(partitionIndex))
+                        .toArray(CompletableFuture[]::new)
+        ));
+    }
+
+    /**
+     * Listener for the "after replica stopped" event: under the zone's read lock, stops the partitions of every table of the zone and
+     * always answers {@code false}.
+     */
+    private CompletableFuture<Boolean> onZoneReplicaStopped(LocalPartitionReplicaEventParameters parameters) {
+        int zoneId = parameters.zonePartitionId().zoneId();
+
+        NaiveAsyncReadWriteLock lock = tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
+
+        CompletableFuture<Long> readLockFuture = lock.readLock();
+
+        try {
+            CompletableFuture<Void> tablesStopped = readLockFuture.thenCompose(stamp -> inBusyLockAsync(busyLock, () -> {
+                CompletableFuture<?>[] perTableFutures = zoneTablesRawSet(zoneId).stream()
+                        .map(table -> stopTablePartitions(table))
+                        .toArray(CompletableFuture[]::new);
+
+                return allOf(perTableFutures);
+            }));
+
+            // Release the lock whatever the outcome, then translate the result into the listener's answer.
+            return tablesStopped
+                    .whenComplete((v, t) -> readLockFuture.thenAccept(lock::unlockRead))
+                    .thenApply(v -> false);
+        } catch (Throwable t) {
+            readLockFuture.thenAccept(lock::unlockRead);
+
+            return failedFuture(t);
+        }
+    }
+
+    /**
+     * Listener for the "after replica destroyed" event: under the zone's read lock, stops and destroys the matching partition of every
+     * table of the zone (each on the IO executor) and always answers {@code false}.
+     */
+    private CompletableFuture<Boolean> onZoneReplicaDestroyed(LocalPartitionReplicaEventParameters parameters) {
+        ZonePartitionId destroyedReplicaId = parameters.zonePartitionId();
+        int zoneId = destroyedReplicaId.zoneId();
+
+        NaiveAsyncReadWriteLock lock = tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
+
+        CompletableFuture<Long> readLockFuture = lock.readLock();
+
+        try {
+            CompletableFuture<Void> partitionsDestroyed = readLockFuture.thenCompose(stamp -> inBusyLockAsync(busyLock, () -> {
+                CompletableFuture<?>[] perTableFutures = zoneTablesRawSet(zoneId).stream()
+                        .map(table -> {
+                            CompletableFuture<CompletableFuture<Void>> scheduled = supplyAsync(
+                                    () -> inBusyLockAsync(busyLock, () -> stopAndDestroyTablePartition(
+                                            new TablePartitionId(table.tableId(), destroyedReplicaId.partitionId()),
+                                            parameters.causalityToken()
+                                    )),
+                                    ioExecutor
+                            );
+
+                            // Flatten the future-of-future produced by the asynchronous scheduling.
+                            return scheduled.thenCompose(identity());
+                        })
+                        .toArray(CompletableFuture[]::new);
+
+                return allOf(perTableFutures);
+            }));
+
+            // Release the lock whatever the outcome, then translate the result into the listener's answer.
+            return partitionsDestroyed
+                    .whenComplete((v, t) -> readLockFuture.thenAccept(lock::unlockRead))
+                    .thenApply(unused -> false);
+        } catch (Throwable t) {
+            readLockFuture.thenAccept(lock::unlockRead);
+
+            return failedFuture(t);
+        }
+    }
+
+    /**
+     * Creates the resources of a single table partition and attaches them to the given zone replica.
+     *
+     * <p>Does nothing if obtaining the MV partition storage fails with {@link TableClosedException}.
+     * NOTE(review): the "Busy" suffix presumably means the caller must already hold the busy lock — confirm.
+     *
+     * @param table Table whose partition is being prepared.
+     * @param zonePartitionId Zone replica the table partition is loaded into.
+     * @param resources Resources of the zone partition; only its safe time tracker is used here.
+     * @param onNodeRecovery Flag passed through to the storage update handler start and to the zone replica loading call.
+     */
+    private void preparePartitionResourcesAndLoadToZoneReplicaBusy(
+            TableViewInternal table,
+            ZonePartitionId zonePartitionId,
+            ZonePartitionResources resources,
+            boolean onNodeRecovery
+    ) {
+        int partId = zonePartitionId.partitionId();
+
+        int tableId = table.tableId();
+
+        var tablePartitionId = new TablePartitionId(tableId, partId);
+
+        MvPartitionStorage mvPartitionStorage;
+        try {
+            mvPartitionStorage = getMvPartitionStorage(table, partId);
+        } catch (TableClosedException e) {
+            // The node is probably stopping while we start the table, let's just skip it.
+            return;
+        }
+
+        PartitionDataStorage partitionDataStorage = partitionResourcesFactory.createPartitionDataStorage(
+                new PartitionKey(zonePartitionId.zoneId(), partId),
+                tableId,
+                mvPartitionStorage
+        );
+
+        PartitionResources partitionResources = partitionResourcesFactory.createPartitionResources(
+                partId,
+                partitionDataStorage,
+                table,
+                resources.safeTimeTracker()
+        );
+
+        partitionResources.storageUpdateHandler.start(onNodeRecovery);
+
+        // Register the partition with the per-partition services: metrics, garbage collection and minimum time collection.
+        registerPartitionTableStatsMetrics(table, partId, partitionResources);
+
+        mvGc.addStorage(tablePartitionId, partitionResources.gcUpdateHandler);
+
+        minTimeCollectorService.addPartition(tablePartitionId);
+
+        // The replica listener is created lazily by the factory, once the raft client and the transaction state resolver
+        // are supplied to it.
+        TablePartitionReplicaProcessorFactory createListener = (raftClient, transactionStateResolver) ->
+                partitionResourcesFactory.createReplicaListener(
+                        zonePartitionId,
+                        table,
+                        resources.safeTimeTracker(),
+                        mvPartitionStorage,
+                        partitionResources,
+                        raftClient,
+                        transactionStateResolver
+                );
+
+        TablePartitionProcessor tablePartitionProcessor = partitionResourcesFactory.createTablePartitionProcessor(
+                zonePartitionId, table, partitionDataStorage, partitionResources);
+
+        PartitionMvStorageAccess partitionStorageAccess = partitionResourcesFactory.createPartitionMvStorageAccess(
+                partId, table, partitionResources);
+
+        partitionReplicaLifecycleManager.loadTableListenerToZoneReplica(
+                zonePartitionId,
+                tableId,
+                createListener,
+                tablePartitionProcessor,
+                partitionStorageAccess,
+                onNodeRecovery
+        );
+    }
+
+    /**
+     * Stops every partition of the table. The per-partition stop calls are issued from the IO executor.
+     *
+     * @param table Table whose partitions are stopped.
+     * @return Future that completes when all partitions are stopped.
+     */
+    private CompletableFuture<Void> stopTablePartitions(TableViewInternal table) {
+        CompletableFuture<CompletableFuture<Void>> scheduled = supplyAsync(() -> {
+            int partitionCount = table.internalTable().partitions();
+
+            var perPartitionFutures = new CompletableFuture<?>[partitionCount];
+
+            for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+                perPartitionFutures[partitionId] = stopTablePartition(new TablePartitionId(table.tableId(), partitionId), table);
+            }
+
+            return allOf(perPartitionFutures);
+        }, ioExecutor);
+
+        // Flatten the future-of-future produced by the asynchronous scheduling.
+        return scheduled.thenCompose(identity());
+    }
+
+    /**
+     * Stops a table partition and destroys its storages once the given
+     * causality token has been observed by {@code tablesVv}.
+     *
+     * @param tablePartitionId Partition to stop and destroy.
+     * @param causalityToken Causality token to wait for.
+     * @return Future of the stop-and-destroy operation.
+     */
+    private CompletableFuture<Void> stopAndDestroyTablePartition(
+            TablePartitionId tablePartitionId,
+            long causalityToken
+    ) {
+        CompletableFuture<?> tokenSeenFuture;
+
+        try {
+            tokenSeenFuture = tablesVv.get(causalityToken);
+        } catch (OutdatedTokenException ignored) {
+            // All that matters here is that the token has been seen.
+            // TODO https://issues.apache.org/jira/browse/IGNITE-25742
+            tokenSeenFuture = nullCompletedFuture();
+        }
+
+        return tokenSeenFuture.thenCompose(unused -> {
+            var tableId = tablePartitionId.tableId();
+
+            TableViewInternal table = tableRegistry.tables().get(tableId);
+
+            assert table != null : tablePartitionId;
+
+            return stopAndDestroyTablePartition(tablePartitionId, table);
+        });
+    }
+
+    /**
+     * Stops the table partition and afterwards destroys its storages.
+     *
+     * <p>The destruction step is scheduled on {@code ioExecutor} once the
+     * stop future completes.
+     *
+     * @param tablePartitionId Partition to stop and destroy.
+     * @param table Table owning the partition.
+     * @return Future of the combined operation.
+     */
+    private CompletableFuture<Void> stopAndDestroyTablePartition(
+            TablePartitionId tablePartitionId,
+            TableViewInternal table
+    ) {
+        CompletableFuture<Void> stopFuture = stopTablePartition(tablePartitionId, table);
+
+        return stopFuture.thenComposeAsync(
+                unused -> destroyPartitionStorages(tablePartitionId, table),
+                ioExecutor
+        );
+    }
+
+    /**
+     * Releases the per-partition bookkeeping of a table partition: removes it
+     * from the min time collector service, unregisters its metrics and removes
+     * its storage from MV GC.
+     *
+     * @param tablePartitionId Partition being stopped.
+     * @param table Table owning the partition.
+     * @return Future returned by the MV GC storage removal.
+     */
+    private CompletableFuture<Void> stopTablePartition(
+            TablePartitionId tablePartitionId,
+            TableViewInternal table
+    ) {
+        // With colocation there shouldn't be any table replica, hence no
+        // replica is stopped here.
+        minTimeCollectorService.removePartition(tablePartitionId);
+
+        String tableName = table.name();
+        unregisterPartitionMetrics(tablePartitionId, tableName);
+
+        CompletableFuture<Void> gcRemovalFuture = mvGc.removeStorage(tablePartitionId);
+
+        return gcRemovalFuture;
+    }
+
+    /**
+     * Registers the table-stats metric source of a partition and records the
+     * supplier of its pending row count.
+     *
+     * <p>A failure to register the metric source is only logged; the pending
+     * row count supplier is recorded regardless.
+     *
+     * @param table Table owning the partition.
+     * @param partitionId Partition number.
+     * @param partitionResources Resources of the partition.
+     */
+    private void registerPartitionTableStatsMetrics(
+            TableViewInternal table,
+            int partitionId,
+            PartitionResources partitionResources
+    ) {
+        int tableId = table.tableId();
+
+        var partitionKey = new TablePartitionId(tableId, partitionId);
+
+        var statsSource = new PartitionTableStatsMetricSource(
+                tableId,
+                partitionId,
+                partitionResources.modificationCounter
+        );
+
+        try {
+            // The source is registered but deliberately not enabled by default:
+            // it is meant for online troubleshooting only (see IGNITE-27813).
+            metricManager.registerSource(statsSource);
+
+            partModCounterMetricSources.put(partitionKey, statsSource);
+        } catch (Exception e) {
+            String message = "Failed to register metrics source for table [name={}, partitionId={}].";
+
+            LOG.warn(message, e, table.name(), partitionId);
+        }
+
+        pendingWriteIntentsSuppliers.put(
+                partitionKey,
+                partitionResources.storageUpdateHandler::getPendingRowCount
+        );
+    }
+
+    /**
+     * Drops the metric bookkeeping of a partition: forgets its table-stats
+     * metric source and pending row count supplier and, if a metric source
+     * was recorded, unregisters it from the metric manager.
+     *
+     * <p>A failure to unregister the source is only logged.
+     *
+     * @param tablePartitionId Partition whose metrics are unregistered.
+     * @param tableName Table name, used for logging only.
+     */
+    private void unregisterPartitionMetrics(TablePartitionId tablePartitionId, String tableName) {
+        PartitionTableStatsMetricSource registeredSource =
+                partModCounterMetricSources.remove(tablePartitionId);
+
+        pendingWriteIntentsSuppliers.remove(tablePartitionId);
+
+        if (registeredSource == null) {
+            // Nothing was recorded for this partition.
+            return;
+        }
+
+        try {
+            metricManager.unregisterSource(registeredSource);
+        } catch (Exception e) {
+            LOG.warn(
+                    "Failed to unregister metrics source for table [name={}, partitionId={}].",
+                    e,
+                    tableName,
+                    tablePartitionId.partitionId()
+            );
+        }
+    }
+
+    /**
+     * Destroys the MV partition storage of the given table partition if the
+     * table storage still has it.
+     *
+     * @param tablePartitionId Partition whose storage is destroyed.
+     * @param table Table owning the partition.
+     * @return Future of the destruction, or an already completed future when
+     *         there is nothing to destroy or the table storage is already
+     *         destroyed or closed.
+     */
+    private static CompletableFuture<Void> destroyPartitionStorages(
+            TablePartitionId tablePartitionId,
+            TableViewInternal table
+    ) {
+        InternalTable internalTable = table.internalTable();
+
+        int partId = tablePartitionId.partitionId();
+
+        try {
+            boolean partitionExists = internalTable.storage().getMvPartition(partId) != null;
+
+            if (partitionExists) {
+                // TODO: IGNITE-24926 - reduce set in localPartsByTableId after storages destruction.
+                return allOf(internalTable.storage().destroyPartition(partId));
+            }
+        } catch (StorageDestroyedException ignored) {
+            // The storage is already destroyed, there is nothing left to destroy.
+        } catch (StorageClosedException ignored) {
+            // The storage is closed, so the node is being stopped. The partition
+            // will be destroyed on node recovery.
+        }
+
+        // TODO: IGNITE-24926 - reduce set in localPartsByTableId after storages destruction.
+        return allOf();
+    }
+
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-19739 Create storages only once.
+    /**
+     * Makes sure an MV partition storage exists for every partition of the
+     * given set, creating the ones the table storage does not have yet.
+     *
+     * <p>If the table storage turns out to be closed while a partition is
+     * looked up, the future for that partition fails with
+     * {@link TableClosedException}.
+     *
+     * @param table Table owning the partitions.
+     * @param partitions Partitions to ensure storages for.
+     * @return Future combining the per-partition storage futures.
+     */
+    private static CompletableFuture<Void> createPartitionStoragesIfAbsent(
+            TableViewInternal table,
+            PartitionSet partitions
+    ) {
+        InternalTable internalTable = table.internalTable();
+
+        List<CompletableFuture<MvPartitionStorage>> partitionStorageFutures = partitions.stream()
+                .mapToObj(partId -> {
+                    MvPartitionStorage existing;
+
+                    try {
+                        existing = internalTable.storage().getMvPartition(partId);
+                    } catch (StorageClosedException e) {
+                        return CompletableFuture.<MvPartitionStorage>failedFuture(
+                                new TableClosedException(table.tableId(), e));
+                    }
+
+                    if (existing == null) {
+                        return internalTable.storage().createMvPartition(partId);
+                    }
+
+                    return CompletableFuture.<MvPartitionStorage>completedFuture(existing);
+                })
+                .collect(toList());
+
+        return CompletableFutures.allOf(partitionStorageFutures);
+    }
+
+    /**
+     * Returns the MV partition storage of the given table partition, which is
+     * expected to exist.
+     *
+     * @param table Table owning the partition.
+     * @param partitionId Partition number.
+     * @return MV partition storage.
+     * @throws TableClosedException If the table storage is closed.
+     */
+    private static MvPartitionStorage getMvPartitionStorage(TableViewInternal table, int partitionId) {
+        InternalTable internalTable = table.internalTable();
+
+        try {
+            MvPartitionStorage storage = internalTable.storage().getMvPartition(partitionId);
+
+            assert storage != null : "tableId=" + table.tableId() + ", partitionId=" + partitionId;
+
+            return storage;
+        } catch (StorageClosedException e) {
+            throw new TableClosedException(table.tableId(), e);
+        }
+    }
+
+    /**
+     * Returns the set of tables recorded for the given zone in
+     * {@code tablesPerZone}, or an empty set if the zone has no entry.
+     *
+     * @param zoneId Zone ID.
+     * @return Tables of the zone as stored, without copying.
+     */
+    private Set<TableViewInternal> zoneTablesRawSet(int zoneId) {
+        Set<TableViewInternal> noTables = Set.of();
+
+        return tablesPerZone.getOrDefault(zoneId, noTables);
+    }
+
+    /**
+     * Adds a partition to a partition set.
+     *
+     * <p>A non-null {@code oldPartitionSet} is modified in place and returned;
+     * otherwise a new {@link BitSetPartitionSet} is created.
+     *
+     * @param oldPartitionSet Existing set, or {@code null} if there is none.
+     * @param partitionId Partition to add.
+     * @return Set containing the partition.
+     */
+    private static PartitionSet extendPartitionSet(@Nullable PartitionSet oldPartitionSet, int partitionId) {
+        PartitionSet result = oldPartitionSet != null ? oldPartitionSet : new BitSetPartitionSet();
+
+        result.set(partitionId);
+
+        return result;
+    }
+
+    /**
+     * Creates a failure handler that turns a failure for which
+     * {@code hasCause} reports a {@link TableClosedException} into a
+     * {@code null} result and rethrows any other failure.
+     *
+     * @param <T> Result type of the handled computation.
+     * @return Failure handler.
+     */
+    private static <T> Function<Throwable, T> ignoreTableClosedException() {
+        return failure -> {
+            if (!hasCause(failure, TableClosedException.class)) {
+                throw sneakyThrow(failure);
+            }
+
+            return null;
+        };
+    }
+
+    /**
+     * Internal exception raised in this class when a table's storage is found
+     * closed while one of its partition storages is being accessed.
+     */
+    private static class TableClosedException extends IgniteInternalException {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Constructor.
+         *
+         * @param tableId ID of the closed table.
+         * @param cause Original failure, if any.
+         */
+        TableClosedException(int tableId, @Nullable Throwable cause) {
+            super(INTERNAL_ERR, describe(tableId), cause);
+        }
+
+        /** Builds the exception message for the given table ID. */
+        private static String describe(int tableId) {
+            return "Table is closed [tableId=" + tableId + "]";
+        }
+    }
+}

Reply via email to