This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ebedbb70704 IGNITE-28096 Extract TableImplFactory from TableManager
(#7898)
ebedbb70704 is described below
commit ebedbb707047eef521bee14d6cb0994bd7d36595
Author: Viacheslav Blinov <[email protected]>
AuthorDate: Mon Apr 6 15:22:28 2026 +0300
IGNITE-28096 Extract TableImplFactory from TableManager (#7898)
---
.../rebalance/ItRebalanceDistributedTest.java | 4 +-
.../partition/replicator/fixtures/Node.java | 53 ++---
.../runner/app/ItIgniteNodeRestartTest.java | 4 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 14 +-
.../distributed/DefaultMvTableStorageFactory.java | 71 ++++++
.../table/distributed/MvTableStorageFactory.java | 36 +++
.../distributed/StreamerFlushExecutorFactory.java | 105 +++++++++
.../table/distributed/TableImplFactory.java | 257 +++++++++++++++++++++
.../internal/table/distributed/TableManager.java | 213 +++--------------
.../distributed/TableManagerRecoveryTest.java | 29 ++-
.../table/distributed/TableManagerTest.java | 28 ++-
.../distributed/ItTxWriteIntentResolutionTest.java | 2 +-
12 files changed, 585 insertions(+), 231 deletions(-)
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index abbcb4e377a..02684f96d6d 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -232,6 +232,7 @@ import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.TableViewInternal;
+import
org.apache.ignite.internal.table.distributed.DefaultMvTableStorageFactory;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
@@ -1595,7 +1596,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
minTimeCollectorService,
systemDistributedConfiguration,
metricManager,
- TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY,
+ new DefaultMvTableStorageFactory(dataStorageMgr,
catalogManager, lowWatermark)
);
tableManager.setStreamerReceiverRunner(mock(StreamerReceiverRunner.class));
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 529af9b7eb5..d61f25985ab 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -57,8 +57,6 @@ import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.CatalogTestUtils;
import org.apache.ignite.internal.catalog.compaction.CatalogCompactionRunner;
-import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.cluster.management.ClusterIdHolder;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
@@ -169,6 +167,8 @@ import
org.apache.ignite.internal.systemview.SystemViewManagerImpl;
import org.apache.ignite.internal.systemview.api.SystemViewManager;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.TableTestUtils;
+import
org.apache.ignite.internal.table.distributed.DefaultMvTableStorageFactory;
+import org.apache.ignite.internal.table.distributed.MvTableStorageFactory;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
@@ -800,30 +800,9 @@ public class Node {
minTimeCollectorService,
systemDistributedConfiguration,
metricManager,
- TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
- ) {
-
- @Override
- protected MvTableStorage createTableStorage(CatalogTableDescriptor
tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
- MvTableStorage storage =
createSpy(super.createTableStorage(tableDescriptor, zoneDescriptor));
-
- var partitionStorages = new ConcurrentHashMap<Integer,
MvPartitionStorage>();
-
- doAnswer(invocation -> {
- Integer partitionId = invocation.getArgument(0);
-
- return partitionStorages.computeIfAbsent(partitionId, id
-> {
- try {
- return (MvPartitionStorage)
createSpy(invocation.callRealMethod());
- } catch (Throwable e) {
- throw new RuntimeException(e);
- }
- });
- }).when(storage).getMvPartition(anyInt());
-
- return storage;
- }
- };
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY,
+ spyingMvTableStorageFactory(new
DefaultMvTableStorageFactory(dataStorageMgr, catalogManager, lowWatermark))
+ );
tableManager.setStreamerReceiverRunner(mock(StreamerReceiverRunner.class));
@@ -1050,6 +1029,28 @@ public class Node {
return primaryReplicaFuture.join();
}
+ private static MvTableStorageFactory
spyingMvTableStorageFactory(MvTableStorageFactory delegate) {
+ return (tableDesc, zoneDesc) -> { // Decorates the delegate: every storage it creates is passed through createSpy.
+ MvTableStorage storage =
createSpy(delegate.createMvTableStorage(tableDesc, zoneDesc));
+
+ var partitionStorages = new ConcurrentHashMap<Integer,
MvPartitionStorage>();
+
+ doAnswer(invocation -> { // Custom answer for getMvPartition(int) on the wrapped storage.
+ Integer partitionId = invocation.getArgument(0);
+
+ return partitionStorages.computeIfAbsent(partitionId, id -> { // Caches one wrapped partition storage per partition id.
+ try {
+ return (MvPartitionStorage)
createSpy(invocation.callRealMethod());
+ } catch (Throwable e) { // The mapping function cannot throw checked exceptions, so rethrow unchecked.
+ throw new RuntimeException(e);
+ }
+ });
+ }).when(storage).getMvPartition(anyInt());
+
+ return storage;
+ };
+ }
+
@Contract("null -> null")
private static <T> @Nullable T createSpy(@Nullable T object) {
if (object == null) {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 77684332f2c..c470b932a80 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -212,6 +212,7 @@ import
org.apache.ignite.internal.table.RecordBinaryViewImpl;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.TableViewInternal;
+import
org.apache.ignite.internal.table.distributed.DefaultMvTableStorageFactory;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
@@ -818,7 +819,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
minTimeCollectorService,
systemDistributedConfiguration,
metricManager,
- TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY,
+ new DefaultMvTableStorageFactory(dataStorageManager,
catalogManager, lowWatermark)
);
var indexManager = new IndexManager(
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 97f573b5986..41844c92b34 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -268,6 +268,7 @@ import
org.apache.ignite.internal.system.CpuInformationProvider;
import org.apache.ignite.internal.system.JvmCpuInformationProvider;
import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
import org.apache.ignite.internal.systemview.api.SystemViewManager;
+import
org.apache.ignite.internal.table.distributed.DefaultMvTableStorageFactory;
import
org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory;
import
org.apache.ignite.internal.table.distributed.PublicApiThreadingIgniteTables;
import org.apache.ignite.internal.table.distributed.TableManager;
@@ -393,6 +394,9 @@ public class IgniteImpl implements Ignite {
/** Replica manager. */
private final ReplicaManager replicaMgr;
+ /** Replica service. */
+ private final ReplicaService replicaSvc;
+
/** Transactions manager. */
private final TxManagerImpl txManager;
@@ -918,7 +922,7 @@ public class IgniteImpl implements Ignite {
TransactionConfiguration txConfig =
clusterConfigRegistry.getConfiguration(TransactionExtensionConfiguration.KEY).transaction();
- ReplicaService replicaSvc = new ReplicaService(
+ replicaSvc = new ReplicaService(
messagingServiceReturningToStorageOperationsPool,
clockService,
threadPoolsManager.partitionOperationsExecutor(),
@@ -1195,7 +1199,8 @@ public class IgniteImpl implements Ignite {
minTimeCollectorService,
systemDistributedConfiguration,
metricManager,
- partitionModificationCounterFactory
+ partitionModificationCounterFactory,
+ new DefaultMvTableStorageFactory(dataStorageMgr,
catalogManager, lowWatermark)
);
disasterRecoveryManager = new DisasterRecoveryManager(
@@ -1837,6 +1842,11 @@ public class IgniteImpl implements Ignite {
return disasterRecoveryManager;
}
+ @TestOnly
+ public ReplicaService replicaService() { // Test-only accessor that returns the replicaSvc field.
+ return replicaSvc;
+ }
+
@TestOnly
public VaultManager vault() {
return vaultMgr;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/DefaultMvTableStorageFactory.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/DefaultMvTableStorageFactory.java
new file mode 100644
index 00000000000..dcec93f0e61
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/DefaultMvTableStorageFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
+import org.apache.ignite.internal.storage.DataStorageManager;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
+import org.apache.ignite.internal.table.distributed.storage.NullStorageEngine;
+
+/**
+ * Default implementation of {@link MvTableStorageFactory} that creates table storages via {@link DataStorageManager}.
+ *
+ * <p>The storage engine is looked up by the storage profile of the table descriptor. If no engine is returned for that
+ * profile, a {@link NullStorageEngine} placeholder is used instead so that the table object can still be created.
+ *
+ * <p>The storage itself is created by the engine from a {@link StorageTableDescriptor} (table ID, partition count of
+ * the zone, storage profile of the table) and a {@code CatalogStorageIndexDescriptorSupplier} built from the catalog
+ * service and the low watermark passed to the constructor.
+ */
+public final class DefaultMvTableStorageFactory implements
MvTableStorageFactory {
+ private final DataStorageManager dataStorageMgr;
+
+ private final CatalogService catalogService;
+
+ private final LowWatermark lowWatermark;
+
+ /**
+ * Constructor.
+ *
+ * @param dataStorageMgr Data storage manager, used to resolve a storage engine by storage profile.
+ * @param catalogService Catalog service, passed to the index descriptor supplier of every created storage.
+ * @param lowWatermark Low watermark, passed to the index descriptor supplier of every created storage.
+ */
+ public DefaultMvTableStorageFactory(
+ DataStorageManager dataStorageMgr,
+ CatalogService catalogService,
+ LowWatermark lowWatermark
+ ) {
+ this.dataStorageMgr = dataStorageMgr;
+ this.catalogService = catalogService;
+ this.lowWatermark = lowWatermark;
+ }
+
+ @Override
+ public MvTableStorage createMvTableStorage(CatalogTableDescriptor
tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
+ StorageEngine engine =
dataStorageMgr.engineByStorageProfile(tableDescriptor.storageProfile());
+
+ if (engine == null) {
+ // Fall back to a placeholder engine so that the Table object can still be created.
+ engine = new NullStorageEngine();
+ }
+
+ return engine.createMvTable(
+ new StorageTableDescriptor(tableDescriptor.id(),
zoneDescriptor.partitions(), tableDescriptor.storageProfile()),
+ new CatalogStorageIndexDescriptorSupplier(catalogService,
lowWatermark)
+ );
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/MvTableStorageFactory.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/MvTableStorageFactory.java
new file mode 100644
index 00000000000..2e2661928c9
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/MvTableStorageFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+
+/**
+ * Factory for creating {@link MvTableStorage} instances.
+ *
+ * <p>The interface declares a single abstract method, so an implementation can be supplied as a lambda, for example to
+ * decorate the storages produced by another factory.
+ */
+public interface MvTableStorageFactory {
+ /**
+ * Creates a new MV table storage for the given table and zone descriptors.
+ *
+ * @param tableDescriptor Catalog table descriptor of the table the storage is created for.
+ * @param zoneDescriptor Catalog zone descriptor supplied together with the table descriptor.
+ * @return New MV table storage.
+ */
+ MvTableStorage createMvTableStorage(CatalogTableDescriptor
tableDescriptor, CatalogZoneDescriptor zoneDescriptor);
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StreamerFlushExecutorFactory.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StreamerFlushExecutorFactory.java
new file mode 100644
index 00000000000..f8827c50540
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StreamerFlushExecutorFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
+import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Lazily creates and manages the lifecycle of a single streamer flush {@link ScheduledExecutorService}.
+ *
+ * <p>The executor is created on first call to {@link #get()} and shut down on {@link #stop}.
+ * {@link #beforeStop()} must be called before {@link #stop} to prevent new executor creation during shutdown.
+ *
+ * <p>Access to the executor field is guarded by the monitor of this instance: {@link #get()} is {@code synchronized}
+ * and {@link #stop} reads the field inside a {@code synchronized (this)} block. The busy lock is a separate mechanism
+ * that makes {@link #get()} fail once {@link #beforeStop()} has been called.
+ */
+public final class StreamerFlushExecutorFactory {
+ private static final IgniteLogger LOG =
Loggers.forClass(StreamerFlushExecutorFactory.class);
+
+ private final InternalClusterNode localNode;
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ @Nullable
+ private ScheduledExecutorService executor;
+
+ /**
+ * Constructor. Does not create the executor; see {@link #get()}.
+ *
+ * @param localNode Local node, used for thread naming.
+ */
+ public StreamerFlushExecutorFactory(InternalClusterNode localNode) {
+ this.localNode = localNode;
+ }
+
+ /**
+ * Returns the executor, creating it lazily if needed.
+ *
+ * <p>On the first successful call a single-thread scheduled executor is created with a thread factory obtained from
+ * {@code IgniteThreadFactory.create(localNode.name(), "streamer-flush-executor", LOG, STORAGE_WRITE)}. Subsequent
+ * calls return the same instance. The busy lock is entered for the duration of the call and always left before
+ * returning.
+ *
+ * @return Streamer flush executor.
+ * @throws IgniteException With code {@code NODE_STOPPING_ERR} and a {@link NodeStoppingException} cause if the busy
+ * lock cannot be entered, that is, if the node is stopping.
+ */
+ public synchronized ScheduledExecutorService get() {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteException(NODE_STOPPING_ERR, new
NodeStoppingException());
+ }
+
+ try {
+ if (executor == null) {
+ executor = Executors.newSingleThreadScheduledExecutor(
+ IgniteThreadFactory.create(localNode.name(),
"streamer-flush-executor", LOG, STORAGE_WRITE));
+ }
+
+ return executor;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Blocks the busy lock to prevent new executor creation during shutdown.
+ *
+ * <p>This method does not shut the executor down; that is done by {@link #stop}.
+ */
+ public void beforeStop() {
+ busyLock.block();
+ }
+
+ /**
+ * Shuts down the executor if it was created.
+ *
+ * <p>The executor reference is read under the monitor of this instance, while the shutdown itself is performed
+ * outside of it. This method does not block the busy lock; callers are expected to invoke {@link #beforeStop()}
+ * first.
+ *
+ * <p>NOTE(review): if {@link #get()} was never called, {@code null} is passed to
+ * {@code shutdownAndAwaitTermination}; this presumably relies on that helper accepting {@code null} - confirm.
+ *
+ * @param timeoutSeconds Timeout in seconds to wait for termination.
+ */
+ public void stop(int timeoutSeconds) {
+ ScheduledExecutorService local;
+
+ synchronized (this) {
+ local = this.executor;
+ }
+
+ shutdownAndAwaitTermination(local, timeoutSeconds, TimeUnit.SECONDS);
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableImplFactory.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableImplFactory.java
new file mode 100644
index 00000000000..f75d3fae789
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableImplFactory.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.Objects;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableProperties;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.storage.DataStorageManager;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
+import
org.apache.ignite.internal.storage.metrics.StorageEngineTablesMetricSource;
+import org.apache.ignite.internal.table.StreamerReceiverRunner;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
+import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.table.QualifiedName;
+
+/**
+ * Factory for creating {@link TableImpl} instances.
+ *
+ * <p>Encapsulates all dependencies needed to create {@link InternalTableImpl} and {@link TableImpl}.
+ *
+ * <p>All dependencies except the {@link StreamerReceiverRunner} are passed to the constructor. The runner is supplied
+ * later through {@link #setStreamerReceiverRunner(StreamerReceiverRunner)} and is kept in a {@code volatile} field;
+ * {@link #createTableImpl} throws a {@link NullPointerException} if it has not been set.
+ */
+public final class TableImplFactory {
+ private static final IgniteLogger LOG =
Loggers.forClass(TableImplFactory.class);
+
+ private final TopologyService topologyService;
+
+ private final LockManager lockMgr;
+
+ private final ReplicaService replicaSvc;
+
+ private final TxManager txManager;
+
+ private final ClockService clockService;
+
+ private final HybridTimestampTracker observableTimestampTracker;
+
+ private final PlacementDriver executorInclinedPlacementDriver;
+
+ private final TransactionInflights transactionInflights;
+
+ private final SchemaVersions schemaVersions;
+
+ private final ReflectionMarshallersProvider marshallers = new
ReflectionMarshallersProvider();
+
+ private final Supplier<IgniteSql> sql;
+
+ private final FailureProcessor failureProcessor;
+
+ private final MvTableStorageFactory mvTableStorageFactory;
+
+ private final CatalogService catalogService;
+
+ private final DataStorageManager dataStorageMgr;
+
+ private final MetricManager metricManager;
+
+ private final StreamerFlushExecutorFactory streamerFlushExecutorFactory;
+
+ private volatile StreamerReceiverRunner streamerReceiverRunner;
+
+ /**
+ * Creates a new factory. The constructor only stores the given dependencies.
+ *
+ * @param topologyService Topology service.
+ * @param lockMgr Lock manager.
+ * @param replicaSvc Replica service.
+ * @param txManager Transaction manager.
+ * @param clockService Clock service.
+ * @param observableTimestampTracker Observable timestamp tracker.
+ * @param executorInclinedPlacementDriver Placement driver.
+ * @param transactionInflights Transaction inflights.
+ * @param schemaVersions Schema versions.
+ * @param sql Supplier for {@link IgniteSql}.
+ * @param failureProcessor Failure processor.
+ * @param mvTableStorageFactory Factory for creating MV table storages.
+ * @param catalogService Catalog service.
+ * @param dataStorageMgr Data storage manager.
+ * @param metricManager Metric manager.
+ * @param streamerFlushExecutorFactory Factory for the streamer flush executor.
+ */
+ public TableImplFactory(
+ TopologyService topologyService,
+ LockManager lockMgr,
+ ReplicaService replicaSvc,
+ TxManager txManager,
+ ClockService clockService,
+ HybridTimestampTracker observableTimestampTracker,
+ PlacementDriver executorInclinedPlacementDriver,
+ TransactionInflights transactionInflights,
+ SchemaVersions schemaVersions,
+ Supplier<IgniteSql> sql,
+ FailureProcessor failureProcessor,
+ MvTableStorageFactory mvTableStorageFactory,
+ CatalogService catalogService,
+ DataStorageManager dataStorageMgr,
+ MetricManager metricManager,
+ StreamerFlushExecutorFactory streamerFlushExecutorFactory
+ ) {
+ this.topologyService = topologyService;
+ this.lockMgr = lockMgr;
+ this.replicaSvc = replicaSvc;
+ this.txManager = txManager;
+ this.clockService = clockService;
+ this.observableTimestampTracker = observableTimestampTracker;
+ this.executorInclinedPlacementDriver = executorInclinedPlacementDriver;
+ this.transactionInflights = transactionInflights;
+ this.schemaVersions = schemaVersions;
+ this.sql = sql;
+ this.failureProcessor = failureProcessor;
+ this.mvTableStorageFactory = mvTableStorageFactory;
+ this.catalogService = catalogService;
+ this.dataStorageMgr = dataStorageMgr;
+ this.metricManager = metricManager;
+ this.streamerFlushExecutorFactory = streamerFlushExecutorFactory;
+ }
+
+ /**
+ * Sets the streamer receiver runner. Must be called before any table creation.
+ *
+ * @param runner Streamer receiver runner.
+ */
+ public void setStreamerReceiverRunner(StreamerReceiverRunner runner) {
+ this.streamerReceiverRunner = runner;
+ }
+
+ /**
+ * Creates a {@link TableImpl} wrapping a new {@link InternalTableImpl}.
+ *
+ * <p>The method performs the following steps in order:
+ * <ol>
+ * <li>creates the MV table storage through the {@link MvTableStorageFactory};</li>
+ * <li>creates a {@link TableMetricSource} for the table. It is registered and enabled in the metric manager only
+ * if the table is present in the latest catalog; in that case, and if a storage engine is found for the storage
+ * profile of the table, a {@link StorageEngineTablesMetricSource} is registered and enabled as well. Exceptions
+ * thrown while registering or enabling a source are logged as warnings and do not abort table creation;</li>
+ * <li>creates the {@link InternalTableImpl} with the partition count taken from the zone descriptor;</li>
+ * <li>wraps it into a {@link TableImpl} together with the staleness configuration read from the table
+ * properties.</li>
+ * </ol>
+ *
+ * @param tableName Qualified table name.
+ * @param tableDescriptor Catalog table descriptor.
+ * @param zoneDescriptor Catalog zone descriptor.
+ * @param schemaRegistry Schema registry.
+ * @return New table instance.
+ * @throws NullPointerException If the streamer receiver runner has not been set.
+ */
+ public TableImpl createTableImpl(
+ QualifiedName tableName,
+ CatalogTableDescriptor tableDescriptor,
+ CatalogZoneDescriptor zoneDescriptor,
+ SchemaRegistry schemaRegistry
+ ) {
+ MvTableStorage tableStorage =
mvTableStorageFactory.createMvTableStorage(tableDescriptor, zoneDescriptor);
+
+ int partitions = zoneDescriptor.partitions();
+
+ ReadWriteMetricSource metricsSource =
createAndRegisterMetricsSource(tableStorage, tableName);
+
+ InternalTableImpl internalTable = new InternalTableImpl(
+ tableName,
+ zoneDescriptor.id(),
+ tableDescriptor.id(),
+ partitions,
+ topologyService,
+ txManager,
+ tableStorage,
+ replicaSvc,
+ clockService,
+ observableTimestampTracker,
+ executorInclinedPlacementDriver,
+ transactionInflights,
+ streamerFlushExecutorFactory::get, // Method reference: get() is not invoked here.
+ Objects.requireNonNull(streamerReceiverRunner), // Fails fast if setStreamerReceiverRunner() was not called.
+ metricsSource
+ );
+
+ CatalogTableProperties descProps = tableDescriptor.properties();
+
+ return new TableImpl(
+ internalTable,
+ lockMgr,
+ schemaVersions,
+ marshallers,
+ sql.get(), // The supplier is resolved at table creation time.
+ failureProcessor,
+ tableDescriptor.primaryKeyIndexId(),
+ new
TableStatsStalenessConfiguration(descProps.staleRowsFraction(),
descProps.minStaleRowsCount()),
+ schemaRegistry
+ );
+ }
+
+ private ReadWriteMetricSource
createAndRegisterMetricsSource(MvTableStorage tableStorage, QualifiedName
tableName) {
+ StorageTableDescriptor tableDescriptor =
tableStorage.getTableDescriptor();
+
+ CatalogTableDescriptor catalogTableDescriptor =
catalogService.latestCatalog().table(tableDescriptor.getId());
+
+ // The table might be created during the recovery phase.
+ // In that case, we should only register the metric source for the
actual tables that exist in the latest catalog.
+ boolean registrationNeeded = catalogTableDescriptor != null;
+
+ StorageEngine engine =
dataStorageMgr.engineByStorageProfile(tableDescriptor.getStorageProfile());
+
+ // Engine can be null when the storage profile is not available.
+ if (engine != null && registrationNeeded) {
+ StorageEngineTablesMetricSource engineMetricSource = new
StorageEngineTablesMetricSource(engine.name(), tableName);
+
+ engine.addTableMetrics(tableDescriptor, engineMetricSource);
+
+ try {
+ metricManager.registerSource(engineMetricSource);
+ metricManager.enable(engineMetricSource);
+ } catch (Exception e) { // Best effort: the failure is logged and table creation continues.
+ String message = "Failed to register storage engine metrics
source for table [id={}, name={}].";
+ LOG.warn(message, e, tableDescriptor.getId(), tableName);
+ }
+ }
+
+ ReadWriteMetricSource source = new TableMetricSource(tableName); // Always created; registered only if needed.
+
+ if (registrationNeeded) {
+ try {
+ metricManager.registerSource(source);
+ metricManager.enable(source);
+ } catch (Exception e) { // Best effort: the failure is logged and the unregistered source is still returned.
+ LOG.warn("Failed to register metrics source for table [id={},
name={}].", e, tableDescriptor.getId(), tableName);
+ }
+ }
+
+ return source;
+ }
+
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index d43c5040fac..81bf849dab2 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -27,7 +27,6 @@ import static
org.apache.ignite.internal.causality.IncrementalVersionedValue.dep
import static org.apache.ignite.internal.event.EventListener.fromConsumer;
import static
org.apache.ignite.internal.table.distributed.TableUtils.aliveTables;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
-import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
import static org.apache.ignite.internal.util.CollectionUtils.difference;
import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
@@ -46,13 +45,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
@@ -60,7 +57,6 @@ import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogTableProperties;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import
org.apache.ignite.internal.catalog.events.AlterTablePropertiesEventParameters;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
@@ -89,7 +85,6 @@ import
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventPara
import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metrics.MetricManager;
@@ -109,9 +104,7 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
import org.apache.ignite.internal.storage.DataStorageManager;
-import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.StorageEngine;
-import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
import
org.apache.ignite.internal.storage.metrics.StorageEngineTablesMetricSource;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.InternalTable;
@@ -124,9 +117,6 @@ import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeColl
import
org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
-import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import org.apache.ignite.internal.table.distributed.storage.NullStorageEngine;
-import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.tx.LockManager;
@@ -152,16 +142,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
/** The logger. */
private static final IgniteLogger LOG =
Loggers.forClass(TableManager.class);
- private final InternalClusterNode localNode;
-
- private final TopologyService topologyService;
-
- /** Lock manager. */
- private final LockManager lockMgr;
-
- /** Replica service. */
- private final ReplicaService replicaSvc;
-
/** Transaction manager. */
private final TxManager txManager;
@@ -228,34 +208,13 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
private final LowWatermark lowWatermark;
- private final HybridTimestampTracker observableTimestampTracker;
-
- /** Placement driver. */
- private final PlacementDriver executorInclinedPlacementDriver;
-
- /** A supplier function that returns {@link IgniteSql}. */
- private final Supplier<IgniteSql> sql;
-
- private final SchemaVersions schemaVersions;
-
private final PartitionReplicatorNodeRecovery
partitionReplicatorNodeRecovery;
/** Ends at the {@link IgniteComponent#stopAsync(ComponentContext)} with
an {@link NodeStoppingException}. */
private final CompletableFuture<Void> stopManagerFuture = new
CompletableFuture<>();
- /** Marshallers provider. */
- private final ReflectionMarshallersProvider marshallers = new
ReflectionMarshallersProvider();
-
- private final TransactionInflights transactionInflights;
-
private final PartitionReplicaLifecycleManager
partitionReplicaLifecycleManager;
- @Nullable
- private ScheduledExecutorService streamerFlushExecutor;
-
- @Nullable
- private StreamerReceiverRunner streamerReceiverRunner;
-
private final CompletableFuture<Void> readyToProcessReplicaStarts = new
CompletableFuture<>();
/** Configuration of rebalance retries delay. */
@@ -269,6 +228,10 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final MetricManager metricManager;
+ private final StreamerFlushExecutorFactory streamerFlushExecutorFactory;
+
+ private final TableImplFactory tableImplFactory;
+
private final TablePartitionResourcesFactory partitionResourcesFactory;
private final TableZoneCoordinator zoneCoordinator;
@@ -299,6 +262,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
* @param minTimeCollectorService Collects minimum required timestamp for
each partition.
* @param systemDistributedConfiguration System distributed configuration.
* @param metricManager Metric manager.
+ * @param mvTableStorageFactory Factory for creating MV table storages.
*/
public TableManager(
InternalClusterNode localNode,
@@ -332,12 +296,9 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
MinimumRequiredTimeCollectorService minTimeCollectorService,
SystemDistributedConfiguration systemDistributedConfiguration,
MetricManager metricManager,
- PartitionModificationCounterFactory
partitionModificationCounterFactory
+ PartitionModificationCounterFactory
partitionModificationCounterFactory,
+ MvTableStorageFactory mvTableStorageFactory
) {
- this.localNode = localNode;
- this.topologyService = topologyService;
- this.lockMgr = lockMgr;
- this.replicaSvc = replicaSvc;
this.txManager = txManager;
this.dataStorageMgr = dataStorageMgr;
this.metaStorageMgr = metaStorageMgr;
@@ -346,17 +307,14 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
this.clockService = clockService;
this.catalogService = catalogService;
this.failureProcessor = failureProcessor;
- this.observableTimestampTracker = observableTimestampTracker;
- this.sql = sql;
this.lowWatermark = lowWatermark;
- this.transactionInflights = transactionInflights;
this.partitionReplicaLifecycleManager =
partitionReplicaLifecycleManager;
this.metricManager = metricManager;
this.executorInclinedSchemaSyncService = new
ExecutorInclinedSchemaSyncService(schemaSyncService,
partitionOperationsExecutor);
- this.executorInclinedPlacementDriver = new
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
+ PlacementDriver executorInclinedPlacementDriver = new
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
- schemaVersions = new
SchemaVersionsImpl(executorInclinedSchemaSyncService, catalogService,
clockService);
+ SchemaVersions schemaVersions = new
SchemaVersionsImpl(executorInclinedSchemaSyncService, catalogService,
clockService);
tablesVv = new IncrementalVersionedValue<>("TableManager#tables",
registry, 100, null);
@@ -389,7 +347,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
lowWatermark,
validationSchemasSource,
this.executorInclinedSchemaSyncService,
- this.executorInclinedPlacementDriver,
+ executorInclinedPlacementDriver,
topologyService,
localNode,
remotelyTriggeredResourceRegistry,
@@ -403,6 +361,27 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
fullStateTransferIndexChooser
);
+ streamerFlushExecutorFactory = new
StreamerFlushExecutorFactory(localNode);
+
+ tableImplFactory = new TableImplFactory(
+ topologyService,
+ lockMgr,
+ replicaSvc,
+ txManager,
+ clockService,
+ observableTimestampTracker,
+ executorInclinedPlacementDriver,
+ transactionInflights,
+ schemaVersions,
+ sql,
+ failureProcessor,
+ mvTableStorageFactory,
+ catalogService,
+ dataStorageMgr,
+ metricManager,
+ streamerFlushExecutorFactory
+ );
+
rebalanceRetryDelayConfiguration = new
SystemDistributedConfigurationPropertyHolder<>(
systemDistributedConfiguration,
(v, r) -> {},
@@ -625,6 +604,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
stopManagerFuture.completeExceptionally(new NodeStoppingException());
+ streamerFlushExecutorFactory.beforeStop();
busyLock.block();
lowWatermark.removeListener(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
onLowWatermarkChangedListener);
@@ -655,15 +635,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
zoneCoordinator::stop,
() ->
closeAllManually(tableRegistry.tables().values().stream().map(table -> () ->
closeTable(table))),
() -> shutdownAndAwaitTermination(scanRequestExecutor,
shutdownTimeoutSeconds, TimeUnit.SECONDS),
- () -> {
- ScheduledExecutorService streamerFlushExecutor;
-
- synchronized (this) {
- streamerFlushExecutor = this.streamerFlushExecutor;
- }
-
- shutdownAndAwaitTermination(streamerFlushExecutor,
shutdownTimeoutSeconds, TimeUnit.SECONDS);
- }
+ () ->
streamerFlushExecutorFactory.stop(shutdownTimeoutSeconds)
);
} catch (Exception e) {
return failedFuture(e);
@@ -702,63 +674,14 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
LOG.trace("Creating local table: name={}, id={}, token={}",
tableName.toCanonicalForm(), tableDescriptor.id(), causalityToken);
- MvTableStorage tableStorage = createTableStorage(tableDescriptor,
zoneDescriptor);
-
- int partitions = zoneDescriptor.partitions();
-
- InternalTableImpl internalTable = new InternalTableImpl(
+ return tableImplFactory.createTableImpl(
tableName,
- zoneDescriptor.id(),
- tableDescriptor.id(),
- partitions,
- topologyService,
- txManager,
- tableStorage,
- replicaSvc,
- clockService,
- observableTimestampTracker,
- executorInclinedPlacementDriver,
- transactionInflights,
- this::streamerFlushExecutor,
- Objects.requireNonNull(streamerReceiverRunner),
- createAndRegisterMetricsSource(tableStorage, tableName)
- );
-
- CatalogTableProperties descProps = tableDescriptor.properties();
-
- return new TableImpl(
- internalTable,
- lockMgr,
- schemaVersions,
- marshallers,
- sql.get(),
- failureProcessor,
- tableDescriptor.primaryKeyIndexId(),
- new
TableStatsStalenessConfiguration(descProps.staleRowsFraction(),
descProps.minStaleRowsCount()),
+ tableDescriptor,
+ zoneDescriptor,
schemaRegistry
);
}
- /**
- * Creates data storage for the provided table.
- *
- * @param tableDescriptor Catalog table descriptor.
- * @param zoneDescriptor Catalog distributed zone descriptor.
- */
- protected MvTableStorage createTableStorage(CatalogTableDescriptor
tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
- StorageEngine engine =
dataStorageMgr.engineByStorageProfile(tableDescriptor.storageProfile());
-
- if (engine == null) {
- // Create a placeholder to allow Table object being created.
- engine = new NullStorageEngine();
- }
-
- return engine.createMvTable(
- new StorageTableDescriptor(tableDescriptor.id(),
zoneDescriptor.partitions(), tableDescriptor.storageProfile()),
- new CatalogStorageIndexDescriptorSupplier(catalogService,
lowWatermark)
- );
- }
-
/**
* Drops local structures for a table.
*
@@ -1111,11 +1034,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return anyOf(future, stopManagerFuture).thenApply(o -> (T) o);
}
- @TestOnly
- public ReplicaService replicaService() {
- return replicaSvc;
- }
-
/** Internal event. */
private static class DestroyTableEvent {
final int catalogVersion;
@@ -1154,26 +1072,9 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
}
- private synchronized ScheduledExecutorService streamerFlushExecutor() {
- if (!busyLock.enterBusy()) {
- throw new IgniteException(NODE_STOPPING_ERR, new
NodeStoppingException());
- }
-
- try {
- if (streamerFlushExecutor == null) {
- streamerFlushExecutor =
Executors.newSingleThreadScheduledExecutor(
- IgniteThreadFactory.create(localNode.name(),
"streamer-flush-executor", LOG, STORAGE_WRITE));
- }
-
- return streamerFlushExecutor;
- } finally {
- busyLock.leaveBusy();
- }
- }
-
@Override
public void setStreamerReceiverRunner(StreamerReceiverRunner runner) {
- this.streamerReceiverRunner = runner;
+ tableImplFactory.setStreamerReceiverRunner(runner);
}
/**
@@ -1187,46 +1088,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return zoneCoordinator.zoneTables(zoneId);
}
- private ReadWriteMetricSource
createAndRegisterMetricsSource(MvTableStorage tableStorage, QualifiedName
tableName) {
- StorageTableDescriptor tableDescriptor =
tableStorage.getTableDescriptor();
-
- CatalogTableDescriptor catalogTableDescriptor =
catalogService.latestCatalog().table(tableDescriptor.getId());
-
- // The table might be created during the recovery phase.
- // In that case, we should only register the metric source for the
actual tables that exist in the latest catalog.
- boolean registrationNeeded = catalogTableDescriptor != null;
-
- StorageEngine engine =
dataStorageMgr.engineByStorageProfile(tableDescriptor.getStorageProfile());
-
- // Engine can be null sometimes, see "TableManager.createTableStorage".
- if (engine != null && registrationNeeded) {
- StorageEngineTablesMetricSource engineMetricSource = new
StorageEngineTablesMetricSource(engine.name(), tableName);
-
- engine.addTableMetrics(tableDescriptor, engineMetricSource);
-
- try {
- metricManager.registerSource(engineMetricSource);
- metricManager.enable(engineMetricSource);
- } catch (Exception e) {
- String message = "Failed to register storage engine metrics
source for table [id={}, name={}].";
- LOG.warn(message, e, tableDescriptor.getId(), tableName);
- }
- }
-
- ReadWriteMetricSource source = new TableMetricSource(tableName);
-
- if (registrationNeeded) {
- try {
- metricManager.registerSource(source);
- metricManager.enable(source);
- } catch (Exception e) {
- LOG.warn("Failed to register metrics source for table [id={},
name={}].", e, tableDescriptor.getId(), tableName);
- }
- }
-
- return source;
- }
-
private void unregisterMetricsSource(TableViewInternal table) {
if (table == null) {
return;
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 8e3dfaf5b74..373e2c35d2e 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -74,11 +74,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
-import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
@@ -400,6 +399,17 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
resetPeersCallCount.updateAndGet(existing -> Math.max(existing,
resetSequenceToken));
}
+ private static MvTableStorageFactory spyingStorageFactory(
+ MvTableStorageFactory delegate, // factory that actually creates each storage
+ Consumer<MvTableStorage> decorator // callback invoked with every spy(...) result
+ ) { // Test helper: builds a factory that spies on storages made by the delegate.
+ return (tableDesc, zoneDesc) -> { // descriptors are forwarded to the delegate as-is
+ MvTableStorage storage =
spy(delegate.createMvTableStorage(tableDesc, zoneDesc));
+ decorator.accept(storage); // lets the test capture or stub the spy before use
+ return storage; // the spy(...) result is returned, not the delegate's own instance
+ };
+ }
+
/**
* Creates and starts TableManage and dependencies.
*/
@@ -593,16 +603,11 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
minTimeCollectorService,
systemDistributedConfiguration,
metricManager,
- TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
- ) {
-
- @Override
- protected MvTableStorage createTableStorage(CatalogTableDescriptor
tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
- mvTableStorage = spy(super.createTableStorage(tableDescriptor,
zoneDescriptor));
-
- return mvTableStorage;
- }
- };
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY,
+ spyingStorageFactory(new DefaultMvTableStorageFactory(dsm,
catalogManager, lowWatermark), storage -> {
+ mvTableStorage = storage;
+ })
+ );
tableManager.setStreamerReceiverRunner(mock(StreamerReceiverRunner.class));
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 5c98a208f65..dcaa8141136 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -64,8 +64,6 @@ import java.util.function.Consumer;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogTestUtils;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
-import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.causality.RevisionListenerRegistry;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
@@ -536,6 +534,17 @@ public class TableManagerTest extends IgniteAbstractTest {
return tbl2;
}
+ private static MvTableStorageFactory spyingStorageFactory(
+ MvTableStorageFactory delegate, // factory that actually creates each storage
+ Consumer<MvTableStorage> decorator // callback invoked with every spy(...) result
+ ) { // Test helper: builds a factory that spies on storages made by the delegate.
+ return (tableDesc, zoneDesc) -> { // descriptors are forwarded to the delegate as-is
+ MvTableStorage storage =
spy(delegate.createMvTableStorage(tableDesc, zoneDesc));
+ decorator.accept(storage); // lets the test capture or stub the spy before use
+ return storage; // the spy(...) result is returned, not the delegate's own instance
+ };
+ }
+
private TableManager createTableManager(CompletableFuture<TableManager>
tblManagerFut) {
return createTableManager(tblManagerFut, unused -> {});
}
@@ -599,18 +608,13 @@ public class TableManagerTest extends IgniteAbstractTest {
new MinimumRequiredTimeCollectorServiceImpl(),
systemDistributedConfiguration,
new NoOpMetricManager(),
- TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
+ TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY,
+ spyingStorageFactory(new DefaultMvTableStorageFactory(dsm,
catalogManager, lowWatermark), storage -> {
+ mvTableStorage = storage;
+ tableStorageDecorator.accept(storage);
+ })
) {
- @Override
- protected MvTableStorage createTableStorage(CatalogTableDescriptor
tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
- mvTableStorage = spy(super.createTableStorage(tableDescriptor,
zoneDescriptor));
-
- tableStorageDecorator.accept(mvTableStorage);
-
- return mvTableStorage;
- }
-
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
return sharedTxStateStorage.startAsync(componentContext)
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxWriteIntentResolutionTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxWriteIntentResolutionTest.java
index a910050222b..fa55d8a2353 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxWriteIntentResolutionTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxWriteIntentResolutionTest.java
@@ -316,7 +316,7 @@ public class ItTxWriteIntentResolutionTest extends
ClusterPerClassIntegrationTes
UUID roTxId = txId(roTx);
ReadOnlySingleRowPkReplicaRequest req = roGetRequest(coordinator,
wiGroupId, tableId, keyTuple, roTxId, readTs);
- CompletableFuture<?> reqFut =
wiBackupNode.distributedTableManager().replicaService().invoke(wiBackupNode.name(),
req);
+ CompletableFuture<?> reqFut =
wiBackupNode.replicaService().invoke(wiBackupNode.name(), req);
if (expected.get1() != null) {
assertThat(reqFut, willThrow(expected.get1()));