This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ebedbb70704 IGNITE-28096 Extract TableImplFactory from TableManager (#7898)
ebedbb70704 is described below

commit ebedbb707047eef521bee14d6cb0994bd7d36595
Author: Viacheslav Blinov <[email protected]>
AuthorDate: Mon Apr 6 15:22:28 2026 +0300

    IGNITE-28096 Extract TableImplFactory from TableManager (#7898)
---
 .../rebalance/ItRebalanceDistributedTest.java      |   4 +-
 .../partition/replicator/fixtures/Node.java        |  53 ++---
 .../runner/app/ItIgniteNodeRestartTest.java        |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  14 +-
 .../distributed/DefaultMvTableStorageFactory.java  |  71 ++++++
 .../table/distributed/MvTableStorageFactory.java   |  36 +++
 .../distributed/StreamerFlushExecutorFactory.java  | 105 +++++++++
 .../table/distributed/TableImplFactory.java        | 257 +++++++++++++++++++++
 .../internal/table/distributed/TableManager.java   | 213 +++--------------
 .../distributed/TableManagerRecoveryTest.java      |  29 ++-
 .../table/distributed/TableManagerTest.java        |  28 ++-
 .../distributed/ItTxWriteIntentResolutionTest.java |   2 +-
 12 files changed, 585 insertions(+), 231 deletions(-)

diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index abbcb4e377a..02684f96d6d 100644
--- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -232,6 +232,7 @@ import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
 import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.TableViewInternal;
+import 
org.apache.ignite.internal.table.distributed.DefaultMvTableStorageFactory;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
@@ -1595,7 +1596,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     minTimeCollectorService,
                     systemDistributedConfiguration,
                     metricManager,
-                    TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
+                    TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY,
+                    new DefaultMvTableStorageFactory(dataStorageMgr, 
catalogManager, lowWatermark)
             );
 
             
tableManager.setStreamerReceiverRunner(mock(StreamerReceiverRunner.class));
diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 529af9b7eb5..d61f25985ab 100644
--- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -57,8 +57,6 @@ import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogManagerImpl;
 import org.apache.ignite.internal.catalog.CatalogTestUtils;
 import org.apache.ignite.internal.catalog.compaction.CatalogCompactionRunner;
-import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
 import org.apache.ignite.internal.cluster.management.ClusterIdHolder;
 import org.apache.ignite.internal.cluster.management.ClusterInitializer;
@@ -169,6 +167,8 @@ import 
org.apache.ignite.internal.systemview.SystemViewManagerImpl;
 import org.apache.ignite.internal.systemview.api.SystemViewManager;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
 import org.apache.ignite.internal.table.TableTestUtils;
+import 
org.apache.ignite.internal.table.distributed.DefaultMvTableStorageFactory;
+import org.apache.ignite.internal.table.distributed.MvTableStorageFactory;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
@@ -800,30 +800,9 @@ public class Node {
                 minTimeCollectorService,
                 systemDistributedConfiguration,
                 metricManager,
-                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
-        ) {
-
-            @Override
-            protected MvTableStorage createTableStorage(CatalogTableDescriptor 
tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
-                MvTableStorage storage = 
createSpy(super.createTableStorage(tableDescriptor, zoneDescriptor));
-
-                var partitionStorages = new ConcurrentHashMap<Integer, 
MvPartitionStorage>();
-
-                doAnswer(invocation -> {
-                    Integer partitionId = invocation.getArgument(0);
-
-                    return partitionStorages.computeIfAbsent(partitionId, id 
-> {
-                        try {
-                            return (MvPartitionStorage) 
createSpy(invocation.callRealMethod());
-                        } catch (Throwable e) {
-                            throw new RuntimeException(e);
-                        }
-                    });
-                }).when(storage).getMvPartition(anyInt());
-
-                return storage;
-            }
-        };
+                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY,
+                spyingMvTableStorageFactory(new 
DefaultMvTableStorageFactory(dataStorageMgr, catalogManager, lowWatermark))
+        );
 
         
tableManager.setStreamerReceiverRunner(mock(StreamerReceiverRunner.class));
 
@@ -1050,6 +1029,28 @@ public class Node {
         return primaryReplicaFuture.join();
     }
 
