This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 55f7a8cf47 IGNITE-20067 Optimize "StorageUpdateHandler#handleUpdateAll" (#3087) 55f7a8cf47 is described below commit 55f7a8cf477682921fe943e6dfa1ebc98c3168c7 Author: Phillippko <phillip...@gmail.com> AuthorDate: Tue Jan 30 11:44:31 2024 +0300 IGNITE-20067 Optimize "StorageUpdateHandler#handleUpdateAll" (#3087) --- .../runner/app/ItIgniteNodeRestartTest.java | 5 + .../org/apache/ignite/internal/app/IgniteImpl.java | 4 + .../SchemaDistributedConfigurationModule.java | 2 +- .../StorageUpdateConfigurationSchema.java | 37 ++++ .../internal/storage/BaseMvStoragesTest.java | 3 + .../storage/impl/TestMvPartitionStorage.java | 8 +- .../PersistentPageMemoryMvTableStorageTest.java | 3 +- .../VolatilePageMemoryMvTableStorageTest.java | 3 - .../PersistentPageMemoryStorageEngineTest.java | 2 - ...ageMemoryMvPartitionStorageConcurrencyTest.java | 2 - ...rsistentPageMemoryMvPartitionStorageGcTest.java | 2 - ...PersistentPageMemoryMvPartitionStorageTest.java | 2 - ...ageMemoryMvPartitionStorageConcurrencyTest.java | 3 - ...VolatilePageMemoryMvPartitionStorageGcTest.java | 3 - .../VolatilePageMemoryMvPartitionStorageTest.java | 3 - .../RocksDbMvPartitionStorageConcurrencyTest.java | 2 - .../rocksdb/RocksDbMvPartitionStorageGcTest.java | 2 - .../rocksdb/RocksDbMvPartitionStorageTest.java | 2 - .../storage/rocksdb/RocksDbMvTableStorageTest.java | 2 - .../rocksdb/engine/RocksDbStorageEngineTest.java | 2 - .../ItAbstractInternalTableScanTest.java | 7 +- .../ItInternalTableReadOnlyOperationsTest.java | 6 +- .../apache/ignite/distributed/ItLockTableTest.java | 5 + .../ItTxDistributedCleanupRecoveryTest.java | 1 + .../distributed/ItTxDistributedTestSingleNode.java | 5 + ...xDistributedTestSingleNodeNoCleanupMessage.java | 1 + .../ignite/distributed/ItTxStateLocalMapTest.java | 9 +- .../rebalance/ItRebalanceDistributedTest.java | 4 + .../table/distributed/StorageUpdateHandler.java | 191 ++++++++++++++------ .../internal/table/distributed/TableManager.java | 16 +- .../internal/table/InteropOperationsTest.java | 7 +- .../internal/table/TableKvOperationsTestBase.java | 6 +- .../internal/table/distributed/IndexBaseTest.java | 8 +- .../table/distributed/StorageCleanupTest.java | 9 +- ...BaseTest.java => StorageUpdateHandlerTest.java} | 193 ++++++++------------- .../table/distributed/TableManagerTest.java | 6 + .../PersistentPageMemoryGcUpdateHandlerTest.java | 2 - .../distributed/gc/RocksDbGcUpdateHandlerTest.java | 2 - .../gc/VolatilePageMemoryGcUpdateHandlerTest.java | 3 - .../raft/PartitionCommandListenerTest.java | 13 +- .../PartitionReplicaListenerIndexLockingTest.java | 11 +- .../replication/PartitionReplicaListenerTest.java | 10 +- .../apache/ignite/distributed/ItTxTestCluster.java | 8 +- .../table/impl/DummyInternalTableImpl.java | 34 +++- 44 files changed, 403 insertions(+), 246 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 833de95484..8099a903bf 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -140,6 +140,7 @@ import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.SchemaManager; import org.apache.ignite.internal.schema.configuration.GcConfiguration; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.sql.api.IgniteSqlImpl; import org.apache.ignite.internal.sql.configuration.distributed.SqlDistributedConfiguration; import org.apache.ignite.internal.sql.configuration.local.SqlLocalConfiguration; @@ -226,6 +227,9 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { @InjectConfiguration private static TransactionConfiguration txConfiguration; + @InjectConfiguration + private static StorageUpdateConfiguration storageUpdateConfiguration; + @InjectConfiguration private CriticalWorkersConfiguration workersConfiguration; @@ -493,6 +497,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { name, registry, gcConfig, + storageUpdateConfiguration, clusterSvc, raftMgr, replicaMgr, diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 9e74df056a..d3c07d0857 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -134,6 +134,7 @@ import org.apache.ignite.internal.rest.metrics.MetricRestFactory; import org.apache.ignite.internal.rest.node.NodeManagementRestFactory; import org.apache.ignite.internal.schema.SchemaManager; import org.apache.ignite.internal.schema.configuration.GcConfiguration; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.security.authentication.AuthenticationManager; import org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl; import org.apache.ignite.internal.security.configuration.SecurityConfiguration; @@ -620,10 +621,13 @@ public class IgniteImpl implements Ignite { indexNodeFinishedRwTransactionsChecker ); + StorageUpdateConfiguration storageUpdateConfiguration = clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY); + distributedTblMgr = new TableManager( name, registry, gcConfig, + storageUpdateConfiguration, clusterSvc, raftMgr, replicaMgr, diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDistributedConfigurationModule.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDistributedConfigurationModule.java index b483d7d061..90357c7991 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDistributedConfigurationModule.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDistributedConfigurationModule.java @@ -38,7 +38,7 @@ public class SchemaDistributedConfigurationModule implements ConfigurationModule @Override public Collection<RootKey<?, ?>> rootKeys() { - return List.of(GcConfiguration.KEY); + return List.of(GcConfiguration.KEY, StorageUpdateConfiguration.KEY); } @Override diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/StorageUpdateConfigurationSchema.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/StorageUpdateConfigurationSchema.java new file mode 100644 index 0000000000..4238f351df --- /dev/null +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/StorageUpdateConfigurationSchema.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.schema.configuration; + +import org.apache.ignite.configuration.annotation.ConfigurationRoot; +import org.apache.ignite.configuration.annotation.ConfigurationType; +import org.apache.ignite.configuration.annotation.Value; +import org.apache.ignite.configuration.validation.Range; + +/** + * Configuration schema for StorageUpdateHandler. + */ +@ConfigurationRoot(rootName = "storageUpdate", type = ConfigurationType.DISTRIBUTED) +public class StorageUpdateConfigurationSchema { + + /** + * Maximum allowed length (in bytes) of a batch to write into physical storage. + **/ + @Range(min = 1) + @Value(hasDefault = true) + public int batchByteLength = 8192; +} diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java index 9c6f3ff49d..c3666e5cc8 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.lang.IgniteBiTuple; @@ -48,10 +49,12 @@ import org.apache.ignite.internal.tostring.S; import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.internal.util.Cursor; import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.extension.ExtendWith; /** * Base test for MV storages, contains pojo classes, their descriptor and a marshaller instance. */ +@ExtendWith(ConfigurationExtension.class) public abstract class BaseMvStoragesTest extends BaseIgniteAbstractTest { /** Default reflection marshaller factory. */ private static final MarshallerFactory MARSHALLER_FACTORY = new ReflectionMarshallerFactory(); diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java index 589b0242fe..da2315b95c 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java @@ -72,13 +72,19 @@ public class TestMvPartitionStorage implements MvPartitionStorage { private volatile boolean rebalance; - private final LockByRowId lockByRowId = new LockByRowId(); + private final LockByRowId lockByRowId; /** Amount of cursors that opened and still do not close. */ private final AtomicInteger pendingCursors = new AtomicInteger(); public TestMvPartitionStorage(int partitionId) { this.partitionId = partitionId; + this.lockByRowId = new LockByRowId(); + } + + public TestMvPartitionStorage(int partitionId, LockByRowId lockByRowId) { + this.partitionId = partitionId; + this.lockByRowId = lockByRowId; } private static class VersionChain implements GcEntry { diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java index 064b52358e..e949fd9c86 100644 --- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java +++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java @@ -23,7 +23,6 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur import static org.hamcrest.MatcherAssert.assertThat; import java.nio.file.Path; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.pagememory.io.PageIoRegistry; import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState; @@ -43,7 +42,7 @@ import org.junit.jupiter.api.extension.ExtendWith; /** * Tests for {@link PersistentPageMemoryTableStorage} class. */ -@ExtendWith({ConfigurationExtension.class, WorkDirectoryExtension.class}) +@ExtendWith(WorkDirectoryExtension.class) public class PersistentPageMemoryMvTableStorageTest extends AbstractMvTableStorageTest { private PersistentPageMemoryStorageEngine engine; diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java index 2a6747c09b..370fccc77a 100644 --- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java +++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java @@ -30,7 +30,6 @@ import static org.mockito.Mockito.verify; import java.nio.ByteBuffer; import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.lang.IgniteInternalCheckedException; import org.apache.ignite.internal.pagememory.evict.PageEvictionTracker; @@ -55,12 +54,10 @@ import org.apache.ignite.internal.util.IgniteUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; /** * Tests for {@link VolatilePageMemoryTableStorage}. */ -@ExtendWith(ConfigurationExtension.class) public class VolatilePageMemoryMvTableStorageTest extends AbstractMvTableStorageTest { private final PageEvictionTracker pageEvictionTracker = spy(PageEvictionTrackerNoOp.INSTANCE); diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java index 6a41918159..89984c2ec5 100644 --- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java +++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.storage.pagememory.engine; import java.nio.file.Path; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.pagememory.io.PageIoRegistry; import org.apache.ignite.internal.storage.engine.AbstractStorageEngineTest; @@ -32,7 +31,6 @@ import org.junit.jupiter.api.extension.ExtendWith; /** * Implementation of the {@link AbstractStorageEngineTest} for the {@link PersistentPageMemoryStorageEngine#ENGINE_NAME} engine. */ -@ExtendWith(ConfigurationExtension.class) @ExtendWith(WorkDirectoryExtension.class) public class PersistentPageMemoryStorageEngineTest extends AbstractStorageEngineTest { @InjectConfiguration("mock {checkpoint.checkpointDelayMillis = 0, defaultRegion.size = 1048576}") diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java index dcb72a8248..5cdef45b5d 100644 --- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java +++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.mock; import java.nio.file.Path; import org.apache.ignite.internal.catalog.CatalogService; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.pagememory.io.PageIoRegistry; import org.apache.ignite.internal.storage.AbstractMvPartitionStorageConcurrencyTest; @@ -40,7 +39,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(WorkDirectoryExtension.class) -@ExtendWith(ConfigurationExtension.class) class PersistentPageMemoryMvPartitionStorageConcurrencyTest extends AbstractMvPartitionStorageConcurrencyTest { private PersistentPageMemoryStorageEngine engine; diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageGcTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageGcTest.java index cd8514cfc5..7096a8285e 100644 --- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageGcTest.java +++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageGcTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.mock; import java.nio.file.Path; import org.apache.ignite.internal.catalog.CatalogService; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.pagememory.io.PageIoRegistry; import org.apache.ignite.internal.storage.AbstractMvPartitionStorageGcTest; @@ -40,7 +39,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(WorkDirectoryExtension.class) -@ExtendWith(ConfigurationExtension.class) class PersistentPageMemoryMvPartitionStorageGcTest extends AbstractMvPartitionStorageGcTest { private PersistentPageMemoryStorageEngine engine; diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java index ae68cf7dbe..4e50114276 100644 --- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java +++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java @@ -30,7 +30,6 @@ import static org.mockito.Mockito.mock; import java.nio.file.Path; import org.apache.ignite.internal.catalog.CatalogService; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.pagememory.io.PageIoRegistry; @@ -49,7 +48,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(WorkDirectoryExtension.class) -@ExtendWith(ConfigurationExtension.class) class PersistentPageMemoryMvPartitionStorageTest extends AbstractPageMemoryMvPartitionStorageTest { @InjectConfiguration("mock.checkpoint.checkpointDelayMillis = 0") private PersistentPageMemoryStorageEngineConfiguration engineConfig; diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java index 45e4d731b8..0e168c08e2 100644 --- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java +++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java @@ -22,7 +22,6 @@ import static org.apache.ignite.internal.storage.pagememory.configuration.schema import static org.mockito.Mockito.mock; import org.apache.ignite.internal.catalog.CatalogService; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp; import org.apache.ignite.internal.pagememory.io.PageIoRegistry; @@ -35,9 +34,7 @@ import org.apache.ignite.internal.storage.pagememory.configuration.schema.Volati import org.apache.ignite.internal.util.IgniteUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.extension.ExtendWith; -@ExtendWith(ConfigurationExtension.class) class VolatilePageMemoryMvPartitionStorageConcurrencyTest extends AbstractMvPartitionStorageConcurrencyTest { private VolatilePageMemoryStorageEngine engine; diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageGcTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageGcTest.java index e73a984ca5..058f530dce 100644 --- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageGcTest.java +++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageGcTest.java @@ -22,7 +22,6 @@ import static org.apache.ignite.internal.storage.pagememory.configuration.schema import static org.mockito.Mockito.mock; import org.apache.ignite.internal.catalog.CatalogService; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp; import org.apache.ignite.internal.pagememory.io.PageIoRegistry; @@ -35,9 +34,7 @@ import org.apache.ignite.internal.storage.pagememory.configuration.schema.Volati import org.apache.ignite.internal.util.IgniteUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.extension.ExtendWith; -@ExtendWith(ConfigurationExtension.class) class VolatilePageMemoryMvPartitionStorageGcTest extends AbstractMvPartitionStorageGcTest { private VolatilePageMemoryStorageEngine engine; diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java index e0d29bbcc3..af35e1843c 100644 --- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java +++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java @@ -22,7 +22,6 @@ import static org.apache.ignite.internal.storage.pagememory.configuration.schema import static org.mockito.Mockito.mock; import org.apache.ignite.internal.catalog.CatalogService; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp; import org.apache.ignite.internal.pagememory.io.PageIoRegistry; @@ -34,9 +33,7 @@ import org.apache.ignite.internal.storage.pagememory.configuration.schema.Volati import org.apache.ignite.internal.util.IgniteUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.extension.ExtendWith; -@ExtendWith(ConfigurationExtension.class) class VolatilePageMemoryMvPartitionStorageTest extends AbstractPageMemoryMvPartitionStorageTest { @InjectConfiguration private VolatilePageMemoryStorageEngineConfiguration engineConfig; diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageConcurrencyTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageConcurrencyTest.java index 6d27bf0475..8161f3dea4 100644 --- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageConcurrencyTest.java +++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageConcurrencyTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.mock; import java.nio.file.Path; import org.apache.ignite.internal.catalog.CatalogService; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.storage.AbstractMvPartitionStorageConcurrencyTest; import org.apache.ignite.internal.storage.engine.StorageTableDescriptor; @@ -40,7 +39,6 @@ import org.junit.jupiter.api.extension.ExtendWith; * Storage test implementation for {@link RocksDbMvPartitionStorage}. */ @ExtendWith(WorkDirectoryExtension.class) -@ExtendWith(ConfigurationExtension.class) public class RocksDbMvPartitionStorageConcurrencyTest extends AbstractMvPartitionStorageConcurrencyTest { private RocksDbStorageEngine engine; diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java index b0770081dd..05c3fa60f8 100644 --- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java +++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.mock; import java.nio.file.Path; import org.apache.ignite.internal.catalog.CatalogService; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.storage.AbstractMvPartitionStorageGcTest; import org.apache.ignite.internal.storage.engine.StorageTableDescriptor; @@ -40,7 +39,6 @@ import org.junit.jupiter.api.extension.ExtendWith; * Test implementation for {@link RocksDbStorageEngine}. */ @ExtendWith(WorkDirectoryExtension.class) -@ExtendWith(ConfigurationExtension.class) public class RocksDbMvPartitionStorageGcTest extends AbstractMvPartitionStorageGcTest { private RocksDbStorageEngine engine; diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java index 54812a98ac..7df9af0e5e 100644 --- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java +++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.mock; import java.nio.file.Path; import org.apache.ignite.internal.catalog.CatalogService; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest; import org.apache.ignite.internal.storage.engine.StorageTableDescriptor; @@ -40,7 +39,6 @@ import org.junit.jupiter.api.extension.ExtendWith; * Storage test implementation for {@link RocksDbMvPartitionStorage}. */ @ExtendWith(WorkDirectoryExtension.class) -@ExtendWith(ConfigurationExtension.class) public class RocksDbMvPartitionStorageTest extends AbstractMvPartitionStorageTest { private RocksDbStorageEngine engine; diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java index fad188722a..f971749324 100644 --- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java +++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java @@ -28,7 +28,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import java.nio.file.Path; import java.util.UUID; import java.util.concurrent.TimeUnit; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.storage.AbstractMvTableStorageTest; @@ -50,7 +49,6 @@ import org.junit.jupiter.api.extension.ExtendWith; * Tests for the {@link RocksDbTableStorage}. */ @ExtendWith(WorkDirectoryExtension.class) -@ExtendWith(ConfigurationExtension.class) public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest { private RocksDbStorageEngine engine; diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java index 4faa412a08..f908c5d121 100644 --- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java +++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.storage.rocksdb.engine; import java.nio.file.Path; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.storage.engine.AbstractStorageEngineTest; import org.apache.ignite.internal.storage.engine.StorageEngine; @@ -31,7 +30,6 @@ import org.junit.jupiter.api.extension.ExtendWith; /** * Implementation of the {@link AbstractStorageEngineTest} for the {@link RocksDbStorageEngine#ENGINE_NAME} engine. */ -@ExtendWith(ConfigurationExtension.class) @ExtendWith(WorkDirectoryExtension.class) public class RocksDbStorageEngineTest extends AbstractStorageEngineTest { @InjectConfiguration("mock.flushDelayMillis = 0") diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java index 0be3ee9e91..ec1d9200bb 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.schema.row.RowAssembler; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.PartitionTimestampCursor; @@ -83,6 +84,9 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest @InjectConfiguration private TransactionConfiguration txConfiguration; + @InjectConfiguration + private StorageUpdateConfiguration storageUpdateConfiguration; + /** Mock partition storage. */ @Mock private MvPartitionStorage mockStorage; @@ -97,7 +101,8 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest public void setUp(TestInfo testInfo) { when(mockStorage.scan(any(HybridTimestamp.class))).thenReturn(mock(PartitionTimestampCursor.class)); - internalTbl = new DummyInternalTableImpl(mock(ReplicaService.class), mockStorage, ROW_SCHEMA, txConfiguration); + internalTbl = new DummyInternalTableImpl( + mock(ReplicaService.class), mockStorage, ROW_SCHEMA, txConfiguration, storageUpdateConfiguration); } /** diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java index 4373873f25..a9ff0fb883 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.schema.BinaryRowEx; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.ColumnsExtractor; import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.schema.row.Row; import org.apache.ignite.internal.schema.row.RowAssembler; import org.apache.ignite.internal.storage.MvPartitionStorage; @@ -88,6 +89,9 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest { @InjectConfiguration private TransactionConfiguration txConfiguration; + @InjectConfiguration + private StorageUpdateConfiguration storageUpdateConfiguration; + private static final HybridClock CLOCK = new HybridClockImpl(); private static final Row ROW_1 = createKeyValueRow(1, 1001); @@ -120,7 +124,7 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest { public void setUp(TestInfo testInfo) { when(mockStorage.scan(any(HybridTimestamp.class))).thenReturn(mock(PartitionTimestampCursor.class)); - internalTbl = new DummyInternalTableImpl(replicaService, mockStorage, SCHEMA, txConfiguration); + internalTbl = new DummyInternalTableImpl(replicaService, mockStorage, SCHEMA, txConfiguration, storageUpdateConfiguration); lenient().when(readOnlyTx.isReadOnly()).thenReturn(true); lenient().when(readOnlyTx.readTimestamp()).thenReturn(CLOCK.now()); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java index c43f5b151a..d15553e979 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.configuration.GcConfiguration; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.table.TableViewInternal; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.tx.DeadlockPreventionPolicy; @@ -94,6 +95,9 @@ public class ItLockTableTest extends IgniteAbstractTest { @InjectConfiguration protected static TransactionConfiguration txConfiguration; + @InjectConfiguration + protected static StorageUpdateConfiguration storageUpdateConfiguration; + private ItTxTestCluster txTestCluster; private HybridTimestampTracker timestampTracker = new HybridTimestampTracker(); @@ -113,6 +117,7 @@ public class ItLockTableTest extends IgniteAbstractTest { testInfo, raftConfiguration, txConfiguration, + storageUpdateConfiguration, workDir, 1, 1, diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java index 469581cd52..a6eacd38e2 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java @@ -53,6 +53,7 @@ public class ItTxDistributedCleanupRecoveryTest extends ItTxDistributedTestSingl testInfo, raftConfiguration, txConfiguration, + storageUpdateConfiguration, workDir, nodes(), replicas(), diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java index 2b4f99ce35..726dae43e2 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.TableViewInternal; @@ -80,6 +81,9 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest { @InjectConfiguration protected TransactionConfiguration txConfiguration; + @InjectConfiguration + protected StorageUpdateConfiguration storageUpdateConfiguration; + /** * Returns a count of nodes. * @@ -129,6 +133,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest { testInfo, raftConfiguration, txConfiguration, + storageUpdateConfiguration, workDir, nodes(), replicas(), diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java index e04f7605a1..f1b40d9185 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java @@ -96,6 +96,7 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistribut testInfo, raftConfiguration, txConfiguration, + storageUpdateConfiguration, workDir, nodes(), replicas(), diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java index 93a558f913..28825e1eaa 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.table.TableViewInternal; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.tx.HybridTimestampTracker; @@ -63,10 +64,13 @@ public class ItTxStateLocalMapTest extends IgniteAbstractTest { //TODO fsync can be turned on again after https://issues.apache.org/jira/browse/IGNITE-20195 @InjectConfiguration("mock: { fsync: false }") - private static RaftConfiguration raftConfig; + private RaftConfiguration raftConfig; @InjectConfiguration - private static TransactionConfiguration txConfiguration; + private TransactionConfiguration txConfiguration; + + @InjectConfiguration + private StorageUpdateConfiguration storageUpdateConfiguration; private final TestInfo testInfo; @@ -95,6 +99,7 @@ public class ItTxStateLocalMapTest extends IgniteAbstractTest { testInfo, raftConfig, txConfiguration, + storageUpdateConfiguration, workDir, NODES, NODES, diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index b5f413790e..e3e39452e3 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -146,6 +146,7 @@ import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.rest.configuration.RestConfiguration; import org.apache.ignite.internal.schema.SchemaManager; import org.apache.ignite.internal.schema.configuration.GcConfiguration; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.storage.DataStorageManager; import org.apache.ignite.internal.storage.DataStorageModules; import org.apache.ignite.internal.storage.StorageException; @@ -1040,10 +1041,13 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { catalogManager ); + StorageUpdateConfiguration storageUpdateConfiguration = clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY); + tableManager = new TableManager( name, registry, gcConfig, + storageUpdateConfiguration, clusterService, raftManager, replicaManager, diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java index ca678a9ca1..08cae993ac 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java @@ -20,15 +20,17 @@ package org.apache.ignite.internal.table.distributed; import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; import java.util.UUID; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; +import org.apache.ignite.internal.storage.MvPartitionStorage.Locker; import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; @@ -54,21 +56,27 @@ public class StorageUpdateHandler { /** A container for rows that were inserted, updated or removed. */ private final PendingRows pendingRows = new PendingRows(); + /** Storage updater configuration. */ + private final StorageUpdateConfiguration storageUpdateConfiguration; + /** * The constructor. * * @param partitionId Partition id. * @param storage Partition data storage. * @param indexUpdateHandler Partition index update handler. + * @param storageUpdateConfiguration Configuration for the storage update handler. */ public StorageUpdateHandler( int partitionId, PartitionDataStorage storage, - IndexUpdateHandler indexUpdateHandler + IndexUpdateHandler indexUpdateHandler, + StorageUpdateConfiguration storageUpdateConfiguration ) { this.partitionId = partitionId; this.storage = storage; this.indexUpdateHandler = indexUpdateHandler; + this.storageUpdateConfiguration = storageUpdateConfiguration; } /** @@ -103,27 +111,21 @@ public class StorageUpdateHandler { indexUpdateHandler.waitIndexes(); storage.runConsistently(locker -> { - RowId rowId = new RowId(partitionId, rowUuid); int commitTblId = commitPartitionId.tableId(); int commitPartId = commitPartitionId.partitionId(); + RowId rowId = new RowId(partitionId, rowUuid); - locker.lock(rowId); - - performStorageCleanupIfNeeded(txId, rowId, lastCommitTs); - - if (commitTs != null) { - storage.addWriteCommitted(rowId, row, commitTs); - } else { - BinaryRow oldRow = storage.addWrite(rowId, row, txId, commitTblId, commitPartId); - - if (oldRow != null) { - assert commitTs == null : String.format("Expecting explicit txn: [txId=%s]", txId); - // Previous uncommitted row should be removed from indexes. - tryRemovePreviousWritesIndex(rowId, oldRow); - } - } - - indexUpdateHandler.addToIndexes(row, rowId); + tryProcessRow( + locker, + commitTblId, + commitPartId, + rowId, + txId, + row, + lastCommitTs, + commitTs, + false + ); if (trackWriteIntent) { pendingRows.addPendingRowId(txId, rowId); @@ -137,6 +139,44 @@ public class StorageUpdateHandler { }); } + private boolean tryProcessRow( + Locker locker, + int commitTblId, + int commitPartId, + RowId rowId, + UUID txId, + @Nullable BinaryRow row, + @Nullable HybridTimestamp lastCommitTs, + @Nullable HybridTimestamp commitTs, + boolean useTryLock + ) { + if (useTryLock) { + if (!locker.tryLock(rowId)) { + return false; + } + } else { + locker.lock(rowId); + } + + performStorageCleanupIfNeeded(txId, rowId, lastCommitTs); + + if (commitTs != null) { + storage.addWriteCommitted(rowId, row, commitTs); + } else { + BinaryRow oldRow = storage.addWrite(rowId, row, txId, commitTblId, commitPartId); + + if (oldRow != null) { + assert commitTs == null : String.format("Expecting explicit txn: [txId=%s]", txId); + // Previous uncommitted row should be removed from indexes. + tryRemovePreviousWritesIndex(rowId, oldRow); + } + } + + indexUpdateHandler.addToIndexes(row, rowId); + + return true; + } + /** * Handle multiple updates. * @@ -155,53 +195,90 @@ public class StorageUpdateHandler { @Nullable Runnable onApplication, @Nullable HybridTimestamp commitTs ) { - indexUpdateHandler.waitIndexes(); - - storage.runConsistently(locker -> { - int commitTblId = commitPartitionId.tableId(); - int commitPartId = commitPartitionId.partitionId(); - - if (!nullOrEmpty(rowsToUpdate)) { - List<RowId> rowIds = new ArrayList<>(); - - // Sort IDs to prevent deadlock. Natural UUID order matches RowId order within the same partition. - SortedMap<UUID, TimedBinaryRow> sortedRowsToUpdateMap = new TreeMap<>(rowsToUpdate); - - for (Map.Entry<UUID, TimedBinaryRow> entry : sortedRowsToUpdateMap.entrySet()) { - RowId rowId = new RowId(partitionId, entry.getKey()); - BinaryRow row = entry.getValue() == null ? null : entry.getValue().binaryRow(); - - locker.lock(rowId); - - performStorageCleanupIfNeeded(txId, rowId, entry.getValue() == null ? null : entry.getValue().commitTimestamp()); + if (nullOrEmpty(rowsToUpdate)) { + return; + } - if (commitTs != null) { - storage.addWriteCommitted(rowId, row, commitTs); - } else { - BinaryRow oldRow = storage.addWrite(rowId, row, txId, commitTblId, commitPartId); + indexUpdateHandler.waitIndexes(); - if (oldRow != null) { - assert commitTs == null : String.format("Expecting explicit txn: [txId=%s]", txId); - // Previous uncommitted row should be removed from indexes. - tryRemovePreviousWritesIndex(rowId, oldRow); - } - } + int commitTblId = commitPartitionId.tableId(); + int commitPartId = commitPartitionId.partitionId(); + + Iterator<Entry<UUID, TimedBinaryRow>> it = rowsToUpdate.entrySet().iterator(); + Entry<UUID, TimedBinaryRow> lastUnprocessedEntry = it.next(); + + while (lastUnprocessedEntry != null) { + lastUnprocessedEntry = processEntriesUntilBatchLimit( + lastUnprocessedEntry, + txId, + trackWriteIntent, + commitTs, + commitTblId, + commitPartId, + it, + onApplication, + storageUpdateConfiguration.batchByteLength().value() + ); + } + } - rowIds.add(rowId); - indexUpdateHandler.addToIndexes(row, rowId); + private Entry<UUID, TimedBinaryRow> processEntriesUntilBatchLimit( + Entry<UUID, TimedBinaryRow> lastUnprocessedEntry, + UUID txId, + boolean trackWriteIntent, + @Nullable HybridTimestamp commitTs, + int commitTblId, + int commitPartId, + Iterator<Entry<UUID, TimedBinaryRow>> it, + @Nullable Runnable onApplication, int maxBatchLength + ) { + return storage.runConsistently(locker -> { + List<RowId> processedRowIds = new ArrayList<>(); + int batchLength = 0; + Entry<UUID, TimedBinaryRow> entryToProcess = lastUnprocessedEntry; + while (entryToProcess != null) { + RowId rowId = new RowId(partitionId, entryToProcess.getKey()); + BinaryRow row = entryToProcess.getValue() == null ? null : entryToProcess.getValue().binaryRow(); + + if (row != null) { + batchLength += row.tupleSliceLength(); } - if (trackWriteIntent) { - pendingRows.addPendingRowIds(txId, rowIds); + if (!processedRowIds.isEmpty() && batchLength > maxBatchLength) { + break; } - if (onApplication != null) { - onApplication.run(); + boolean rowProcessed = tryProcessRow( + locker, + commitTblId, + commitPartId, + rowId, + txId, + row, + entryToProcess.getValue() == null ? null : entryToProcess.getValue().commitTimestamp(), + commitTs, + !processedRowIds.isEmpty() + ); + + if (!rowProcessed) { + break; } + + entryToProcess = it.hasNext() ? it.next() : null; + processedRowIds.add(rowId); } - return null; + if (trackWriteIntent) { + pendingRows.addPendingRowIds(txId, processedRowIds); + } + + if (entryToProcess == null && onApplication != null) { + onApplication.run(); + } + + return entryToProcess; }); + } private void performStorageCleanupIfNeeded(UUID txId, RowId rowId, @Nullable HybridTimestamp lastCommitTs) { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index f33e3ecc92..d1190184b1 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -130,6 +130,7 @@ import org.apache.ignite.internal.replicator.ReplicationGroupStripes; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.SchemaManager; import org.apache.ignite.internal.schema.configuration.GcConfiguration; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.storage.DataStorageManager; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.engine.MvTableStorage; @@ -353,12 +354,16 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { /** Ends at the {@link #stop()} with an {@link NodeStoppingException}. */ private final CompletableFuture<Void> stopManagerFuture = new CompletableFuture<>(); + /** Configuration for {@link StorageUpdateHandler}. */ + private final StorageUpdateConfiguration storageUpdateConfig; + /** * Creates a new table manager. * * @param nodeName Node name. * @param registry Registry for versioned values. * @param gcConfig Garbage collector configuration. + * @param storageUpdateConfig Storage update handler configuration. * @param raftMgr Raft manager. * @param replicaMgr Replica manager. * @param lockMgr Lock manager. @@ -377,6 +382,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { String nodeName, Consumer<LongFunction<CompletableFuture<?>>> registry, GcConfiguration gcConfig, + StorageUpdateConfiguration storageUpdateConfig, ClusterService clusterService, RaftManager raftMgr, ReplicaManager replicaMgr, @@ -420,6 +426,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { this.observableTimestampTracker = observableTimestampTracker; this.placementDriver = placementDriver; this.sql = sql; + this.storageUpdateConfig = storageUpdateConfig; TopologyService topologyService = clusterService.topologyService(); @@ -797,7 +804,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { partId, partitionDataStorage, table, - safeTimeTracker + safeTimeTracker, + storageUpdateConfig ); Peer serverPeer = newConfiguration.peer(localNode().name()); @@ -2234,7 +2242,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { int partitionId, PartitionDataStorage partitionDataStorage, TableImpl table, - PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker + PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker, + StorageUpdateConfiguration storageUpdateConfig ) { TableIndexStoragesSupplier indexes = table.indexStorageAdapters(partitionId); @@ -2245,7 +2254,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler( partitionId, partitionDataStorage, - indexUpdateHandler + indexUpdateHandler, + storageUpdateConfig ); return new PartitionUpdateHandlers(storageUpdateHandler, indexUpdateHandler, gcUpdateHandler); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java index 4b59bf557f..3143302b98 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.SchemaRegistry; import org.apache.ignite.internal.schema.SchemaTestUtils; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions; import org.apache.ignite.internal.table.distributed.schema.SchemaVersions; import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; @@ -100,6 +101,9 @@ public class InteropOperationsTest extends BaseIgniteAbstractTest { @InjectConfiguration private static TransactionConfiguration txConfiguration; + @InjectConfiguration + private static StorageUpdateConfiguration storageUpdateConfiguration; + @BeforeAll static void beforeAll() { NativeType[] types = { @@ -129,7 +133,8 @@ public class InteropOperationsTest extends BaseIgniteAbstractTest { ClusterService clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS); when(clusterService.topologyService().localMember().address()).thenReturn(DummyInternalTableImpl.ADDR); - intTable = new DummyInternalTableImpl(mock(ReplicaService.class, RETURNS_DEEP_STUBS), schema, txConfiguration); + intTable = new DummyInternalTableImpl( + mock(ReplicaService.class, RETURNS_DEEP_STUBS), schema, txConfiguration, storageUpdateConfiguration); SchemaRegistry schemaRegistry = new DummySchemaManagerImpl(schema); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableKvOperationsTestBase.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableKvOperationsTestBase.java index f784c3677a..bc5d45e1e7 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableKvOperationsTestBase.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableKvOperationsTestBase.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.configuration.testframework.ConfigurationExten import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions; import org.apache.ignite.internal.table.distributed.schema.SchemaVersions; import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; @@ -50,6 +51,9 @@ abstract class TableKvOperationsTestBase extends BaseIgniteAbstractTest { @InjectConfiguration private TransactionConfiguration txConfiguration; + @InjectConfiguration + private StorageUpdateConfiguration storageUpdateConfiguration; + protected static final int SCHEMA_VERSION = 1; protected final SchemaVersions schemaVersions = new ConstantSchemaVersions(SCHEMA_VERSION); @@ -73,6 +77,6 @@ abstract class TableKvOperationsTestBase extends BaseIgniteAbstractTest { } protected final DummyInternalTableImpl createInternalTable(SchemaDescriptor schema) { - return new DummyInternalTableImpl(replicaService, schema, txConfiguration); + return new DummyInternalTableImpl(replicaService, schema, txConfiguration, storageUpdateConfiguration); } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java index f008d0df9a..675aa1785a 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.distributed.TestPartitionDataStorage; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.BinaryRow; @@ -31,6 +32,7 @@ import org.apache.ignite.internal.schema.BinaryRowConverter; import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.BinaryTupleSchema; import org.apache.ignite.internal.schema.ColumnsExtractor; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.storage.BaseMvStoragesTest; import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; @@ -72,6 +74,9 @@ public abstract class IndexBaseTest extends BaseMvStoragesTest { SCHEMA_DESCRIPTOR.column("STRVAL").schemaIndex() }; + @InjectConfiguration + private StorageUpdateConfiguration storageUpdateConfiguration; + private static final BinaryTupleSchema USER_INDEX_SCHEMA = BinaryTupleSchema.createSchema(SCHEMA_DESCRIPTOR, USER_INDEX_COLS); private static final ColumnsExtractor USER_INDEX_BINARY_TUPLE_CONVERTER = new BinaryRowConverter(TUPLE_SCHEMA, USER_INDEX_SCHEMA); @@ -151,7 +156,8 @@ public abstract class IndexBaseTest extends BaseMvStoragesTest { storageUpdateHandler = new StorageUpdateHandler( PARTITION_ID, partitionDataStorage, - indexUpdateHandler + indexUpdateHandler, + storageUpdateConfiguration ); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java index a3c2e5ecf7..6d90ac37b2 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.distributed.TestPartitionDataStorage; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -43,6 +44,7 @@ import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowConverter; import org.apache.ignite.internal.schema.BinaryTupleSchema; import org.apache.ignite.internal.schema.ColumnsExtractor; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.storage.BaseMvStoragesTest; import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; @@ -89,9 +91,11 @@ public class StorageCleanupTest extends BaseMvStoragesTest { private TestHashIndexStorage hashInnerStorage; private TestMvPartitionStorage storage; private StorageUpdateHandler storageUpdateHandler; - private IndexUpdateHandler indexUpdateHandler; + @InjectConfiguration + private StorageUpdateConfiguration storageUpdateConfiguration; + @BeforeEach void setUp() { int tableId = 1; @@ -147,7 +151,8 @@ public class StorageCleanupTest extends BaseMvStoragesTest { storageUpdateHandler = new StorageUpdateHandler( PARTITION_ID, partitionDataStorage, - indexUpdateHandler + indexUpdateHandler, + storageUpdateConfiguration ); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java similarity index 50% copy from modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java copy to modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java index f008d0df9a..922e897d33 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java @@ -17,20 +17,28 @@ package org.apache.ignite.internal.table.distributed; -import static java.util.Collections.singletonMap; -import static java.util.stream.Collectors.toList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.distributed.TestPartitionDataStorage; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowConverter; -import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.BinaryTupleSchema; import org.apache.ignite.internal.schema.ColumnsExtractor; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.storage.BaseMvStoragesTest; import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; @@ -41,25 +49,21 @@ import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor; import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor; import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage; import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage; -import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; +import org.apache.ignite.internal.storage.util.LockByRowId; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; -import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.distributed.replicator.TimedBinaryRow; import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; import org.apache.ignite.internal.type.NativeTypes; -import org.apache.ignite.internal.util.Cursor; -import org.apache.ignite.internal.util.PendingComparableValuesTracker; -import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; /** - * Base test for indexes. Sets up a table with (int, string) key and (int, string) value and three indexes: primary key, hash index over - * value columns and sorted index over value columns. + * Test for {@link StorageUpdateHandler}. */ -public abstract class IndexBaseTest extends BaseMvStoragesTest { - protected static final int PARTITION_ID = 0; +public class StorageUpdateHandlerTest extends BaseMvStoragesTest { + private static final HybridClock CLOCK = new HybridClockImpl(); - private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory(); + protected static final int PARTITION_ID = 0; private static final BinaryTupleSchema TUPLE_SCHEMA = BinaryTupleSchema.createRowSchema(SCHEMA_DESCRIPTOR); @@ -76,19 +80,16 @@ public abstract class IndexBaseTest extends BaseMvStoragesTest { private static final ColumnsExtractor USER_INDEX_BINARY_TUPLE_CONVERTER = new BinaryRowConverter(TUPLE_SCHEMA, USER_INDEX_SCHEMA); - private static final UUID TX_ID = UUID.randomUUID(); - - TestHashIndexStorage pkInnerStorage; - TestSortedIndexStorage sortedInnerStorage; - TestHashIndexStorage hashInnerStorage; - TestMvPartitionStorage storage; - StorageUpdateHandler storageUpdateHandler; + private TestHashIndexStorage pkInnerStorage; + private TestSortedIndexStorage sortedInnerStorage; + private TestHashIndexStorage hashInnerStorage; + private TestMvPartitionStorage storage; + private StorageUpdateHandler storageUpdateHandler; + private IndexUpdateHandler indexUpdateHandler; + private LockByRowId lock; - GcUpdateHandler gcUpdateHandler; - - public static UUID getTxId() { - return TX_ID; - } + @InjectConfiguration + private StorageUpdateConfiguration storageUpdateConfiguration; @BeforeEach void setUp() { @@ -130,7 +131,8 @@ public abstract class IndexBaseTest extends BaseMvStoragesTest { USER_INDEX_BINARY_TUPLE_CONVERTER ); - storage = new TestMvPartitionStorage(PARTITION_ID); + lock = spy(new LockByRowId()); + storage = spy(new TestMvPartitionStorage(PARTITION_ID, lock)); Map<Integer, TableSchemaAwareIndexStorage> indexes = Map.of( pkIndexId, pkStorage, @@ -140,126 +142,71 @@ public abstract class IndexBaseTest extends BaseMvStoragesTest { TestPartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(tableId, PARTITION_ID, storage); - IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler(DummyInternalTableImpl.createTableIndexStoragesSupplier(indexes)); - - gcUpdateHandler = new GcUpdateHandler( - partitionDataStorage, - new PendingComparableValuesTracker<>(HybridTimestamp.MAX_VALUE), - indexUpdateHandler - ); + indexUpdateHandler = spy(new IndexUpdateHandler(DummyInternalTableImpl.createTableIndexStoragesSupplier(indexes))); storageUpdateHandler = new StorageUpdateHandler( PARTITION_ID, partitionDataStorage, - indexUpdateHandler + indexUpdateHandler, + storageUpdateConfiguration ); } - List<ReadResult> getRowVersions(RowId rowId) { - try (Cursor<ReadResult> readResults = storage.scanVersions(rowId)) { - return readResults.stream().collect(toList()); - } - } + @Test + void testUpdateAllBatchedTryLockFailed() { - static void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable BinaryRow row) { - addWrite(handler, rowUuid, row, null); - } + UUID txUuid = UUID.randomUUID(); + + HybridTimestamp commitTs = CLOCK.now(); + + BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, "bar")); + BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, "baz")); + BinaryRow row3 = binaryRow(new TestKey(5, "foo5"), new TestValue(7, "zzu")); - static void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable BinaryRow row, @Nullable HybridTimestamp lastCommitTime) { TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID); - handler.handleUpdate(TX_ID, rowUuid, partitionId, row, false, null, null, lastCommitTime); - } + TimedBinaryRow tb1 = new TimedBinaryRow(row1, null); + TimedBinaryRow tb2 = new TimedBinaryRow(row2, null); + TimedBinaryRow tb3 = new TimedBinaryRow(row3, null); - static BinaryRow defaultRow() { - var key = new TestKey(1, "foo"); - var value = new TestValue(2, "bar"); + UUID id1 = UUID.randomUUID(); + UUID id2 = UUID.randomUUID(); + UUID id3 = UUID.randomUUID(); - return binaryRow(key, value); - } + Map<UUID, TimedBinaryRow> rowsToUpdate = Map.of( + id1, tb1, + id2, tb2, + id3, tb3 + ); - boolean inAllIndexes(BinaryRow row) { - return inIndexes(row, true, true); - } + doReturn(false).when(lock).tryLock(any()); - boolean notInAnyIndex(BinaryRow row) { - return inIndexes(row, false, false); - } + storageUpdateHandler.handleUpdateAll(txUuid, rowsToUpdate, partitionId, true, null, null); - boolean inIndexes(BinaryRow row, boolean mustBeInPk, boolean mustBeInUser) { - BinaryTuple pkIndexValue = PK_INDEX_BINARY_TUPLE_CONVERTER.extractColumns(row); - BinaryTuple userIndexValue = USER_INDEX_BINARY_TUPLE_CONVERTER.extractColumns(row); + assertEquals(3, storage.rowsCount()); - assert pkIndexValue != null; - assert userIndexValue != null; + // We have three writes to the storage. + verify(storage, times(3)).addWrite(any(), any(), any(), anyInt(), anyInt()); - try (Cursor<RowId> pkCursor = pkInnerStorage.get(pkIndexValue)) { - if (pkCursor.hasNext() != mustBeInPk) { - return false; - } - } + // First entry calls lock(). Second calls tryLock and fails, starts second batch, calls lock(). Same for third. + verify(lock, times(2)).tryLock(any()); + verify(lock, times(3)).lock(any()); - try (Cursor<RowId> sortedIdxCursor = sortedInnerStorage.get(userIndexValue)) { - if (sortedIdxCursor.hasNext() != mustBeInUser) { - return false; - } - } + storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs); - try (Cursor<RowId> hashIdxCursor = hashInnerStorage.get(userIndexValue)) { - return hashIdxCursor.hasNext() == mustBeInUser; - } - } + assertEquals(3, storage.rowsCount()); - HybridTimestamp now() { - return clock.now(); - } + // Those writes resulted in three commits. + verify(storage, times(3)).commitWrite(any(), any()); - void commitWrite(RowId rowId) { - storage.runConsistently(locker -> { - storage.commitWrite(rowId, now()); + ReadResult result1 = storage.read(new RowId(partitionId.partitionId(), id1), HybridTimestamp.MAX_VALUE); + assertEquals(row1, result1.binaryRow()); - return null; - }); - } + ReadResult result2 = storage.read(new RowId(partitionId.partitionId(), id2), HybridTimestamp.MAX_VALUE); + assertEquals(row2, result2.binaryRow()); - /** Enum that encapsulates update API. */ - enum AddWrite { - /** Uses update api. */ - USE_UPDATE { - @Override - void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable BinaryRow row) { - // TODO: perhaps need to pass last commit time as a param - handler.handleUpdate(TX_ID, rowUuid, partitionId, row, true, null, null, null); - } - }, - /** Uses updateAll api. */ - USE_UPDATE_ALL { - @Override - void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable BinaryRow row) { - BinaryRowMessage rowMessage = row == null - ? null - : MSG_FACTORY.binaryRowMessage() - .binaryTuple(row.tupleSlice()) - .schemaVersion(row.schemaVersion()) - .build(); - - handler.handleUpdateAll( - TX_ID, - singletonMap(rowUuid, new TimedBinaryRow(rowMessage == null ? null : rowMessage.asBinaryRow(), null)), - partitionId, - true, - null, - null - ); - } - }; - - void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable BinaryRow row) { - TablePartitionId tablePartitionId = new TablePartitionId(444, PARTITION_ID); - - addWrite(handler, tablePartitionId, rowUuid, row); - } - - abstract void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable BinaryRow row); + ReadResult result3 = storage.read(new RowId(partitionId.partitionId(), id3), HybridTimestamp.MAX_VALUE); + assertEquals(row3, result3.binaryRow()); } + } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index 1ac057fc18..2ad55251e9 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -95,6 +95,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.SchemaManager; import org.apache.ignite.internal.schema.SchemaUtils; import org.apache.ignite.internal.schema.configuration.GcConfiguration; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.storage.DataStorageManager; import org.apache.ignite.internal.storage.DataStorageModules; import org.apache.ignite.internal.storage.MvPartitionStorage; @@ -196,6 +197,10 @@ public class TableManagerTest extends IgniteAbstractTest { @InjectConfiguration private GcConfiguration gcConfig; + /** Storage update configuration. */ + @InjectConfiguration + private StorageUpdateConfiguration storageUpdateConfiguration; + @InjectConfiguration private PersistentPageMemoryStorageEngineConfiguration storageEngineConfig; @@ -705,6 +710,7 @@ public class TableManagerTest extends IgniteAbstractTest { NODE_NAME, revisionUpdater, gcConfig, + storageUpdateConfiguration, clusterService, rm, replicaMgr, diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java index dc4ff2bfbf..c086498142 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java @@ -24,7 +24,6 @@ import static org.mockito.Mockito.mock; import java.nio.file.Path; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.components.LongJvmPauseDetector; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.pagememory.io.PageIoRegistry; import org.apache.ignite.internal.storage.engine.StorageTableDescriptor; @@ -40,7 +39,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(WorkDirectoryExtension.class) -@ExtendWith(ConfigurationExtension.class) class PersistentPageMemoryGcUpdateHandlerTest extends AbstractGcUpdateHandlerTest { @WorkDirectory private Path workDir; diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java index aa96468c1b..8541671597 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.mock; import java.nio.file.Path; import org.apache.ignite.internal.catalog.CatalogService; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.storage.engine.StorageTableDescriptor; import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier; @@ -38,7 +37,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(WorkDirectoryExtension.class) -@ExtendWith(ConfigurationExtension.class) class RocksDbGcUpdateHandlerTest extends AbstractGcUpdateHandlerTest { @WorkDirectory private Path workDir; diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/VolatilePageMemoryGcUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/VolatilePageMemoryGcUpdateHandlerTest.java index 630e7c77a8..8b4347974d 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/VolatilePageMemoryGcUpdateHandlerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/VolatilePageMemoryGcUpdateHandlerTest.java @@ -22,7 +22,6 @@ import static org.apache.ignite.internal.storage.pagememory.configuration.schema import static org.mockito.Mockito.mock; import org.apache.ignite.internal.catalog.CatalogService; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp; import org.apache.ignite.internal.pagememory.io.PageIoRegistry; @@ -34,9 +33,7 @@ import org.apache.ignite.internal.storage.pagememory.configuration.schema.Volati import org.apache.ignite.internal.util.IgniteUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.extension.ExtendWith; -@ExtendWith(ConfigurationExtension.class) class VolatilePageMemoryGcUpdateHandlerTest extends AbstractGcUpdateHandlerTest { private VolatilePageMemoryStorageEngine engine; diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java index e278eb241f..1d322d3958 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java @@ -54,6 +54,8 @@ import java.util.stream.Stream; import org.apache.ignite.distributed.TestPartitionDataStorage; import org.apache.ignite.internal.TestHybridClock; import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -71,6 +73,7 @@ import org.apache.ignite.internal.schema.BinaryRowConverter; import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.schema.row.Row; import org.apache.ignite.internal.schema.row.RowAssembler; import org.apache.ignite.internal.storage.MvPartitionStorage; @@ -120,6 +123,7 @@ import org.mockito.junit.jupiter.MockitoExtension; */ @ExtendWith(WorkDirectoryExtension.class) @ExtendWith(MockitoExtension.class) +@ExtendWith(ConfigurationExtension.class) public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { private static final int KEY_COUNT = 100; @@ -170,6 +174,9 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { private StorageUpdateHandler storageUpdateHandler; + @InjectConfiguration + private StorageUpdateConfiguration storageUpdateConfiguration; + /** * Initializes a table listener before tests. */ @@ -190,7 +197,8 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { storageUpdateHandler = spy(new StorageUpdateHandler( PARTITION_ID, partitionDataStorage, - indexUpdateHandler + indexUpdateHandler, + storageUpdateConfiguration )); commandListener = new PartitionListener( @@ -288,7 +296,8 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler( PARTITION_ID, partitionDataStorage, - indexUpdateHandler + indexUpdateHandler, + storageUpdateConfiguration ); PartitionListener testCommandListener = new PartitionListener( diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java index e04b1b157e..6401d2977a 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java @@ -47,6 +47,8 @@ import java.util.function.Function; import org.apache.ignite.distributed.TestPartitionDataStorage; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -62,6 +64,7 @@ import org.apache.ignite.internal.schema.BinaryTupleSchema; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.ColumnsExtractor; import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.schema.marshaller.KvMarshaller; import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory; import org.apache.ignite.internal.storage.RowId; @@ -108,10 +111,12 @@ import org.hamcrest.CustomMatcher; import org.hamcrest.Matcher; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; /** There are tests for partition replica listener. */ +@ExtendWith(ConfigurationExtension.class) public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest { private static final int PART_ID = 0; private static final int TABLE_ID = 1; @@ -132,6 +137,9 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest private static ColumnsExtractor row2HashKeyConverter; private static ColumnsExtractor row2SortKeyConverter; + @InjectConfiguration + private static StorageUpdateConfiguration storageUpdateConfiguration; + @BeforeAll public static void beforeAll() { RaftGroupService mockRaftClient = mock(RaftGroupService.class); @@ -235,7 +243,8 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest new StorageUpdateHandler( PART_ID, partitionDataStorage, - indexUpdateHandler + indexUpdateHandler, + storageUpdateConfiguration ), new DummyValidationSchemasSource(schemaManager), localNode, diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index c23090f054..7d3a3700d8 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -87,6 +87,8 @@ import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.commands.DefaultValue; import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -109,6 +111,7 @@ import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.ColumnsExtractor; import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.schema.marshaller.KvMarshaller; import org.apache.ignite.internal.schema.marshaller.MarshallerFactory; import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory; @@ -211,6 +214,7 @@ import org.mockito.quality.Strictness; /** Tests for partition replica listener. */ @ExtendWith(MockitoExtension.class) +@ExtendWith(ConfigurationExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) public class PartitionReplicaListenerTest extends IgniteAbstractTest { private static final int PART_ID = 0; @@ -319,6 +323,9 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { @Mock private MessagingService messagingService; + @InjectConfiguration + private StorageUpdateConfiguration storageUpdateConfiguration; + /** Schema descriptor for tests. */ private SchemaDescriptor schemaDescriptor; @@ -521,7 +528,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { new StorageUpdateHandler( PART_ID, partitionDataStorage, - indexUpdateHandler + indexUpdateHandler, + storageUpdateConfiguration ), validationSchemasSource, localNode, diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index c0360eb43e..879d61e839 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -88,6 +88,7 @@ import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.BinaryRowConverter; import org.apache.ignite.internal.schema.ColumnsExtractor; import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage; @@ -162,6 +163,8 @@ public class ItTxTestCluster { private final TransactionConfiguration txConfiguration; + private final StorageUpdateConfiguration storageUpdateConfiguration; + private final Path workDir; private final int nodes; @@ -259,6 +262,7 @@ public class ItTxTestCluster { TestInfo testInfo, RaftConfiguration raftConfig, TransactionConfiguration txConfiguration, + StorageUpdateConfiguration storageUpdateConfiguration, Path workDir, int nodes, int replicas, @@ -267,6 +271,7 @@ public class ItTxTestCluster { ) { this.raftConfig = raftConfig; this.txConfiguration = txConfiguration; + this.storageUpdateConfiguration = storageUpdateConfiguration; this.workDir = workDir; this.nodes = nodes; this.replicas = replicas; @@ -532,7 +537,8 @@ public class ItTxTestCluster { StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler( partId, partitionDataStorage, - indexUpdateHandler + indexUpdateHandler, + storageUpdateConfiguration ); TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index 67cb03f376..6dc7280247 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.schema.BinaryRowEx; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.ColumnsExtractor; import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage; @@ -136,6 +137,7 @@ public class DummyInternalTableImpl extends InternalTableImpl { ); private static final ReplicationGroupId crossTableGroupId = new TablePartitionId(333, 0); + private final StorageUpdateConfiguration storageUpdateConfiguration; private PartitionListener partitionListener; @@ -156,9 +158,16 @@ public class DummyInternalTableImpl extends InternalTableImpl { * @param replicaSvc Replica service. * @param schema Schema. * @param txConfiguration Transaction configuration. + * @param storageUpdateConfiguration Configuration for the storage update handler. */ - public DummyInternalTableImpl(ReplicaService replicaSvc, SchemaDescriptor schema, TransactionConfiguration txConfiguration) { - this(replicaSvc, new TestMvPartitionStorage(0), schema, new TestPlacementDriver(LOCAL_NODE), txConfiguration); + public DummyInternalTableImpl( + ReplicaService replicaSvc, + SchemaDescriptor schema, + TransactionConfiguration txConfiguration, + StorageUpdateConfiguration storageUpdateConfiguration + ) { + this(replicaSvc, new TestMvPartitionStorage(0), schema, new TestPlacementDriver(LOCAL_NODE), + txConfiguration, storageUpdateConfiguration); } /** @@ -168,14 +177,16 @@ public class DummyInternalTableImpl extends InternalTableImpl { * @param storage Storage. * @param schema Schema. * @param txConfiguration Transaction configuration. + * @param storageUpdateConfiguration Configuration for the storage update handler. */ public DummyInternalTableImpl( ReplicaService replicaSvc, MvPartitionStorage storage, SchemaDescriptor schema, - TransactionConfiguration txConfiguration + TransactionConfiguration txConfiguration, + StorageUpdateConfiguration storageUpdateConfiguration ) { - this(replicaSvc, storage, schema, new TestPlacementDriver(LOCAL_NODE), txConfiguration); + this(replicaSvc, storage, schema, new TestPlacementDriver(LOCAL_NODE), txConfiguration, storageUpdateConfiguration); } /** @@ -185,13 +196,15 @@ public class DummyInternalTableImpl extends InternalTableImpl { * @param mvPartStorage Multi version partition storage. * @param schema Schema descriptor. * @param txConfiguration Transaction configuration. + * @param storageUpdateConfiguration Configuration for the storage update handler. */ public DummyInternalTableImpl( ReplicaService replicaSvc, MvPartitionStorage mvPartStorage, SchemaDescriptor schema, PlacementDriver placementDriver, - TransactionConfiguration txConfiguration + TransactionConfiguration txConfiguration, + StorageUpdateConfiguration storageUpdateConfiguration ) { this( replicaSvc, @@ -201,7 +214,8 @@ public class DummyInternalTableImpl extends InternalTableImpl { null, schema, new HybridTimestampTracker(), - placementDriver + placementDriver, + storageUpdateConfiguration ); } @@ -217,6 +231,7 @@ public class DummyInternalTableImpl extends InternalTableImpl { * @param schema Schema descriptor. * @param tracker Observable timestamp tracker. * @param placementDriver Placement driver. + * @param storageUpdateConfiguration Configuration for the storage config handler. */ public DummyInternalTableImpl( ReplicaService replicaSvc, @@ -226,7 +241,8 @@ public class DummyInternalTableImpl extends InternalTableImpl { @Nullable TransactionStateResolver transactionStateResolver, SchemaDescriptor schema, HybridTimestampTracker tracker, - PlacementDriver placementDriver + PlacementDriver placementDriver, + StorageUpdateConfiguration storageUpdateConfiguration ) { super( "test", @@ -242,6 +258,7 @@ public class DummyInternalTableImpl extends InternalTableImpl { tracker, placementDriver ); + this.storageUpdateConfiguration = storageUpdateConfiguration; RaftGroupService svc = raftGroupServiceByPartitionId.get(PART_ID); groupId = crossTableUsage ? new TablePartitionId(tableId(), PART_ID) : crossTableGroupId; @@ -347,7 +364,8 @@ public class DummyInternalTableImpl extends InternalTableImpl { StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler( PART_ID, partitionDataStorage, - indexUpdateHandler + indexUpdateHandler, + storageUpdateConfiguration ); DummySchemaManagerImpl schemaManager = new DummySchemaManagerImpl(schema);