This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 67cc4e20bb7 IGNITE-28007 Split TablePartitionResourcesFactory out of
TableManager (#7701)
67cc4e20bb7 is described below
commit 67cc4e20bb7f11336bb63a2289d869e266237f68
Author: Viacheslav Blinov <[email protected]>
AuthorDate: Tue Mar 17 12:49:10 2026 +0300
IGNITE-28007 Split TablePartitionResourcesFactory out of TableManager
(#7701)
---
...UpdateHandlers.java => PartitionResources.java} | 12 +-
.../internal/table/distributed/TableManager.java | 290 ++++++-------------
.../TablePartitionResourcesFactory.java | 307 +++++++++++++++++++++
.../distributed/TableManagerRecoveryTest.java | 3 +-
.../table/distributed/TableManagerTest.java | 2 +
.../ignite/internal/table/TableTestUtils.java | 2 +-
6 files changed, 412 insertions(+), 204 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionUpdateHandlers.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionResources.java
similarity index 79%
rename from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionUpdateHandlers.java
rename to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionResources.java
index 3967b76724b..ef38773b7a3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionUpdateHandlers.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionResources.java
@@ -21,22 +21,26 @@ import
org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
/**
- * Partition update handler container.
+ * Partition resource container: handlers and modification counter created
during partition setup.
*/
-class PartitionUpdateHandlers {
+class PartitionResources {
final StorageUpdateHandler storageUpdateHandler;
final IndexUpdateHandler indexUpdateHandler;
final GcUpdateHandler gcUpdateHandler;
- PartitionUpdateHandlers(
+ final PartitionModificationCounter modificationCounter;
+
+ PartitionResources(
StorageUpdateHandler storageUpdateHandler,
IndexUpdateHandler indexUpdateHandler,
- GcUpdateHandler gcUpdateHandler
+ GcUpdateHandler gcUpdateHandler,
+ PartitionModificationCounter modificationCounter
) {
this.storageUpdateHandler = storageUpdateHandler;
this.indexUpdateHandler = indexUpdateHandler;
this.gcUpdateHandler = gcUpdateHandler;
+ this.modificationCounter = modificationCounter;
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 38adc3c3856..efa96328132 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -109,7 +109,6 @@ import
org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metrics.MetricManager;
-import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.partition.replicator.LocalBeforeReplicaStartEventParameters;
@@ -120,13 +119,12 @@ import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycle
import
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import
org.apache.ignite.internal.partition.replicator.schema.ExecutorInclinedSchemaSyncService;
import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import
org.apache.ignite.internal.placementdriver.wrappers.ExecutorInclinedPlacementDriver;
-import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
-import org.apache.ignite.internal.raft.service.RaftCommandRunner;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -148,17 +146,11 @@ import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TableViewInternal;
-import
org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory.SizeSupplier;
-import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.gc.MvGc;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
-import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
-import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionMvStorageAccessImpl;
-import
org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
-import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
@@ -170,14 +162,11 @@ import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
-import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.LongPriorityQueue;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.table.QualifiedName;
@@ -266,8 +255,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final ClockService clockService;
- private final OutgoingSnapshotsManager outgoingSnapshotsManager;
-
private final SchemaSyncService executorInclinedSchemaSyncService;
private final CatalogService catalogService;
@@ -288,28 +275,17 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
private final SchemaVersions schemaVersions;
- private final ValidationSchemasSource validationSchemasSource;
-
private final PartitionReplicatorNodeRecovery
partitionReplicatorNodeRecovery;
/** Ends at the {@link IgniteComponent#stopAsync(ComponentContext)} with
an {@link NodeStoppingException}. */
private final CompletableFuture<Void> stopManagerFuture = new
CompletableFuture<>();
- private final ReplicationConfiguration replicationConfiguration;
-
- /**
- * Executes partition operations (that might cause I/O and/or be blocked
on locks).
- */
- private final Executor partitionOperationsExecutor;
-
/** Marshallers provider. */
private final ReflectionMarshallersProvider marshallers = new
ReflectionMarshallersProvider();
/** Index chooser for full state transfer. */
private final FullStateTransferIndexChooser fullStateTransferIndexChooser;
- private final RemotelyTriggeredResourceRegistry
remotelyTriggeredResourceRegistry;
-
private final TransactionInflights transactionInflights;
private final String nodeName;
@@ -319,8 +295,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
@Nullable
private ScheduledExecutorService streamerFlushExecutor;
- private final IndexMetaStorage indexMetaStorage;
-
private final MinimumRequiredTimeCollectorService minTimeCollectorService;
@Nullable
@@ -349,10 +323,11 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
private final MetricManager metricManager;
- private final PartitionModificationCounterFactory
partitionModificationCounterFactory;
private final Map<TablePartitionId, PartitionTableStatsMetricSource>
partModCounterMetricSources = new ConcurrentHashMap<>();
private final Map<TablePartitionId, LongSupplier>
pendingWriteIntentsSuppliers = new ConcurrentHashMap<>();
+ private final TablePartitionResourcesFactory partitionResourcesFactory;
+
/**
* Creates a new table manager.
*
@@ -421,25 +396,18 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
this.dataStorageMgr = dataStorageMgr;
this.metaStorageMgr = metaStorageMgr;
this.schemaManager = schemaManager;
- this.validationSchemasSource = validationSchemasSource;
this.ioExecutor = ioExecutor;
- this.partitionOperationsExecutor = partitionOperationsExecutor;
this.clockService = clockService;
- this.outgoingSnapshotsManager = outgoingSnapshotsManager;
this.catalogService = catalogService;
this.failureProcessor = failureProcessor;
this.observableTimestampTracker = observableTimestampTracker;
this.sql = sql;
- this.replicationConfiguration = replicationConfiguration;
- this.remotelyTriggeredResourceRegistry =
remotelyTriggeredResourceRegistry;
this.lowWatermark = lowWatermark;
this.transactionInflights = transactionInflights;
this.nodeName = nodeName;
- this.indexMetaStorage = indexMetaStorage;
this.partitionReplicaLifecycleManager =
partitionReplicaLifecycleManager;
this.minTimeCollectorService = minTimeCollectorService;
this.metricManager = metricManager;
- this.partitionModificationCounterFactory =
partitionModificationCounterFactory;
this.executorInclinedSchemaSyncService = new
ExecutorInclinedSchemaSyncService(schemaSyncService,
partitionOperationsExecutor);
this.executorInclinedPlacementDriver = new
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -465,6 +433,30 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
fullStateTransferIndexChooser = new
FullStateTransferIndexChooser(catalogService, lowWatermark, indexMetaStorage);
+ partitionResourcesFactory = new TablePartitionResourcesFactory(
+ txManager,
+ lockMgr,
+ scanRequestExecutor,
+ clockService,
+ catalogService,
+ partitionModificationCounterFactory,
+ outgoingSnapshotsManager,
+ lowWatermark,
+ validationSchemasSource,
+ this.executorInclinedSchemaSyncService,
+ this.executorInclinedPlacementDriver,
+ topologyService,
+ remotelyTriggeredResourceRegistry,
+ failureProcessor,
+ schemaManager,
+ replicationConfiguration,
+ partitionOperationsExecutor,
+ indexMetaStorage,
+ minTimeCollectorService,
+ mvGc,
+ fullStateTransferIndexChooser
+ );
+
rebalanceRetryDelayConfiguration = new
SystemDistributedConfigurationPropertyHolder<>(
systemDistributedConfiguration,
(v, r) -> {},
@@ -860,65 +852,49 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return;
}
- PartitionDataStorage partitionDataStorage = partitionDataStorage(
+ PartitionDataStorage partitionDataStorage =
partitionResourcesFactory.createPartitionDataStorage(
new PartitionKey(zonePartitionId.zoneId(), partId),
tableId,
mvPartitionStorage
);
- PartitionUpdateHandlers partitionUpdateHandlers =
createPartitionUpdateHandlers(
+ PartitionResources partitionResources =
partitionResourcesFactory.createPartitionResources(
partId,
partitionDataStorage,
table,
- resources.safeTimeTracker(),
- replicationConfiguration,
- onNodeRecovery
+ resources.safeTimeTracker()
);
- mvGc.addStorage(tablePartitionId,
partitionUpdateHandlers.gcUpdateHandler);
+ partitionResources.storageUpdateHandler.start(onNodeRecovery);
+
+ registerPartitionTableStatsMetrics(table, partId, partitionResources);
+
+ mvGc.addStorage(tablePartitionId, partitionResources.gcUpdateHandler);
minTimeCollectorService.addPartition(new TablePartitionId(tableId,
partId));
- TablePartitionReplicaProcessorFactory createListener = (raftClient,
transactionStateResolver) -> createReplicaListener(
- zonePartitionId,
- table,
- resources.safeTimeTracker(),
- mvPartitionStorage,
- partitionUpdateHandlers,
- raftClient,
- transactionStateResolver
- );
+ TablePartitionReplicaProcessorFactory createListener = (raftClient,
transactionStateResolver) ->
+ partitionResourcesFactory.createReplicaListener(
+ zonePartitionId,
+ table,
+ resources.safeTimeTracker(),
+ mvPartitionStorage,
+ partitionResources,
+ raftClient,
+ transactionStateResolver
+ );
- var tablePartitionRaftListener = new TablePartitionProcessor(
- txManager,
- partitionDataStorage,
- partitionUpdateHandlers.storageUpdateHandler,
- catalogService,
- table.schemaView(),
- indexMetaStorage,
- topologyService.localMember().id(),
- minTimeCollectorService,
- executorInclinedPlacementDriver,
- clockService,
- zonePartitionId
- );
+ TablePartitionProcessor tablePartitionProcessor =
partitionResourcesFactory.createTablePartitionProcessor(
+ zonePartitionId, table, partitionDataStorage,
partitionResources);
- var partitionStorageAccess = new PartitionMvStorageAccessImpl(
- partId,
- table.internalTable().storage(),
- mvGc,
- partitionUpdateHandlers.indexUpdateHandler,
- partitionUpdateHandlers.gcUpdateHandler,
- fullStateTransferIndexChooser,
- schemaManager.schemaRegistry(tableId),
- lowWatermark
- );
+ PartitionMvStorageAccess partitionStorageAccess =
partitionResourcesFactory.createPartitionMvStorageAccess(
+ partId, table, partitionResources);
partitionReplicaLifecycleManager.loadTableListenerToZoneReplica(
zonePartitionId,
tableId,
createListener,
- tablePartitionRaftListener,
+ tablePartitionProcessor,
partitionStorageAccess,
onNodeRecovery
);
@@ -998,56 +974,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
);
}
- private PartitionReplicaListener createReplicaListener(
- ZonePartitionId replicationGroupId,
- TableViewInternal table,
- PendingComparableValuesTracker<HybridTimestamp, Void>
safeTimeTracker,
- MvPartitionStorage mvPartitionStorage,
- PartitionUpdateHandlers partitionUpdateHandlers,
- RaftCommandRunner raftClient,
- TransactionStateResolver transactionStateResolver
- ) {
- int partitionIndex = replicationGroupId.partitionId();
-
- return new PartitionReplicaListener(
- mvPartitionStorage,
- new ExecutorInclinedRaftCommandRunner(raftClient,
partitionOperationsExecutor),
- txManager,
- lockMgr,
- scanRequestExecutor,
- replicationGroupId,
- table.tableId(),
- table.indexesLockers(partitionIndex),
- new Lazy<>(() ->
table.indexStorageAdapters(partitionIndex).get().get(table.pkId())),
- () -> table.indexStorageAdapters(partitionIndex).get(),
- clockService,
- safeTimeTracker,
- transactionStateResolver,
- partitionUpdateHandlers.storageUpdateHandler,
- validationSchemasSource,
- localNode(),
- executorInclinedSchemaSyncService,
- catalogService,
- executorInclinedPlacementDriver,
- topologyService,
- remotelyTriggeredResourceRegistry,
- schemaManager.schemaRegistry(table.tableId()),
- indexMetaStorage,
- lowWatermark,
- failureProcessor,
- table.metrics()
- );
- }
-
- private PartitionDataStorage partitionDataStorage(PartitionKey
partitionKey, int tableId, MvPartitionStorage partitionStorage) {
- return new SnapshotAwarePartitionDataStorage(
- tableId,
- partitionStorage,
- outgoingSnapshotsManager,
- partitionKey
- );
- }
-
@Override
public void beforeNodeStop() {
if (!beforeStopGuard.compareAndSet(false, true)) {
@@ -1569,6 +1495,38 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
// In case of colocation there shouldn't be any table replica and thus
it shouldn't be stopped.
minTimeCollectorService.removePartition(tablePartitionId);
+ unregisterPartitionMetrics(tablePartitionId, table.name());
+
+ return mvGc.removeStorage(tablePartitionId);
+ }
+
+ private void registerPartitionTableStatsMetrics(
+ TableViewInternal table,
+ int partitionId,
+ PartitionResources partitionResources
+ ) {
+ PartitionTableStatsMetricSource metricSource =
+ new PartitionTableStatsMetricSource(table.tableId(),
partitionId, partitionResources.modificationCounter);
+
+ try {
+ // Only register this Metrics Source and do not enable it by
default
+ // as it is intended for online troubleshooting purposes only (see
IGNITE-27813).
+ metricManager.registerSource(metricSource);
+
+ TablePartitionId tablePartitionId = new
TablePartitionId(table.tableId(), partitionId);
+
+ partModCounterMetricSources.put(tablePartitionId, metricSource);
+ } catch (Exception e) {
+ LOG.warn("Failed to register metrics source for table [name={},
partitionId={}].", e, table.name(), partitionId);
+ }
+
+ pendingWriteIntentsSuppliers.put(
+ new TablePartitionId(table.tableId(), partitionId),
+ partitionResources.storageUpdateHandler::getPendingRowCount
+ );
+ }
+
+ private void unregisterPartitionMetrics(TablePartitionId tablePartitionId,
String tableName) {
PartitionTableStatsMetricSource metricSource =
partModCounterMetricSources.remove(tablePartitionId);
pendingWriteIntentsSuppliers.remove(tablePartitionId);
if (metricSource != null) {
@@ -1576,11 +1534,19 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
metricManager.unregisterSource(metricSource);
} catch (Exception e) {
String message = "Failed to unregister metrics source for
table [name={}, partitionId={}].";
- LOG.warn(message, e, table.name(),
tablePartitionId.partitionId());
+ LOG.warn(message, e, tableName,
tablePartitionId.partitionId());
}
}
+ }
- return mvGc.removeStorage(tablePartitionId);
+ private long totalPendingWriteIntents() {
+ long sum = 0;
+
+ for (LongSupplier supplier : pendingWriteIntentsSuppliers.values()) {
+ sum += supplier.getAsLong();
+ }
+
+ return sum;
}
private CompletableFuture<Void> destroyPartitionStorages(
@@ -1607,78 +1573,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return allOf(destroyFutures.toArray(new CompletableFuture[]{}));
}
- private InternalClusterNode localNode() {
- return topologyService.localMember();
- }
-
- private PartitionUpdateHandlers createPartitionUpdateHandlers(
- int partitionId,
- PartitionDataStorage partitionDataStorage,
- TableViewInternal table,
- PendingComparableValuesTracker<HybridTimestamp, Void>
safeTimeTracker,
- ReplicationConfiguration replicationConfiguration,
- boolean onNodeRecovery
- ) {
- TableIndexStoragesSupplier indexes =
table.indexStorageAdapters(partitionId);
-
- IndexUpdateHandler indexUpdateHandler = new
IndexUpdateHandler(indexes);
-
- GcUpdateHandler gcUpdateHandler = new
GcUpdateHandler(partitionDataStorage, safeTimeTracker, indexUpdateHandler);
-
- SizeSupplier partSizeSupplier = () ->
partitionDataStorage.getStorage().estimatedSize();
-
- PartitionModificationCounter modificationCounter =
- partitionModificationCounterFactory.create(partSizeSupplier,
table::stalenessConfiguration, table.tableId(), partitionId);
-
- StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
- partitionId,
- partitionDataStorage,
- indexUpdateHandler,
- replicationConfiguration,
- modificationCounter,
- txManager
- );
-
- storageUpdateHandler.start(onNodeRecovery);
-
- registerPartitionTableStatsMetrics(table, partitionId,
modificationCounter);
-
- pendingWriteIntentsSuppliers.put(new TablePartitionId(table.tableId(),
partitionId), storageUpdateHandler::getPendingRowCount);
-
- return new PartitionUpdateHandlers(storageUpdateHandler,
indexUpdateHandler, gcUpdateHandler);
- }
-
- private void registerPartitionTableStatsMetrics(
- TableViewInternal table,
- int partitionId,
- PartitionModificationCounter counter
- ) {
- PartitionTableStatsMetricSource metricSource =
- new PartitionTableStatsMetricSource(table.tableId(),
partitionId, counter);
-
- try {
- // Only register this Metrics Source and do not enable it by
default
- // as it is intended for online troubleshooting purposes only.
- metricManager.registerSource(metricSource);
-
- TablePartitionId tablePartitionId = new
TablePartitionId(table.tableId(), partitionId);
-
- partModCounterMetricSources.put(tablePartitionId, metricSource);
- } catch (Exception e) {
- LOG.warn("Failed to register metrics source for table [name={},
partitionId={}].", e, table.name(), partitionId);
- }
- }
-
- private long totalPendingWriteIntents() {
- long sum = 0;
-
- for (LongSupplier supplier : pendingWriteIntentsSuppliers.values()) {
- sum += supplier.getAsLong();
- }
-
- return sum;
- }
-
/**
* Returns a cached table instance if it exists, {@code null} otherwise.
Can return a table that is being stopped.
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
new file mode 100644
index 00000000000..b5a35c50f17
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
+import org.apache.ignite.internal.network.TopologyService;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
+import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaSyncService;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.table.TableViewInternal;
+import
org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory.SizeSupplier;
+import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
+import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
+import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
+import
org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
+import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionMvStorageAccessImpl;
+import
org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
+import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
+import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
+import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+
+/**
+ * Stateless factory for creating partition-level resources: data storage
wrappers, update handlers, and replica listeners.
+ *
+ * <p>This factory performs pure construction only — it does not start
components, register metrics,
+ * or own any mutable state. Lifecycle management (start/stop, metric
registration/deregistration)
+ * remains in {@link TableManager}.
+ *
+ * <p><b>Lifecycle ordering:</b> the caller must invoke {@link
StorageUpdateHandler#start} on the
+ * {@link PartitionResources#storageUpdateHandler} returned by {@link
#createPartitionResources} before
+ * the constructed objects ({@link TablePartitionProcessor}, {@link
PartitionMvStorageAccess},
+ * {@link PartitionReplicaListener}) are used at runtime.
+ */
+class TablePartitionResourcesFactory {
+ private final TxManager txManager;
+ private final LockManager lockManager;
+ private final ExecutorService scanRequestExecutor;
+ private final ClockService clockService;
+ private final CatalogService catalogService;
+ private final PartitionModificationCounterFactory
partitionModificationCounterFactory;
+ private final OutgoingSnapshotsManager outgoingSnapshotsManager;
+ private final LowWatermark lowWatermark;
+ private final ValidationSchemasSource validationSchemasSource;
+ private final SchemaSyncService schemaSyncService;
+ private final LeasePlacementDriver placementDriver;
+ private final TopologyService topologyService;
+ private final RemotelyTriggeredResourceRegistry
remotelyTriggeredResourceRegistry;
+ private final FailureProcessor failureProcessor;
+ private final SchemaManager schemaManager;
+ private final ReplicationConfiguration replicationConfiguration;
+ private final Executor partitionOperationsExecutor;
+ private final IndexMetaStorage indexMetaStorage;
+ private final MinimumRequiredTimeCollectorService minTimeCollectorService;
+ private final MvGc mvGc;
+ private final FullStateTransferIndexChooser fullStateTransferIndexChooser;
+
+ TablePartitionResourcesFactory(
+ TxManager txManager,
+ LockManager lockManager,
+ ExecutorService scanRequestExecutor,
+ ClockService clockService,
+ CatalogService catalogService,
+ PartitionModificationCounterFactory
partitionModificationCounterFactory,
+ OutgoingSnapshotsManager outgoingSnapshotsManager,
+ LowWatermark lowWatermark,
+ ValidationSchemasSource validationSchemasSource,
+ SchemaSyncService schemaSyncService,
+ LeasePlacementDriver placementDriver,
+ TopologyService topologyService,
+ RemotelyTriggeredResourceRegistry
remotelyTriggeredResourceRegistry,
+ FailureProcessor failureProcessor,
+ SchemaManager schemaManager,
+ ReplicationConfiguration replicationConfiguration,
+ Executor partitionOperationsExecutor,
+ IndexMetaStorage indexMetaStorage,
+ MinimumRequiredTimeCollectorService minTimeCollectorService,
+ MvGc mvGc,
+ FullStateTransferIndexChooser fullStateTransferIndexChooser
+ ) {
+ this.txManager = txManager;
+ this.lockManager = lockManager;
+ this.scanRequestExecutor = scanRequestExecutor;
+ this.clockService = clockService;
+ this.catalogService = catalogService;
+ this.partitionModificationCounterFactory =
partitionModificationCounterFactory;
+ this.outgoingSnapshotsManager = outgoingSnapshotsManager;
+ this.lowWatermark = lowWatermark;
+ this.validationSchemasSource = validationSchemasSource;
+ this.schemaSyncService = schemaSyncService;
+ this.placementDriver = placementDriver;
+ this.topologyService = topologyService;
+ this.remotelyTriggeredResourceRegistry =
remotelyTriggeredResourceRegistry;
+ this.failureProcessor = failureProcessor;
+ this.schemaManager = schemaManager;
+ this.replicationConfiguration = replicationConfiguration;
+ this.partitionOperationsExecutor = partitionOperationsExecutor;
+ this.indexMetaStorage = indexMetaStorage;
+ this.minTimeCollectorService = minTimeCollectorService;
+ this.mvGc = mvGc;
+ this.fullStateTransferIndexChooser = fullStateTransferIndexChooser;
+ }
+
+ /**
+ * Creates a {@link PartitionDataStorage} for the given partition.
+ *
+ * @param partitionKey Partition key.
+ * @param tableId Table ID.
+ * @param partitionStorage MV partition storage.
+ * @return Partition data storage.
+ */
+ PartitionDataStorage createPartitionDataStorage(PartitionKey partitionKey,
int tableId, MvPartitionStorage partitionStorage) {
+ return new SnapshotAwarePartitionDataStorage(
+ tableId,
+ partitionStorage,
+ outgoingSnapshotsManager,
+ partitionKey
+ );
+ }
+
+ /**
+ * Creates partition resources (index update handler, GC update handler,
storage update handler, modification counter).
+ *
+ * <p>The returned resources are not started — the caller must invoke
+ * {@link StorageUpdateHandler#start} on {@link
PartitionResources#storageUpdateHandler}
+ * before the constructed partition objects are used at runtime.
+ *
+ * @param partitionId Partition ID.
+ * @param partitionDataStorage Partition data storage.
+ * @param table Table view.
+ * @param safeTimeTracker Safe time tracker.
+ * @return Partition resources.
+ */
+ PartitionResources createPartitionResources(
+ int partitionId,
+ PartitionDataStorage partitionDataStorage,
+ TableViewInternal table,
+ PendingComparableValuesTracker<HybridTimestamp, Void>
safeTimeTracker
+ ) {
+ TableIndexStoragesSupplier indexes =
table.indexStorageAdapters(partitionId);
+
+ IndexUpdateHandler indexUpdateHandler = new
IndexUpdateHandler(indexes);
+
+ GcUpdateHandler gcUpdateHandler = new
GcUpdateHandler(partitionDataStorage, safeTimeTracker, indexUpdateHandler);
+
+ SizeSupplier partSizeSupplier = () ->
partitionDataStorage.getStorage().estimatedSize();
+
+ PartitionModificationCounter modificationCounter =
+ partitionModificationCounterFactory.create(partSizeSupplier,
table::stalenessConfiguration, table.tableId(), partitionId);
+
+ StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
+ partitionId,
+ partitionDataStorage,
+ indexUpdateHandler,
+ replicationConfiguration,
+ modificationCounter,
+ txManager
+ );
+
+ return new PartitionResources(storageUpdateHandler,
indexUpdateHandler, gcUpdateHandler, modificationCounter);
+ }
+
+ /**
+ * Creates a {@link TablePartitionProcessor} for the given partition.
+ *
+ * @param zonePartitionId Zone partition ID.
+ * @param table Table view.
+ * @param partitionDataStorage Partition data storage.
+ * @param partitionResources Partition resources.
+ * @return Table partition processor.
+ */
+ TablePartitionProcessor createTablePartitionProcessor(
+ ZonePartitionId zonePartitionId,
+ TableViewInternal table,
+ PartitionDataStorage partitionDataStorage,
+ PartitionResources partitionResources
+ ) {
+ return new TablePartitionProcessor(
+ txManager,
+ partitionDataStorage,
+ partitionResources.storageUpdateHandler,
+ catalogService,
+ table.schemaView(),
+ indexMetaStorage,
+ topologyService.localMember().id(),
+ minTimeCollectorService,
+ placementDriver,
+ clockService,
+ zonePartitionId
+ );
+ }
+
+ /**
+ * Creates a {@link PartitionMvStorageAccess} for the given partition.
+ *
+ * @param partitionId Partition ID.
+ * @param table Table view.
+ * @param partitionResources Partition resources.
+ * @return Partition MV storage access.
+ */
+ PartitionMvStorageAccess createPartitionMvStorageAccess(
+ int partitionId,
+ TableViewInternal table,
+ PartitionResources partitionResources
+ ) {
+ return new PartitionMvStorageAccessImpl(
+ partitionId,
+ table.internalTable().storage(),
+ mvGc,
+ partitionResources.indexUpdateHandler,
+ partitionResources.gcUpdateHandler,
+ fullStateTransferIndexChooser,
+ schemaManager.schemaRegistry(table.tableId()),
+ lowWatermark
+ );
+ }
+
+ /**
+ * Creates a {@link PartitionReplicaListener} for the given partition.
+ *
+ * @param replicationGroupId Zone partition ID used as the replication
group ID.
+ * @param table Table view.
+ * @param safeTimeTracker Safe time tracker.
+ * @param mvPartitionStorage MV partition storage.
+ * @param partitionResources Partition resources.
+ * @param raftClient Raft command runner.
+ * @param transactionStateResolver Transaction state resolver.
+ * @return Partition replica listener.
+ */
+ PartitionReplicaListener createReplicaListener(
+ ZonePartitionId replicationGroupId,
+ TableViewInternal table,
+ PendingComparableValuesTracker<HybridTimestamp, Void>
safeTimeTracker,
+ MvPartitionStorage mvPartitionStorage,
+ PartitionResources partitionResources,
+ RaftCommandRunner raftClient,
+ TransactionStateResolver transactionStateResolver
+ ) {
+ int partitionIndex = replicationGroupId.partitionId();
+
+ return new PartitionReplicaListener(
+ mvPartitionStorage,
+ new ExecutorInclinedRaftCommandRunner(raftClient,
partitionOperationsExecutor),
+ txManager,
+ lockManager,
+ scanRequestExecutor,
+ replicationGroupId,
+ table.tableId(),
+ table.indexesLockers(partitionIndex),
+ new Lazy<>(() ->
table.indexStorageAdapters(partitionIndex).get().get(table.pkId())),
+ () -> table.indexStorageAdapters(partitionIndex).get(),
+ clockService,
+ safeTimeTracker,
+ transactionStateResolver,
+ partitionResources.storageUpdateHandler,
+ validationSchemasSource,
+ topologyService.localMember(),
+ schemaSyncService,
+ catalogService,
+ placementDriver,
+ topologyService,
+ remotelyTriggeredResourceRegistry,
+ schemaManager.schemaRegistry(table.tableId()),
+ indexMetaStorage,
+ lowWatermark,
+ failureProcessor,
+ table.metrics()
+ );
+ }
+}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index a1c60506c1b..89eb454fd4d 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -155,6 +155,7 @@ import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeColl
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.InjectExecutorService;
+import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
@@ -567,7 +568,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
replicationConfiguration,
clusterService.messagingService(),
clusterService.topologyService(),
- null,
+ mock(LockManager.class),
null,
txManager,
dsm,
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index afa7bd16407..e4f89aeaba3 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
@@ -266,6 +267,7 @@ public class TableManagerTest extends IgniteAbstractTest {
assertThat("Catalog initialization",
catalogManager.catalogInitializationFuture(), willCompleteSuccessfully());
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
+
when(clusterService.topologyService()).thenReturn(mock(TopologyService.class));
when(tm.transactionMetricsSource()).thenReturn(mock(TransactionMetricsSource.class));
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
index f4acf416e40..b9cad1da0b0 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
@@ -71,7 +71,7 @@ public class TableTestUtils {
new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () ->
0, () -> new TableStatsStalenessConfiguration(0, 0));
/** No-op partition modification counter factory. */
- public static PartitionModificationCounterFactory
NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY =
+ public static final PartitionModificationCounterFactory
NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY =
new PartitionModificationCounterFactory(() ->
HybridTimestamp.MIN_VALUE, mock(MessagingService.class)) {
@Override
public PartitionModificationCounter create(