+    private static MvTableStorageFactory 
spyingMvTableStorageFactory(MvTableStorageFactory delegate) {
+        return (tableDesc, zoneDesc) -> {
+            MvTableStorage storage = 
createSpy(delegate.createMvTableStorage(tableDesc, zoneDesc));
+
+            var partitionStorages = new ConcurrentHashMap<Integer, 
MvPartitionStorage>();
+
+            doAnswer(invocation -> {
+                Integer partitionId = invocation.getArgument(0);
+
+                return partitionStorages.computeIfAbsent(partitionId, id -> {
+                    try {
+                        return (MvPartitionStorage) 
createSpy(invocation.callRealMethod());
+                    } catch (Throwable e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            }).when(storage).getMvPartition(anyInt());
+
+            return storage;
+        };
+    }
+
     @Contract("null -> null")
     private static <T> @Nullable T createSpy(@Nullable T object) {
         if (object == null) {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 77684332f2c..c470b932a80 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -212,6 +212,7 @@ import 
org.apache.ignite.internal.table.RecordBinaryViewImpl;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.TableViewInternal;
+import 
org.apache.ignite.internal.table.distributed.DefaultMvTableStorageFactory;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
@@ -818,7 +819,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 minTimeCollectorService,
                 systemDistributedConfiguration,
                 metricManager,
-                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
+                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY,
+                new DefaultMvTableStorageFactory(dataStorageManager, 
catalogManager, lowWatermark)
         );
 
         var indexManager = new IndexManager(
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 97f573b5986..41844c92b34 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -268,6 +268,7 @@ import 
org.apache.ignite.internal.system.CpuInformationProvider;
 import org.apache.ignite.internal.system.JvmCpuInformationProvider;
 import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
 import org.apache.ignite.internal.systemview.api.SystemViewManager;
+import 
org.apache.ignite.internal.table.distributed.DefaultMvTableStorageFactory;
 import 
org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory;
 import 
org.apache.ignite.internal.table.distributed.PublicApiThreadingIgniteTables;
 import org.apache.ignite.internal.table.distributed.TableManager;
@@ -393,6 +394,9 @@ public class IgniteImpl implements Ignite {
     /** Replica manager. */
     private final ReplicaManager replicaMgr;
 
+    /** Replica service. */
+    private final ReplicaService replicaSvc;
+
     /** Transactions manager. */
     private final TxManagerImpl txManager;
 
@@ -918,7 +922,7 @@ public class IgniteImpl implements Ignite {
 
         TransactionConfiguration txConfig = 
clusterConfigRegistry.getConfiguration(TransactionExtensionConfiguration.KEY).transaction();
 
-        ReplicaService replicaSvc = new ReplicaService(
+        replicaSvc = new ReplicaService(
                 messagingServiceReturningToStorageOperationsPool,
                 clockService,
                 threadPoolsManager.partitionOperationsExecutor(),
@@ -1195,7 +1199,8 @@ public class IgniteImpl implements Ignite {
                 minTimeCollectorService,
                 systemDistributedConfiguration,
                 metricManager,
-                partitionModificationCounterFactory
+                partitionModificationCounterFactory,
+                new DefaultMvTableStorageFactory(dataStorageMgr, 
catalogManager, lowWatermark)
         );
 
         disasterRecoveryManager = new DisasterRecoveryManager(
@@ -1837,6 +1842,11 @@ public class IgniteImpl implements Ignite {
         return disasterRecoveryManager;
     }
 
+    @TestOnly
+    public ReplicaService replicaService() {
+        return replicaSvc;
+    }
+
     @TestOnly
     public VaultManager vault() {
         return vaultMgr;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/DefaultMvTableStorageFactory.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/DefaultMvTableStorageFactory.java
new file mode 100644
index 00000000000..dcec93f0e61
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/DefaultMvTableStorageFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
+import org.apache.ignite.internal.storage.DataStorageManager;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
+import org.apache.ignite.internal.table.distributed.storage.NullStorageEngine;
+
+/**
+ * Default implementation of {@link MvTableStorageFactory} that creates table 
storages via {@link DataStorageManager}.
+ */
+public final class DefaultMvTableStorageFactory implements 
MvTableStorageFactory {
+    private final DataStorageManager dataStorageMgr;
+
+    private final CatalogService catalogService;
+
+    private final LowWatermark lowWatermark;
+
+    /**
+     * Constructor.
+     *
+     * @param dataStorageMgr Data storage manager.
+     * @param catalogService Catalog service.
+     * @param lowWatermark Low watermark.
+     */
+    public DefaultMvTableStorageFactory(
+            DataStorageManager dataStorageMgr,
+            CatalogService catalogService,
+            LowWatermark lowWatermark
+    ) {
+        this.dataStorageMgr = dataStorageMgr;
+        this.catalogService = catalogService;
+        this.lowWatermark = lowWatermark;
+    }
+
+    @Override
+    public MvTableStorage createMvTableStorage(CatalogTableDescriptor 
tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
+        StorageEngine engine = 
dataStorageMgr.engineByStorageProfile(tableDescriptor.storageProfile());
+
+        if (engine == null) {
+            // Create a placeholder to allow Table object being created.
+            engine = new NullStorageEngine();
+        }
+
+        return engine.createMvTable(
+                new StorageTableDescriptor(tableDescriptor.id(), 
zoneDescriptor.partitions(), tableDescriptor.storageProfile()),
+                new CatalogStorageIndexDescriptorSupplier(catalogService, 
lowWatermark)
+        );
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/MvTableStorageFactory.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/MvTableStorageFactory.java
new file mode 100644
index 00000000000..2e2661928c9
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/MvTableStorageFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+
+/**
+ * Factory for creating {@link MvTableStorage} instances.
+ */
+public interface MvTableStorageFactory {
+    /**
+     * Creates a new MV table storage for the given table and zone descriptors.
+     *
+     * @param tableDescriptor Catalog table descriptor.
+     * @param zoneDescriptor Catalog zone descriptor.
+     * @return New MV table storage.
+     */
+    MvTableStorage createMvTableStorage(CatalogTableDescriptor 
tableDescriptor, CatalogZoneDescriptor zoneDescriptor);
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StreamerFlushExecutorFactory.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StreamerFlushExecutorFactory.java
new file mode 100644
index 00000000000..f8827c50540
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StreamerFlushExecutorFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Lazily creates and manages the lifecycle of a single streamer flush {@link 
ScheduledExecutorService}.
+ *
+ * <p>The executor is created on first call to {@link #get()} and shut down on 
{@link #stop}.
+ * {@link #beforeStop()} must be called before {@link #stop} to prevent new 
executor creation during shutdown.
+ */
+public final class StreamerFlushExecutorFactory {
+    private static final IgniteLogger LOG = 
Loggers.forClass(StreamerFlushExecutorFactory.class);
+
+    private final InternalClusterNode localNode;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    @Nullable
+    private ScheduledExecutorService executor;
+
+    /**
+     * Constructor.
+     *
+     * @param localNode Local node, used for thread naming.
+     */
+    public StreamerFlushExecutorFactory(InternalClusterNode localNode) {
+        this.localNode = localNode;
+    }
+
+    /**
+     * Returns the executor, creating it lazily if needed.
+     *
+     * @return Streamer flush executor.
+     * @throws IgniteException If the node is stopping.
+     */
+    public synchronized ScheduledExecutorService get() {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(NODE_STOPPING_ERR, new 
NodeStoppingException());
+        }
+
+        try {
+            if (executor == null) {
+                executor = Executors.newSingleThreadScheduledExecutor(
+                        IgniteThreadFactory.create(localNode.name(), 
"streamer-flush-executor", LOG, STORAGE_WRITE));
+            }
+
+            return executor;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Blocks the busy lock to prevent new executor creation during shutdown.
+     */
+    public void beforeStop() {
+        busyLock.block();
+    }
+
+    /**
+     * Shuts down the executor if it was created.
+     *
+     * @param timeoutSeconds Timeout in seconds to wait for termination.
+     */
+    public void stop(int timeoutSeconds) {
+        ScheduledExecutorService local;
+
+        synchronized (this) {
+            local = this.executor;
+        }
+
+        shutdownAndAwaitTermination(local, timeoutSeconds, TimeUnit.SECONDS);
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableImplFactory.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableImplFactory.java
new file mode 100644
index 00000000000..f75d3fae789
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableImplFactory.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.Objects;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableProperties;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.storage.DataStorageManager;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
+import 
org.apache.ignite.internal.storage.metrics.StorageEngineTablesMetricSource;
+import org.apache.ignite.internal.table.StreamerReceiverRunner;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
+import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.table.QualifiedName;
+
+/**
+ * Factory for creating {@link TableImpl} instances.
+ *
+ * <p>Encapsulates all dependencies needed to create {@link InternalTableImpl} 
and {@link TableImpl}.
+ */
+public final class TableImplFactory {
+    private static final IgniteLogger LOG = 
Loggers.forClass(TableImplFactory.class);
+
+    private final TopologyService topologyService;
+
+    private final LockManager lockMgr;
+
+    private final ReplicaService replicaSvc;
+
+    private final TxManager txManager;
+
+    private final ClockService clockService;
+
+    private final HybridTimestampTracker observableTimestampTracker;
+
+    private final PlacementDriver executorInclinedPlacementDriver;
+
+    private final TransactionInflights transactionInflights;
+
+    private final SchemaVersions schemaVersions;
+
+    private final ReflectionMarshallersProvider marshallers = new 
ReflectionMarshallersProvider();
+
+    private final Supplier<IgniteSql> sql;
+
+    private final FailureProcessor failureProcessor;
+
+    private final MvTableStorageFactory mvTableStorageFactory;
+
+    private final CatalogService catalogService;
+
+    private final DataStorageManager dataStorageMgr;
+
+    private final MetricManager metricManager;
+
+    private final StreamerFlushExecutorFactory streamerFlushExecutorFactory;
+
+    private volatile StreamerReceiverRunner streamerReceiverRunner;
+
+    /**
+     * Creates a new factory.
+     *
+     * @param topologyService Topology service.
+     * @param lockMgr Lock manager.
+     * @param replicaSvc Replica service.
+     * @param txManager Transaction manager.
+     * @param clockService Clock service.
+     * @param observableTimestampTracker Observable timestamp tracker.
+     * @param executorInclinedPlacementDriver Placement driver.
+     * @param transactionInflights Transaction inflights.
+     * @param schemaVersions Schema versions.
+     * @param sql Supplier for {@link IgniteSql}.
+     * @param failureProcessor Failure processor.
+     * @param mvTableStorageFactory Factory for creating MV table storages.
+     * @param catalogService Catalog service.
+     * @param dataStorageMgr Data storage manager.
+     * @param metricManager Metric manager.
+     * @param streamerFlushExecutorFactory Factory for the streamer flush 
executor.
+     */
+    public TableImplFactory(
+            TopologyService topologyService,
+            LockManager lockMgr,
+            ReplicaService replicaSvc,
+            TxManager txManager,
+            ClockService clockService,
+            HybridTimestampTracker observableTimestampTracker,
+            PlacementDriver executorInclinedPlacementDriver,
+            TransactionInflights transactionInflights,
+            SchemaVersions schemaVersions,
+            Supplier<IgniteSql> sql,
+            FailureProcessor failureProcessor,
+            MvTableStorageFactory mvTableStorageFactory,
+            CatalogService catalogService,
+            DataStorageManager dataStorageMgr,
+            MetricManager metricManager,
+            StreamerFlushExecutorFactory streamerFlushExecutorFactory
+    ) {
+        this.topologyService = topologyService;
+        this.lockMgr = lockMgr;
+        this.replicaSvc = replicaSvc;
+        this.txManager = txManager;
+        this.clockService = clockService;
+        this.observableTimestampTracker = observableTimestampTracker;
+        this.executorInclinedPlacementDriver = executorInclinedPlacementDriver;
+        this.transactionInflights = transactionInflights;
+        this.schemaVersions = schemaVersions;
+        this.sql = sql;
+        this.failureProcessor = failureProcessor;
+        this.mvTableStorageFactory = mvTableStorageFactory;
+        this.catalogService = catalogService;
+        this.dataStorageMgr = dataStorageMgr;
+        this.metricManager = metricManager;
+        this.streamerFlushExecutorFactory = streamerFlushExecutorFactory;
+    }
+
+    /**
+     * Sets the streamer receiver runner. Must be called before any table 
creation.
+     *
+     * @param runner Streamer receiver runner.
+     */
+    public void setStreamerReceiverRunner(StreamerReceiverRunner runner) {
+        this.streamerReceiverRunner = runner;
+    }
+
+    /**
+     * Creates a {@link TableImpl} wrapping a new {@link InternalTableImpl}.
+     *
+     * @param tableName Qualified table name.
+     * @param tableDescriptor Catalog table descriptor.
+     * @param zoneDescriptor Catalog zone descriptor.
+     * @param schemaRegistry Schema registry.
+     * @return New table instance.
+     */
+    public TableImpl createTableImpl(
+            QualifiedName tableName,
+            CatalogTableDescriptor tableDescriptor,
+            CatalogZoneDescriptor zoneDescriptor,
+            SchemaRegistry schemaRegistry
+    ) {
+        MvTableStorage tableStorage = 
mvTableStorageFactory.createMvTableStorage(tableDescriptor, zoneDescriptor);
+
+        int partitions = zoneDescriptor.partitions();
+
+        ReadWriteMetricSource metricsSource = 
createAndRegisterMetricsSource(tableStorage, tableName);
+
+        InternalTableImpl internalTable = new InternalTableImpl(
+                tableName,
+                zoneDescriptor.id(),
+                tableDescriptor.id(),
+                partitions,
+                topologyService,
+                txManager,
+                tableStorage,
+                replicaSvc,
+                clockService,
+                observableTimestampTracker,
+                executorInclinedPlacementDriver,
+                transactionInflights,
+                streamerFlushExecutorFactory::get,
+                Objects.requireNonNull(streamerReceiverRunner),
+                metricsSource
+        );
+
+        CatalogTableProperties descProps = tableDescriptor.properties();
+
+        return new TableImpl(
+                internalTable,
+                lockMgr,
+                schemaVersions,
+                marshallers,
+                sql.get(),
+                failureProcessor,
+                tableDescriptor.primaryKeyIndexId(),
+                new 
TableStatsStalenessConfiguration(descProps.staleRowsFraction(), 
descProps.minStaleRowsCount()),
+                schemaRegistry
+        );
+    }
+
+    private ReadWriteMetricSource 
createAndRegisterMetricsSource(MvTableStorage tableStorage, QualifiedName 
tableName) {
+        StorageTableDescriptor tableDescriptor = 
tableStorage.getTableDescriptor();
+
+        CatalogTableDescriptor catalogTableDescriptor = 
catalogService.latestCatalog().table(tableDescriptor.getId());
+
+        // The table might be created during the recovery phase.
+        // In that case, we should only register the metric source for the 
actual tables that exist in the latest catalog.
+        boolean registrationNeeded = catalogTableDescriptor != null;
+
+        StorageEngine engine = 
dataStorageMgr.engineByStorageProfile(tableDescriptor.getStorageProfile());
+
+        // Engine can be null when the storage profile is not available.
+        if (engine != null && registrationNeeded) {
+            StorageEngineTablesMetricSource engineMetricSource = new 
StorageEngineTablesMetricSource(engine.name(), tableName);
+
+            engine.addTableMetrics(tableDescriptor, engineMetricSource);
+
+            try {
+                metricManager.registerSource(engineMetricSource);
+                metricManager.enable(engineMetricSource);
+            } catch (Exception e) {
+                String message = "Failed to register storage engine metrics 
source for table [id={}, name={}].";
+                LOG.warn(message, e, tableDescriptor.getId(), tableName);
+            }
+        }
+
+        ReadWriteMetricSource source = new TableMetricSource(tableName);
+
+        if (registrationNeeded) {
+            try {
+                metricManager.registerSource(source);
+                metricManager.enable(source);
+            } catch (Exception e) {
+                LOG.warn("Failed to register metrics source for table [id={}, 
name={}].", e, tableDescriptor.getId(), tableName);
+            }
+        }
+
+        return source;
+    }
+
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index d43c5040fac..81bf849dab2 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -27,7 +27,6 @@ import static 
org.apache.ignite.internal.causality.IncrementalVersionedValue.dep
 import static org.apache.ignite.internal.event.EventListener.fromConsumer;
 import static 
org.apache.ignite.internal.table.distributed.TableUtils.aliveTables;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
-import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
 import static org.apache.ignite.internal.util.CollectionUtils.difference;
 import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
 import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
@@ -46,13 +45,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
@@ -60,7 +57,6 @@ import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogTableProperties;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import 
org.apache.ignite.internal.catalog.events.AlterTablePropertiesEventParameters;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
@@ -89,7 +85,6 @@ import 
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventPara
 import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metrics.MetricManager;
@@ -109,9 +104,7 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.SchemaSyncService;
 import org.apache.ignite.internal.schema.configuration.GcConfiguration;
 import org.apache.ignite.internal.storage.DataStorageManager;
-import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageEngine;
-import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
 import 
org.apache.ignite.internal.storage.metrics.StorageEngineTablesMetricSource;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.InternalTable;
@@ -124,9 +117,6 @@ import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeColl
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
-import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import org.apache.ignite.internal.table.distributed.storage.NullStorageEngine;
-import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
 import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.tx.LockManager;
@@ -152,16 +142,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     /** The logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(TableManager.class);
 
-    private final InternalClusterNode localNode;
-
-    private final TopologyService topologyService;
-
-    /** Lock manager. */
-    private final LockManager lockMgr;
-
-    /** Replica service. */
-    private final ReplicaService replicaSvc;
-
     /** Transaction manager. */
     private final TxManager txManager;
 
@@ -228,34 +208,13 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
     private final LowWatermark lowWatermark;
 
-    private final HybridTimestampTracker observableTimestampTracker;
-
-    /** Placement driver. */
-    private final PlacementDriver executorInclinedPlacementDriver;
-
-    /** A supplier function that returns {@link IgniteSql}. */
-    private final Supplier<IgniteSql> sql;
-
-    private final SchemaVersions schemaVersions;
-
     private final PartitionReplicatorNodeRecovery 
partitionReplicatorNodeRecovery;
 
     /** Ends at the {@link IgniteComponent#stopAsync(ComponentContext)} with 
an {@link NodeStoppingException}. */
     private final CompletableFuture<Void> stopManagerFuture = new 
CompletableFuture<>();
 
-    /** Marshallers provider. */
-    private final ReflectionMarshallersProvider marshallers = new 
ReflectionMarshallersProvider();
-
-    private final TransactionInflights transactionInflights;
-
     private final PartitionReplicaLifecycleManager 
partitionReplicaLifecycleManager;
 
-    @Nullable
-    private ScheduledExecutorService streamerFlushExecutor;
-
-    @Nullable
-    private StreamerReceiverRunner streamerReceiverRunner;
-
     private final CompletableFuture<Void> readyToProcessReplicaStarts = new 
CompletableFuture<>();
 
     /** Configuration of rebalance retries delay. */
@@ -269,6 +228,10 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
     private final MetricManager metricManager;
 
+    private final StreamerFlushExecutorFactory streamerFlushExecutorFactory;
+
+    private final TableImplFactory tableImplFactory;
+
     private final TablePartitionResourcesFactory partitionResourcesFactory;
 
     private final TableZoneCoordinator zoneCoordinator;
@@ -299,6 +262,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
      * @param minTimeCollectorService Collects minimum required timestamp for 
each partition.
      * @param systemDistributedConfiguration System distributed configuration.
      * @param metricManager Metric manager.
+     * @param mvTableStorageFactory Factory for creating MV table storages.
      */
     public TableManager(
             InternalClusterNode localNode,
@@ -332,12 +296,9 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
             MinimumRequiredTimeCollectorService minTimeCollectorService,
             SystemDistributedConfiguration systemDistributedConfiguration,
             MetricManager metricManager,
-            PartitionModificationCounterFactory 
partitionModificationCounterFactory
+            PartitionModificationCounterFactory 
partitionModificationCounterFactory,
+            MvTableStorageFactory mvTableStorageFactory
     ) {
-        this.localNode = localNode;
-        this.topologyService = topologyService;
-        this.lockMgr = lockMgr;
-        this.replicaSvc = replicaSvc;
         this.txManager = txManager;
         this.dataStorageMgr = dataStorageMgr;
         this.metaStorageMgr = metaStorageMgr;
@@ -346,17 +307,14 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         this.clockService = clockService;
         this.catalogService = catalogService;
         this.failureProcessor = failureProcessor;
-        this.observableTimestampTracker = observableTimestampTracker;
-        this.sql = sql;
         this.lowWatermark = lowWatermark;
-        this.transactionInflights = transactionInflights;
         this.partitionReplicaLifecycleManager = 
partitionReplicaLifecycleManager;
         this.metricManager = metricManager;
 
         this.executorInclinedSchemaSyncService = new 
ExecutorInclinedSchemaSyncService(schemaSyncService, 
partitionOperationsExecutor);
-        this.executorInclinedPlacementDriver = new 
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
+        PlacementDriver executorInclinedPlacementDriver = new 
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
 
-        schemaVersions = new 
SchemaVersionsImpl(executorInclinedSchemaSyncService, catalogService, 
clockService);
+        SchemaVersions schemaVersions = new 
SchemaVersionsImpl(executorInclinedSchemaSyncService, catalogService, 
clockService);
 
         tablesVv = new IncrementalVersionedValue<>("TableManager#tables", 
registry, 100, null);
 
@@ -389,7 +347,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                 lowWatermark,
                 validationSchemasSource,
                 this.executorInclinedSchemaSyncService,
-                this.executorInclinedPlacementDriver,
+                executorInclinedPlacementDriver,
                 topologyService,
                 localNode,
                 remotelyTriggeredResourceRegistry,
@@ -403,6 +361,27 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                 fullStateTransferIndexChooser
         );
 
+        streamerFlushExecutorFactory = new 
StreamerFlushExecutorFactory(localNode);
+
+        tableImplFactory = new TableImplFactory(
+                topologyService,
+                lockMgr,
+                replicaSvc,
+                txManager,
+                clockService,
+                observableTimestampTracker,
+                executorInclinedPlacementDriver,
+                transactionInflights,
+                schemaVersions,
+                sql,
+                failureProcessor,
+                mvTableStorageFactory,
+                catalogService,
+                dataStorageMgr,
+                metricManager,
+                streamerFlushExecutorFactory
+        );
+
         rebalanceRetryDelayConfiguration = new 
SystemDistributedConfigurationPropertyHolder<>(
                 systemDistributedConfiguration,
                 (v, r) -> {},
@@ -625,6 +604,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
         stopManagerFuture.completeExceptionally(new NodeStoppingException());
 
+        streamerFlushExecutorFactory.beforeStop();
         busyLock.block();
 
         lowWatermark.removeListener(LowWatermarkEvent.LOW_WATERMARK_CHANGED, 
onLowWatermarkChangedListener);
@@ -655,15 +635,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                     zoneCoordinator::stop,
                     () -> 
closeAllManually(tableRegistry.tables().values().stream().map(table -> () -> 
closeTable(table))),
                     () -> shutdownAndAwaitTermination(scanRequestExecutor, 
shutdownTimeoutSeconds, TimeUnit.SECONDS),
-                    () -> {
-                        ScheduledExecutorService streamerFlushExecutor;
-
-                        synchronized (this) {
-                            streamerFlushExecutor = this.streamerFlushExecutor;
-                        }
-
-                        shutdownAndAwaitTermination(streamerFlushExecutor, 
shutdownTimeoutSeconds, TimeUnit.SECONDS);
-                    }
+                    () -> 
streamerFlushExecutorFactory.stop(shutdownTimeoutSeconds)
             );
         } catch (Exception e) {
             return failedFuture(e);
@@ -702,63 +674,14 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         LOG.trace("Creating local table: name={}, id={}, token={}", 
tableName.toCanonicalForm(), tableDescriptor.id(), causalityToken);
 
-        MvTableStorage tableStorage = createTableStorage(tableDescriptor, 
zoneDescriptor);
-
-        int partitions = zoneDescriptor.partitions();
-
-        InternalTableImpl internalTable = new InternalTableImpl(
+        return tableImplFactory.createTableImpl(
                 tableName,
-                zoneDescriptor.id(),
-                tableDescriptor.id(),
-                partitions,
-                topologyService,
-                txManager,
-                tableStorage,
-                replicaSvc,
-                clockService,
-                observableTimestampTracker,
-                executorInclinedPlacementDriver,
-                transactionInflights,
-                this::streamerFlushExecutor,
-                Objects.requireNonNull(streamerReceiverRunner),
-                createAndRegisterMetricsSource(tableStorage, tableName)
-        );
-
-        CatalogTableProperties descProps = tableDescriptor.properties();
-
-        return new TableImpl(
-                internalTable,
-                lockMgr,
-                schemaVersions,
-                marshallers,
-                sql.get(),
-                failureProcessor,
-                tableDescriptor.primaryKeyIndexId(),
-                new 
TableStatsStalenessConfiguration(descProps.staleRowsFraction(), 
descProps.minStaleRowsCount()),
+                tableDescriptor,
+                zoneDescriptor,
                 schemaRegistry
         );
     }
 
-    /**
-     * Creates data storage for the provided table.
-     *
-     * @param tableDescriptor Catalog table descriptor.
-     * @param zoneDescriptor Catalog distributed zone descriptor.
-     */
-    protected MvTableStorage createTableStorage(CatalogTableDescriptor 
tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
-        StorageEngine engine = 
dataStorageMgr.engineByStorageProfile(tableDescriptor.storageProfile());
-
-        if (engine == null) {
-            // Create a placeholder to allow Table object being created.
-            engine = new NullStorageEngine();
-        }
-
-        return engine.createMvTable(
-                new StorageTableDescriptor(tableDescriptor.id(), 
zoneDescriptor.partitions(), tableDescriptor.storageProfile()),
-                new CatalogStorageIndexDescriptorSupplier(catalogService, 
lowWatermark)
-        );
-    }
-
     /**
      * Drops local structures for a table.
      *
@@ -1111,11 +1034,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         return anyOf(future, stopManagerFuture).thenApply(o -> (T) o);
     }
 
-    @TestOnly
-    public ReplicaService replicaService() {
-        return replicaSvc;
-    }
-
     /** Internal event. */
     private static class DestroyTableEvent {
         final int catalogVersion;
@@ -1154,26 +1072,9 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         }
     }
 
-    private synchronized ScheduledExecutorService streamerFlushExecutor() {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteException(NODE_STOPPING_ERR, new 
NodeStoppingException());
-        }
-
-        try {
-            if (streamerFlushExecutor == null) {
-                streamerFlushExecutor = 
Executors.newSingleThreadScheduledExecutor(
-                        IgniteThreadFactory.create(localNode.name(), 
"streamer-flush-executor", LOG, STORAGE_WRITE));
-            }
-
-            return streamerFlushExecutor;
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
     @Override
     public void setStreamerReceiverRunner(StreamerReceiverRunner runner) {
-        this.streamerReceiverRunner = runner;
+        tableImplFactory.setStreamerReceiverRunner(runner);
     }
 
     /**
@@ -1187,46 +1088,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         return zoneCoordinator.zoneTables(zoneId);
     }
 
-    private ReadWriteMetricSource 
createAndRegisterMetricsSource(MvTableStorage tableStorage, QualifiedName 
tableName) {
-        StorageTableDescriptor tableDescriptor = 
tableStorage.getTableDescriptor();
-
-        CatalogTableDescriptor catalogTableDescriptor = 
catalogService.latestCatalog().table(tableDescriptor.getId());
-
-        // The table might be created during the recovery phase.
-        // In that case, we should only register the metric source for the 
actual tables that exist in the latest catalog.
-        boolean registrationNeeded = catalogTableDescriptor != null;
-
-        StorageEngine engine = 
dataStorageMgr.engineByStorageProfile(tableDescriptor.getStorageProfile());
-
-        // Engine can be null sometimes, see "TableManager.createTableStorage".
-        if (engine != null && registrationNeeded) {
-            StorageEngineTablesMetricSource engineMetricSource = new 
StorageEngineTablesMetricSource(engine.name(), tableName);
-
-            engine.addTableMetrics(tableDescriptor, engineMetricSource);
-
-            try {
-                metricManager.registerSource(engineMetricSource);
-                metricManager.enable(engineMetricSource);
-            } catch (Exception e) {
-                String message = "Failed to register storage engine metrics 
source for table [id={}, name={}].";
-                LOG.warn(message, e, tableDescriptor.getId(), tableName);
-            }
-        }
-
-        ReadWriteMetricSource source = new TableMetricSource(tableName);
-
-        if (registrationNeeded) {
-            try {
-                metricManager.registerSource(source);
-                metricManager.enable(source);
-            } catch (Exception e) {
-                LOG.warn("Failed to register metrics source for table [id={}, 
name={}].", e, tableDescriptor.getId(), tableName);
-            }
-        }
-
-        return source;
-    }
-
     private void unregisterMetricsSource(TableViewInternal table) {
         if (table == null) {
             return;
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 8e3dfaf5b74..373e2c35d2e 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -74,11 +74,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
-import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.components.LogSyncer;
 import org.apache.ignite.internal.components.LongJvmPauseDetector;
@@ -400,6 +399,17 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
         resetPeersCallCount.updateAndGet(existing -> Math.max(existing, 
resetSequenceToken));
     }
 
+    private static MvTableStorageFactory spyingStorageFactory( // test helper: decorates a storage factory so every created storage is a spy
+            MvTableStorageFactory delegate, // factory that actually creates the table storage
+            Consumer<MvTableStorage> decorator // callback that receives each spied storage before it is returned
+    ) {
+        return (tableDesc, zoneDesc) -> { // the lambda itself is the returned MvTableStorageFactory
+            MvTableStorage storage = 
spy(delegate.createMvTableStorage(tableDesc, zoneDesc)); // wrap the delegate-created storage; spy(...) is presumably Mockito's - confirm against the imports
+            decorator.accept(storage); // lets the caller capture or further configure the spy
+            return storage; // hand the spy (not the raw storage) back to the caller of the factory
+        };
+    }
+
     /**
      * Creates and starts TableManager and dependencies.
      */
@@ -593,16 +603,11 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 minTimeCollectorService,
                 systemDistributedConfiguration,
                 metricManager,
-                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
-        ) {
-
-            @Override
-            protected MvTableStorage createTableStorage(CatalogTableDescriptor 
tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
-                mvTableStorage = spy(super.createTableStorage(tableDescriptor, 
zoneDescriptor));
-
-                return mvTableStorage;
-            }
-        };
+                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY,
+                spyingStorageFactory(new DefaultMvTableStorageFactory(dsm, 
catalogManager, lowWatermark), storage -> {
+                    mvTableStorage = storage;
+                })
+        );
 
         
tableManager.setStreamerReceiverRunner(mock(StreamerReceiverRunner.class));
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 5c98a208f65..dcaa8141136 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -64,8 +64,6 @@ import java.util.function.Consumer;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogTestUtils;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
-import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.causality.RevisionListenerRegistry;
 import org.apache.ignite.internal.components.LogSyncer;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
@@ -536,6 +534,17 @@ public class TableManagerTest extends IgniteAbstractTest {
         return tbl2;
     }
 
+    private static MvTableStorageFactory spyingStorageFactory( // test helper: decorates a storage factory so every created storage is a spy
+            MvTableStorageFactory delegate, // factory that actually creates the table storage
+            Consumer<MvTableStorage> decorator // callback that receives each spied storage before it is returned
+    ) {
+        return (tableDesc, zoneDesc) -> { // the lambda itself is the returned MvTableStorageFactory
+            MvTableStorage storage = 
spy(delegate.createMvTableStorage(tableDesc, zoneDesc)); // wrap the delegate-created storage; spy(...) is presumably Mockito's - confirm against the imports
+            decorator.accept(storage); // lets the caller capture or further configure the spy
+            return storage; // hand the spy (not the raw storage) back to the caller of the factory
+        };
+    }
+
     private TableManager createTableManager(CompletableFuture<TableManager> 
tblManagerFut) {
         return createTableManager(tblManagerFut, unused -> {});
     }
@@ -599,18 +608,13 @@ public class TableManagerTest extends IgniteAbstractTest {
                 new MinimumRequiredTimeCollectorServiceImpl(),
                 systemDistributedConfiguration,
                 new NoOpMetricManager(),
-                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
+                TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY,
+                spyingStorageFactory(new DefaultMvTableStorageFactory(dsm, 
catalogManager, lowWatermark), storage -> {
+                    mvTableStorage = storage;
+                    tableStorageDecorator.accept(storage);
+                })
         ) {
 
-            @Override
-            protected MvTableStorage createTableStorage(CatalogTableDescriptor 
tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
-                mvTableStorage = spy(super.createTableStorage(tableDescriptor, 
zoneDescriptor));
-
-                tableStorageDecorator.accept(mvTableStorage);
-
-                return mvTableStorage;
-            }
-
             @Override
             public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
                 return sharedTxStateStorage.startAsync(componentContext)
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxWriteIntentResolutionTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxWriteIntentResolutionTest.java
index a910050222b..fa55d8a2353 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxWriteIntentResolutionTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxWriteIntentResolutionTest.java
@@ -316,7 +316,7 @@ public class ItTxWriteIntentResolutionTest extends 
ClusterPerClassIntegrationTes
             UUID roTxId = txId(roTx);
 
             ReadOnlySingleRowPkReplicaRequest req = roGetRequest(coordinator, 
wiGroupId, tableId, keyTuple, roTxId, readTs);
-            CompletableFuture<?> reqFut = 
wiBackupNode.distributedTableManager().replicaService().invoke(wiBackupNode.name(),
 req);
+            CompletableFuture<?> reqFut = 
wiBackupNode.replicaService().invoke(wiBackupNode.name(), req);
 
             if (expected.get1() != null) {
                 assertThat(reqFut, willThrow(expected.get1()));

Reply via email to