This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 67cc4e20bb7 IGNITE-28007 Split TablePartitionResourcesFactory out of 
TableManager (#7701)
67cc4e20bb7 is described below

commit 67cc4e20bb7f11336bb63a2289d869e266237f68
Author: Viacheslav Blinov <[email protected]>
AuthorDate: Tue Mar 17 12:49:10 2026 +0300

    IGNITE-28007 Split TablePartitionResourcesFactory out of TableManager 
(#7701)
---
 ...UpdateHandlers.java => PartitionResources.java} |  12 +-
 .../internal/table/distributed/TableManager.java   | 290 ++++++-------------
 .../TablePartitionResourcesFactory.java            | 307 +++++++++++++++++++++
 .../distributed/TableManagerRecoveryTest.java      |   3 +-
 .../table/distributed/TableManagerTest.java        |   2 +
 .../ignite/internal/table/TableTestUtils.java      |   2 +-
 6 files changed, 412 insertions(+), 204 deletions(-)

diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionUpdateHandlers.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionResources.java
similarity index 79%
rename from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionUpdateHandlers.java
rename to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionResources.java
index 3967b76724b..ef38773b7a3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionUpdateHandlers.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionResources.java
@@ -21,22 +21,26 @@ import 
org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 
 /**
- * Partition update handler container.
+ * Partition resource container: handlers and modification counter created 
during partition setup.
  */
-class PartitionUpdateHandlers {
+class PartitionResources {
     final StorageUpdateHandler storageUpdateHandler;
 
     final IndexUpdateHandler indexUpdateHandler;
 
     final GcUpdateHandler gcUpdateHandler;
 
-    PartitionUpdateHandlers(
+    final PartitionModificationCounter modificationCounter;
+
+    PartitionResources(
             StorageUpdateHandler storageUpdateHandler,
             IndexUpdateHandler indexUpdateHandler,
-            GcUpdateHandler gcUpdateHandler
+            GcUpdateHandler gcUpdateHandler,
+            PartitionModificationCounter modificationCounter
     ) {
         this.storageUpdateHandler = storageUpdateHandler;
         this.indexUpdateHandler = indexUpdateHandler;
         this.gcUpdateHandler = gcUpdateHandler;
+        this.modificationCounter = modificationCounter;
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 38adc3c3856..efa96328132 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -109,7 +109,6 @@ import 
org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metrics.MetricManager;
-import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.partition.replicator.LocalBeforeReplicaStartEventParameters;
@@ -120,13 +119,12 @@ import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycle
 import 
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import 
org.apache.ignite.internal.partition.replicator.schema.ExecutorInclinedSchemaSyncService;
 import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import 
org.apache.ignite.internal.placementdriver.wrappers.ExecutorInclinedPlacementDriver;
-import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
-import org.apache.ignite.internal.raft.service.RaftCommandRunner;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -148,17 +146,11 @@ import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.TableViewInternal;
-import 
org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory.SizeSupplier;
-import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.gc.MvGc;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
-import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
 import 
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
-import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionMvStorageAccessImpl;
-import 
org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
-import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
@@ -170,14 +162,11 @@ import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
-import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
 import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.LongPriorityQueue;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.table.QualifiedName;
@@ -266,8 +255,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
     private final ClockService clockService;
 
-    private final OutgoingSnapshotsManager outgoingSnapshotsManager;
-
     private final SchemaSyncService executorInclinedSchemaSyncService;
 
     private final CatalogService catalogService;
@@ -288,28 +275,17 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
     private final SchemaVersions schemaVersions;
 
-    private final ValidationSchemasSource validationSchemasSource;
-
     private final PartitionReplicatorNodeRecovery 
partitionReplicatorNodeRecovery;
 
     /** Ends at the {@link IgniteComponent#stopAsync(ComponentContext)} with 
an {@link NodeStoppingException}. */
     private final CompletableFuture<Void> stopManagerFuture = new 
CompletableFuture<>();
 
-    private final ReplicationConfiguration replicationConfiguration;
-
-    /**
-     * Executes partition operations (that might cause I/O and/or be blocked 
on locks).
-     */
-    private final Executor partitionOperationsExecutor;
-
     /** Marshallers provider. */
     private final ReflectionMarshallersProvider marshallers = new 
ReflectionMarshallersProvider();
 
     /** Index chooser for full state transfer. */
     private final FullStateTransferIndexChooser fullStateTransferIndexChooser;
 
-    private final RemotelyTriggeredResourceRegistry 
remotelyTriggeredResourceRegistry;
-
     private final TransactionInflights transactionInflights;
 
     private final String nodeName;
@@ -319,8 +295,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     @Nullable
     private ScheduledExecutorService streamerFlushExecutor;
 
-    private final IndexMetaStorage indexMetaStorage;
-
     private final MinimumRequiredTimeCollectorService minTimeCollectorService;
 
     @Nullable
@@ -349,10 +323,11 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
     private final MetricManager metricManager;
 
-    private final PartitionModificationCounterFactory 
partitionModificationCounterFactory;
     private final Map<TablePartitionId, PartitionTableStatsMetricSource> 
partModCounterMetricSources = new ConcurrentHashMap<>();
     private final Map<TablePartitionId, LongSupplier> 
pendingWriteIntentsSuppliers = new ConcurrentHashMap<>();
 
+    private final TablePartitionResourcesFactory partitionResourcesFactory;
+
     /**
      * Creates a new table manager.
      *
@@ -421,25 +396,18 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         this.dataStorageMgr = dataStorageMgr;
         this.metaStorageMgr = metaStorageMgr;
         this.schemaManager = schemaManager;
-        this.validationSchemasSource = validationSchemasSource;
         this.ioExecutor = ioExecutor;
-        this.partitionOperationsExecutor = partitionOperationsExecutor;
         this.clockService = clockService;
-        this.outgoingSnapshotsManager = outgoingSnapshotsManager;
         this.catalogService = catalogService;
         this.failureProcessor = failureProcessor;
         this.observableTimestampTracker = observableTimestampTracker;
         this.sql = sql;
-        this.replicationConfiguration = replicationConfiguration;
-        this.remotelyTriggeredResourceRegistry = 
remotelyTriggeredResourceRegistry;
         this.lowWatermark = lowWatermark;
         this.transactionInflights = transactionInflights;
         this.nodeName = nodeName;
-        this.indexMetaStorage = indexMetaStorage;
         this.partitionReplicaLifecycleManager = 
partitionReplicaLifecycleManager;
         this.minTimeCollectorService = minTimeCollectorService;
         this.metricManager = metricManager;
-        this.partitionModificationCounterFactory = 
partitionModificationCounterFactory;
 
         this.executorInclinedSchemaSyncService = new 
ExecutorInclinedSchemaSyncService(schemaSyncService, 
partitionOperationsExecutor);
         this.executorInclinedPlacementDriver = new 
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -465,6 +433,30 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
         fullStateTransferIndexChooser = new 
FullStateTransferIndexChooser(catalogService, lowWatermark, indexMetaStorage);
 
+        partitionResourcesFactory = new TablePartitionResourcesFactory(
+                txManager,
+                lockMgr,
+                scanRequestExecutor,
+                clockService,
+                catalogService,
+                partitionModificationCounterFactory,
+                outgoingSnapshotsManager,
+                lowWatermark,
+                validationSchemasSource,
+                this.executorInclinedSchemaSyncService,
+                this.executorInclinedPlacementDriver,
+                topologyService,
+                remotelyTriggeredResourceRegistry,
+                failureProcessor,
+                schemaManager,
+                replicationConfiguration,
+                partitionOperationsExecutor,
+                indexMetaStorage,
+                minTimeCollectorService,
+                mvGc,
+                fullStateTransferIndexChooser
+        );
+
         rebalanceRetryDelayConfiguration = new 
SystemDistributedConfigurationPropertyHolder<>(
                 systemDistributedConfiguration,
                 (v, r) -> {},
@@ -860,65 +852,49 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             return;
         }
 
-        PartitionDataStorage partitionDataStorage = partitionDataStorage(
+        PartitionDataStorage partitionDataStorage = 
partitionResourcesFactory.createPartitionDataStorage(
                 new PartitionKey(zonePartitionId.zoneId(), partId),
                 tableId,
                 mvPartitionStorage
         );
 
-        PartitionUpdateHandlers partitionUpdateHandlers = 
createPartitionUpdateHandlers(
+        PartitionResources partitionResources = 
partitionResourcesFactory.createPartitionResources(
                 partId,
                 partitionDataStorage,
                 table,
-                resources.safeTimeTracker(),
-                replicationConfiguration,
-                onNodeRecovery
+                resources.safeTimeTracker()
         );
 
-        mvGc.addStorage(tablePartitionId, 
partitionUpdateHandlers.gcUpdateHandler);
+        partitionResources.storageUpdateHandler.start(onNodeRecovery);
+
+        registerPartitionTableStatsMetrics(table, partId, partitionResources);
+
+        mvGc.addStorage(tablePartitionId, partitionResources.gcUpdateHandler);
 
         minTimeCollectorService.addPartition(new TablePartitionId(tableId, 
partId));
 
-        TablePartitionReplicaProcessorFactory createListener = (raftClient, 
transactionStateResolver) -> createReplicaListener(
-                zonePartitionId,
-                table,
-                resources.safeTimeTracker(),
-                mvPartitionStorage,
-                partitionUpdateHandlers,
-                raftClient,
-                transactionStateResolver
-        );
+        TablePartitionReplicaProcessorFactory createListener = (raftClient, 
transactionStateResolver) ->
+                partitionResourcesFactory.createReplicaListener(
+                        zonePartitionId,
+                        table,
+                        resources.safeTimeTracker(),
+                        mvPartitionStorage,
+                        partitionResources,
+                        raftClient,
+                        transactionStateResolver
+                );
 
-        var tablePartitionRaftListener = new TablePartitionProcessor(
-                txManager,
-                partitionDataStorage,
-                partitionUpdateHandlers.storageUpdateHandler,
-                catalogService,
-                table.schemaView(),
-                indexMetaStorage,
-                topologyService.localMember().id(),
-                minTimeCollectorService,
-                executorInclinedPlacementDriver,
-                clockService,
-                zonePartitionId
-        );
+        TablePartitionProcessor tablePartitionProcessor = 
partitionResourcesFactory.createTablePartitionProcessor(
+                zonePartitionId, table, partitionDataStorage, 
partitionResources);
 
-        var partitionStorageAccess = new PartitionMvStorageAccessImpl(
-                partId,
-                table.internalTable().storage(),
-                mvGc,
-                partitionUpdateHandlers.indexUpdateHandler,
-                partitionUpdateHandlers.gcUpdateHandler,
-                fullStateTransferIndexChooser,
-                schemaManager.schemaRegistry(tableId),
-                lowWatermark
-        );
+        PartitionMvStorageAccess partitionStorageAccess = 
partitionResourcesFactory.createPartitionMvStorageAccess(
+                partId, table, partitionResources);
 
         partitionReplicaLifecycleManager.loadTableListenerToZoneReplica(
                 zonePartitionId,
                 tableId,
                 createListener,
-                tablePartitionRaftListener,
+                tablePartitionProcessor,
                 partitionStorageAccess,
                 onNodeRecovery
         );
@@ -998,56 +974,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         );
     }
 
-    private PartitionReplicaListener createReplicaListener(
-            ZonePartitionId replicationGroupId,
-            TableViewInternal table,
-            PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTimeTracker,
-            MvPartitionStorage mvPartitionStorage,
-            PartitionUpdateHandlers partitionUpdateHandlers,
-            RaftCommandRunner raftClient,
-            TransactionStateResolver transactionStateResolver
-    ) {
-        int partitionIndex = replicationGroupId.partitionId();
-
-        return new PartitionReplicaListener(
-                mvPartitionStorage,
-                new ExecutorInclinedRaftCommandRunner(raftClient, 
partitionOperationsExecutor),
-                txManager,
-                lockMgr,
-                scanRequestExecutor,
-                replicationGroupId,
-                table.tableId(),
-                table.indexesLockers(partitionIndex),
-                new Lazy<>(() -> 
table.indexStorageAdapters(partitionIndex).get().get(table.pkId())),
-                () -> table.indexStorageAdapters(partitionIndex).get(),
-                clockService,
-                safeTimeTracker,
-                transactionStateResolver,
-                partitionUpdateHandlers.storageUpdateHandler,
-                validationSchemasSource,
-                localNode(),
-                executorInclinedSchemaSyncService,
-                catalogService,
-                executorInclinedPlacementDriver,
-                topologyService,
-                remotelyTriggeredResourceRegistry,
-                schemaManager.schemaRegistry(table.tableId()),
-                indexMetaStorage,
-                lowWatermark,
-                failureProcessor,
-                table.metrics()
-        );
-    }
-
-    private PartitionDataStorage partitionDataStorage(PartitionKey 
partitionKey, int tableId, MvPartitionStorage partitionStorage) {
-        return new SnapshotAwarePartitionDataStorage(
-                tableId,
-                partitionStorage,
-                outgoingSnapshotsManager,
-                partitionKey
-        );
-    }
-
     @Override
     public void beforeNodeStop() {
         if (!beforeStopGuard.compareAndSet(false, true)) {
@@ -1569,6 +1495,38 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         // In case of colocation there shouldn't be any table replica and thus 
it shouldn't be stopped.
         minTimeCollectorService.removePartition(tablePartitionId);
 
+        unregisterPartitionMetrics(tablePartitionId, table.name());
+
+        return mvGc.removeStorage(tablePartitionId);
+    }
+
+    private void registerPartitionTableStatsMetrics(
+            TableViewInternal table,
+            int partitionId,
+            PartitionResources partitionResources
+    ) {
+        PartitionTableStatsMetricSource metricSource =
+                new PartitionTableStatsMetricSource(table.tableId(), 
partitionId, partitionResources.modificationCounter);
+
+        try {
+            // Only register this Metrics Source and do not enable it by 
default
+            // as it is intended for online troubleshooting purposes only (see 
IGNITE-27813).
+            metricManager.registerSource(metricSource);
+
+            TablePartitionId tablePartitionId = new 
TablePartitionId(table.tableId(), partitionId);
+
+            partModCounterMetricSources.put(tablePartitionId, metricSource);
+        } catch (Exception e) {
+            LOG.warn("Failed to register metrics source for table [name={}, 
partitionId={}].", e, table.name(), partitionId);
+        }
+
+        pendingWriteIntentsSuppliers.put(
+                new TablePartitionId(table.tableId(), partitionId),
+                partitionResources.storageUpdateHandler::getPendingRowCount
+        );
+    }
+
+    private void unregisterPartitionMetrics(TablePartitionId tablePartitionId, 
String tableName) {
         PartitionTableStatsMetricSource metricSource = 
partModCounterMetricSources.remove(tablePartitionId);
         pendingWriteIntentsSuppliers.remove(tablePartitionId);
         if (metricSource != null) {
@@ -1576,11 +1534,19 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 metricManager.unregisterSource(metricSource);
             } catch (Exception e) {
                 String message = "Failed to unregister metrics source for 
table [name={}, partitionId={}].";
-                LOG.warn(message, e, table.name(), 
tablePartitionId.partitionId());
+                LOG.warn(message, e, tableName, 
tablePartitionId.partitionId());
             }
         }
+    }
 
-        return mvGc.removeStorage(tablePartitionId);
+    private long totalPendingWriteIntents() {
+        long sum = 0;
+
+        for (LongSupplier supplier : pendingWriteIntentsSuppliers.values()) {
+            sum += supplier.getAsLong();
+        }
+
+        return sum;
     }
 
     private CompletableFuture<Void> destroyPartitionStorages(
@@ -1607,78 +1573,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         return allOf(destroyFutures.toArray(new CompletableFuture[]{}));
     }
 
-    private InternalClusterNode localNode() {
-        return topologyService.localMember();
-    }
-
-    private PartitionUpdateHandlers createPartitionUpdateHandlers(
-            int partitionId,
-            PartitionDataStorage partitionDataStorage,
-            TableViewInternal table,
-            PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTimeTracker,
-            ReplicationConfiguration replicationConfiguration,
-            boolean onNodeRecovery
-    ) {
-        TableIndexStoragesSupplier indexes = 
table.indexStorageAdapters(partitionId);
-
-        IndexUpdateHandler indexUpdateHandler = new 
IndexUpdateHandler(indexes);
-
-        GcUpdateHandler gcUpdateHandler = new 
GcUpdateHandler(partitionDataStorage, safeTimeTracker, indexUpdateHandler);
-
-        SizeSupplier partSizeSupplier = () -> 
partitionDataStorage.getStorage().estimatedSize();
-
-        PartitionModificationCounter modificationCounter =
-                partitionModificationCounterFactory.create(partSizeSupplier, 
table::stalenessConfiguration, table.tableId(), partitionId);
-
-        StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
-                partitionId,
-                partitionDataStorage,
-                indexUpdateHandler,
-                replicationConfiguration,
-                modificationCounter,
-                txManager
-        );
-
-        storageUpdateHandler.start(onNodeRecovery);
-
-        registerPartitionTableStatsMetrics(table, partitionId, 
modificationCounter);
-
-        pendingWriteIntentsSuppliers.put(new TablePartitionId(table.tableId(), 
partitionId), storageUpdateHandler::getPendingRowCount);
-
-        return new PartitionUpdateHandlers(storageUpdateHandler, 
indexUpdateHandler, gcUpdateHandler);
-    }
-
-    private void registerPartitionTableStatsMetrics(
-            TableViewInternal table,
-            int partitionId,
-            PartitionModificationCounter counter
-    ) {
-        PartitionTableStatsMetricSource metricSource =
-                new PartitionTableStatsMetricSource(table.tableId(), 
partitionId, counter);
-
-        try {
-            // Only register this Metrics Source and do not enable it by 
default
-            // as it is intended for online troubleshooting purposes only.
-            metricManager.registerSource(metricSource);
-
-            TablePartitionId tablePartitionId = new 
TablePartitionId(table.tableId(), partitionId);
-
-            partModCounterMetricSources.put(tablePartitionId, metricSource);
-        } catch (Exception e) {
-            LOG.warn("Failed to register metrics source for table [name={}, 
partitionId={}].", e, table.name(), partitionId);
-        }
-    }
-
-    private long totalPendingWriteIntents() {
-        long sum = 0;
-
-        for (LongSupplier supplier : pendingWriteIntentsSuppliers.values()) {
-            sum += supplier.getAsLong();
-        }
-
-        return sum;
-    }
-
     /**
      * Returns a cached table instance if it exists, {@code null} otherwise. 
Can return a table that is being stopped.
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
new file mode 100644
index 00000000000..b5a35c50f17
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
+import org.apache.ignite.internal.network.TopologyService;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
+import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import 
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaSyncService;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.table.TableViewInternal;
+import 
org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory.SizeSupplier;
+import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
+import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
+import 
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionMvStorageAccessImpl;
+import 
org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
+import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
+import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
+import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+
+/**
+ * Stateless factory for creating partition-level resources: data storage 
wrappers, update handlers, and replica listeners.
+ *
+ * <p>This factory performs pure construction only — it does not start 
components, register metrics,
+ * or own any mutable state. Lifecycle management (start/stop, metric 
registration/deregistration)
+ * remains in {@link TableManager}.
+ *
+ * <p><b>Lifecycle ordering:</b> the caller must invoke {@link 
StorageUpdateHandler#start} on the
+ * {@link PartitionResources#storageUpdateHandler} returned by {@link 
#createPartitionResources} before
+ * the constructed objects ({@link TablePartitionProcessor}, {@link 
PartitionMvStorageAccess},
+ * {@link PartitionReplicaListener}) are used at runtime.
+ */
+class TablePartitionResourcesFactory {
+    private final TxManager txManager;
+    private final LockManager lockManager;
+    private final ExecutorService scanRequestExecutor;
+    private final ClockService clockService;
+    private final CatalogService catalogService;
+    private final PartitionModificationCounterFactory 
partitionModificationCounterFactory;
+    private final OutgoingSnapshotsManager outgoingSnapshotsManager;
+    private final LowWatermark lowWatermark;
+    private final ValidationSchemasSource validationSchemasSource;
+    private final SchemaSyncService schemaSyncService;
+    private final LeasePlacementDriver placementDriver;
+    private final TopologyService topologyService;
+    private final RemotelyTriggeredResourceRegistry 
remotelyTriggeredResourceRegistry;
+    private final FailureProcessor failureProcessor;
+    private final SchemaManager schemaManager;
+    private final ReplicationConfiguration replicationConfiguration;
+    private final Executor partitionOperationsExecutor;
+    private final IndexMetaStorage indexMetaStorage;
+    private final MinimumRequiredTimeCollectorService minTimeCollectorService;
+    private final MvGc mvGc;
+    private final FullStateTransferIndexChooser fullStateTransferIndexChooser;
+
+    TablePartitionResourcesFactory(
+            TxManager txManager,
+            LockManager lockManager,
+            ExecutorService scanRequestExecutor,
+            ClockService clockService,
+            CatalogService catalogService,
+            PartitionModificationCounterFactory 
partitionModificationCounterFactory,
+            OutgoingSnapshotsManager outgoingSnapshotsManager,
+            LowWatermark lowWatermark,
+            ValidationSchemasSource validationSchemasSource,
+            SchemaSyncService schemaSyncService,
+            LeasePlacementDriver placementDriver,
+            TopologyService topologyService,
+            RemotelyTriggeredResourceRegistry 
remotelyTriggeredResourceRegistry,
+            FailureProcessor failureProcessor,
+            SchemaManager schemaManager,
+            ReplicationConfiguration replicationConfiguration,
+            Executor partitionOperationsExecutor,
+            IndexMetaStorage indexMetaStorage,
+            MinimumRequiredTimeCollectorService minTimeCollectorService,
+            MvGc mvGc,
+            FullStateTransferIndexChooser fullStateTransferIndexChooser
+    ) {
+        this.txManager = txManager;
+        this.lockManager = lockManager;
+        this.scanRequestExecutor = scanRequestExecutor;
+        this.clockService = clockService;
+        this.catalogService = catalogService;
+        this.partitionModificationCounterFactory = 
partitionModificationCounterFactory;
+        this.outgoingSnapshotsManager = outgoingSnapshotsManager;
+        this.lowWatermark = lowWatermark;
+        this.validationSchemasSource = validationSchemasSource;
+        this.schemaSyncService = schemaSyncService;
+        this.placementDriver = placementDriver;
+        this.topologyService = topologyService;
+        this.remotelyTriggeredResourceRegistry = 
remotelyTriggeredResourceRegistry;
+        this.failureProcessor = failureProcessor;
+        this.schemaManager = schemaManager;
+        this.replicationConfiguration = replicationConfiguration;
+        this.partitionOperationsExecutor = partitionOperationsExecutor;
+        this.indexMetaStorage = indexMetaStorage;
+        this.minTimeCollectorService = minTimeCollectorService;
+        this.mvGc = mvGc;
+        this.fullStateTransferIndexChooser = fullStateTransferIndexChooser;
+    }
+
+    /**
+     * Creates a {@link PartitionDataStorage} for the given partition.
+     *
+     * @param partitionKey Partition key.
+     * @param tableId Table ID.
+     * @param partitionStorage MV partition storage.
+     * @return Partition data storage.
+     */
+    PartitionDataStorage createPartitionDataStorage(PartitionKey partitionKey, 
int tableId, MvPartitionStorage partitionStorage) {
+        return new SnapshotAwarePartitionDataStorage(
+                tableId,
+                partitionStorage,
+                outgoingSnapshotsManager,
+                partitionKey
+        );
+    }
+
+    /**
+     * Creates partition resources (index update handler, GC update handler, 
storage update handler, modification counter).
+     *
+     * <p>The returned resources are not started — the caller must invoke
+     * {@link StorageUpdateHandler#start} on {@link 
PartitionResources#storageUpdateHandler}
+     * before the constructed partition objects are used at runtime.
+     *
+     * @param partitionId Partition ID.
+     * @param partitionDataStorage Partition data storage.
+     * @param table Table view.
+     * @param safeTimeTracker Safe time tracker.
+     * @return Partition resources.
+     */
+    PartitionResources createPartitionResources(
+            int partitionId,
+            PartitionDataStorage partitionDataStorage,
+            TableViewInternal table,
+            PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTimeTracker
+    ) {
+        TableIndexStoragesSupplier indexes = 
table.indexStorageAdapters(partitionId);
+
+        IndexUpdateHandler indexUpdateHandler = new 
IndexUpdateHandler(indexes);
+
+        GcUpdateHandler gcUpdateHandler = new 
GcUpdateHandler(partitionDataStorage, safeTimeTracker, indexUpdateHandler);
+
+        SizeSupplier partSizeSupplier = () -> 
partitionDataStorage.getStorage().estimatedSize();
+
+        PartitionModificationCounter modificationCounter =
+                partitionModificationCounterFactory.create(partSizeSupplier, 
table::stalenessConfiguration, table.tableId(), partitionId);
+
+        StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
+                partitionId,
+                partitionDataStorage,
+                indexUpdateHandler,
+                replicationConfiguration,
+                modificationCounter,
+                txManager
+        );
+
+        return new PartitionResources(storageUpdateHandler, 
indexUpdateHandler, gcUpdateHandler, modificationCounter);
+    }
+
+    /**
+     * Creates a {@link TablePartitionProcessor} for the given partition.
+     *
+     * @param zonePartitionId Zone partition ID.
+     * @param table Table view.
+     * @param partitionDataStorage Partition data storage.
+     * @param partitionResources Partition resources.
+     * @return Table partition processor.
+     */
+    TablePartitionProcessor createTablePartitionProcessor(
+            ZonePartitionId zonePartitionId,
+            TableViewInternal table,
+            PartitionDataStorage partitionDataStorage,
+            PartitionResources partitionResources
+    ) {
+        return new TablePartitionProcessor(
+                txManager,
+                partitionDataStorage,
+                partitionResources.storageUpdateHandler,
+                catalogService,
+                table.schemaView(),
+                indexMetaStorage,
+                topologyService.localMember().id(),
+                minTimeCollectorService,
+                placementDriver,
+                clockService,
+                zonePartitionId
+        );
+    }
+
+    /**
+     * Creates a {@link PartitionMvStorageAccess} for the given partition.
+     *
+     * @param partitionId Partition ID.
+     * @param table Table view.
+     * @param partitionResources Partition resources.
+     * @return Partition MV storage access.
+     */
+    PartitionMvStorageAccess createPartitionMvStorageAccess(
+            int partitionId,
+            TableViewInternal table,
+            PartitionResources partitionResources
+    ) {
+        return new PartitionMvStorageAccessImpl(
+                partitionId,
+                table.internalTable().storage(),
+                mvGc,
+                partitionResources.indexUpdateHandler,
+                partitionResources.gcUpdateHandler,
+                fullStateTransferIndexChooser,
+                schemaManager.schemaRegistry(table.tableId()),
+                lowWatermark
+        );
+    }
+
+    /**
+     * Creates a {@link PartitionReplicaListener} for the given partition.
+     *
+     * @param replicationGroupId Zone partition ID used as the replication 
group ID.
+     * @param table Table view.
+     * @param safeTimeTracker Safe time tracker.
+     * @param mvPartitionStorage MV partition storage.
+     * @param partitionResources Partition resources.
+     * @param raftClient Raft command runner.
+     * @param transactionStateResolver Transaction state resolver.
+     * @return Partition replica listener.
+     */
+    PartitionReplicaListener createReplicaListener(
+            ZonePartitionId replicationGroupId,
+            TableViewInternal table,
+            PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTimeTracker,
+            MvPartitionStorage mvPartitionStorage,
+            PartitionResources partitionResources,
+            RaftCommandRunner raftClient,
+            TransactionStateResolver transactionStateResolver
+    ) {
+        int partitionIndex = replicationGroupId.partitionId();
+
+        return new PartitionReplicaListener(
+                mvPartitionStorage,
+                new ExecutorInclinedRaftCommandRunner(raftClient, 
partitionOperationsExecutor),
+                txManager,
+                lockManager,
+                scanRequestExecutor,
+                replicationGroupId,
+                table.tableId(),
+                table.indexesLockers(partitionIndex),
+                new Lazy<>(() -> 
table.indexStorageAdapters(partitionIndex).get().get(table.pkId())),
+                () -> table.indexStorageAdapters(partitionIndex).get(),
+                clockService,
+                safeTimeTracker,
+                transactionStateResolver,
+                partitionResources.storageUpdateHandler,
+                validationSchemasSource,
+                topologyService.localMember(),
+                schemaSyncService,
+                catalogService,
+                placementDriver,
+                topologyService,
+                remotelyTriggeredResourceRegistry,
+                schemaManager.schemaRegistry(table.tableId()),
+                indexMetaStorage,
+                lowWatermark,
+                failureProcessor,
+                table.metrics()
+        );
+    }
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index a1c60506c1b..89eb454fd4d 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -155,6 +155,7 @@ import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeColl
 import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.InjectExecutorService;
+import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
@@ -567,7 +568,7 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 replicationConfiguration,
                 clusterService.messagingService(),
                 clusterService.topologyService(),
-                null,
+                mock(LockManager.class),
                 null,
                 txManager,
                 dsm,
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index afa7bd16407..e4f89aeaba3 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
@@ -266,6 +267,7 @@ public class TableManagerTest extends IgniteAbstractTest {
         assertThat("Catalog initialization", 
catalogManager.catalogInitializationFuture(), willCompleteSuccessfully());
 
         
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
+        
when(clusterService.topologyService()).thenReturn(mock(TopologyService.class));
 
         
when(tm.transactionMetricsSource()).thenReturn(mock(TransactionMetricsSource.class));
 
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
index f4acf416e40..b9cad1da0b0 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
@@ -71,7 +71,7 @@ public class TableTestUtils {
             new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 
0, () -> new TableStatsStalenessConfiguration(0, 0));
 
     /** No-op partition modification counter factory. */
-    public static PartitionModificationCounterFactory 
NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY =
+    public static final PartitionModificationCounterFactory 
NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY =
             new PartitionModificationCounterFactory(() -> 
HybridTimestamp.MIN_VALUE, mock(MessagingService.class)) {
                 @Override
                 public PartitionModificationCounter create(


Reply via email to