This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new d7970190a4 IGNITE-20467 Use replica message instead of directly using raft client when building indexes (#2630) d7970190a4 is described below commit d7970190a48b999d0adddfd5906d691b050576d2 Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Tue Sep 26 15:22:28 2023 +0300 IGNITE-20467 Use replica message instead of directly using raft client when building indexes (#2630) --- .../internal/lang/IgniteInternalException.java | 4 +- .../apache/ignite/internal/util/IgniteUtils.java | 20 +++++++++ .../exception/PrimaryReplicaMissException.java | 42 ++++++++++--------- .../internal/sql/engine/ItBuildIndexTest.java | 16 ++++--- .../ignite/distributed/ItTablePersistenceTest.java | 12 ++++-- .../internal/table/distributed/TableManager.java | 2 +- .../table/distributed/TableMessageGroup.java | 6 +++ .../distributed/command/BuildIndexCommand.java | 21 ++-------- .../table/distributed/index/IndexBuildTask.java | 46 +++++++++----------- .../table/distributed/index/IndexBuilder.java | 49 ++++++++++++---------- .../distributed/raft/PartitionDataStorage.java | 8 +++- .../table/distributed/raft/PartitionListener.java | 2 +- .../SnapshotAwarePartitionDataStorage.java | 10 +++++ .../request/BuildIndexReplicaRequest.java} | 29 ++++--------- .../replicator/PartitionReplicaListener.java | 27 +++++++++++- .../internal/table/distributed/IndexBaseTest.java | 9 ++-- .../distributed/StorageUpdateHandlerTest.java | 10 +++-- .../gc/AbstractGcUpdateHandlerTest.java | 4 +- .../PersistentPageMemoryGcUpdateHandlerTest.java | 2 +- .../distributed/gc/RocksDbGcUpdateHandlerTest.java | 2 +- .../distributed/gc/TestGcUpdateHandlerTest.java | 2 +- .../gc/VolatilePageMemoryGcUpdateHandlerTest.java | 2 +- .../raft/PartitionCommandListenerTest.java | 33 ++++----------- .../PartitionReplicaListenerIndexLockingTest.java | 2 +- .../replication/PartitionReplicaListenerTest.java | 2 +- .../apache/ignite/distributed/ItTxTestCluster.java | 2 +- .../distributed/TestPartitionDataStorage.java | 23 +++++++++- .../table/impl/DummyInternalTableImpl.java | 4 +- 28 files changed, 223 insertions(+), 168 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteInternalException.java b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteInternalException.java index e7a0c18089..816edd19a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteInternalException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteInternalException.java @@ -207,11 +207,11 @@ public class IgniteInternalException extends RuntimeException implements Traceab * * @param code Full error code. * @param messagePattern Error message pattern. - * @param cause Non-null throwable cause. + * @param cause Throwable cause. * @param params Error message params. * @see IgniteStringFormatter#format(String, Object...) */ - public IgniteInternalException(int code, String messagePattern, Throwable cause, Object... params) { + public IgniteInternalException(int code, String messagePattern, @Nullable Throwable cause, Object... params) { this(code, IgniteStringFormatter.format(messagePattern, params), cause); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 321428356f..cf83a563b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -777,6 +777,7 @@ public class IgniteUtils { * @param fn Function to run. * @param <T> Type of returned value from {@code fn}. * @return Result of the provided function. + * @throws IgniteInternalException with cause {@link NodeStoppingException} if {@link IgniteSpinBusyLock#enterBusy()} failed. */ public static <T> T inBusyLock(IgniteSpinBusyLock busyLock, Supplier<T> fn) { if (!busyLock.enterBusy()) { @@ -794,6 +795,7 @@ public class IgniteUtils { * * @param busyLock Component's busy lock. * @param fn Runnable to run. + * @throws IgniteInternalException with cause {@link NodeStoppingException} if {@link IgniteSpinBusyLock#enterBusy()} failed. */ public static void inBusyLock(IgniteSpinBusyLock busyLock, Runnable fn) { if (!busyLock.enterBusy()) { @@ -829,6 +831,24 @@ public class IgniteUtils { } } + /** + * Method that runs the provided {@code fn} in {@code busyLock} if {@link IgniteSpinBusyLock#enterBusy()} succeed. Otherwise it just + * silently returns. + * + * @param busyLock Component's busy lock. + * @param fn Runnable to run. + */ + public static void inBusyLockSafe(IgniteSpinBusyLock busyLock, Runnable fn) { + if (!busyLock.enterBusy()) { + return; + } + try { + fn.run(); + } finally { + busyLock.leaveBusy(); + } + } + /** * Collects all the fields of given class which are defined as a public static within the specified class. * diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java index b63eac2425..25cf994173 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java @@ -17,22 +17,24 @@ package org.apache.ignite.internal.replicator.exception; -import java.util.UUID; +import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR; + import org.apache.ignite.internal.lang.IgniteInternalException; -import org.apache.ignite.internal.lang.IgniteStringFormatter; -import org.apache.ignite.lang.ErrorGroups.Replicator; +import org.jetbrains.annotations.Nullable; /** * Unchecked exception that is thrown when a replica is not the current primary replica. */ public class PrimaryReplicaMissException extends IgniteInternalException { + private static final long serialVersionUID = 8755220779942651494L; + /** * The constructor. * * @param expectedPrimaryReplicaTerm Expected term from. * @param currentPrimaryReplicaTerm Current raft term. */ - public PrimaryReplicaMissException(Long expectedPrimaryReplicaTerm, long currentPrimaryReplicaTerm) { + public PrimaryReplicaMissException(long expectedPrimaryReplicaTerm, long currentPrimaryReplicaTerm) { this(expectedPrimaryReplicaTerm, currentPrimaryReplicaTerm, null); } @@ -43,25 +45,27 @@ public class PrimaryReplicaMissException extends IgniteInternalException { * @param currentPrimaryReplicaTerm Current raft term. * @param cause Cause exception. */ - public PrimaryReplicaMissException(Long expectedPrimaryReplicaTerm, long currentPrimaryReplicaTerm, Throwable cause) { - super(Replicator.REPLICA_MISS_ERR, - IgniteStringFormatter.format( - "The primary replica has changed because the term has been changed " - + "[expectedPrimaryReplicaTerm={}, currentPrimaryReplicaTerm={}]", - expectedPrimaryReplicaTerm, currentPrimaryReplicaTerm - ), - cause); + public PrimaryReplicaMissException(long expectedPrimaryReplicaTerm, long currentPrimaryReplicaTerm, @Nullable Throwable cause) { + super( + REPLICA_MISS_ERR, + "The primary replica has changed because the term has been changed " + + "[expectedPrimaryReplicaTerm={}, currentPrimaryReplicaTerm={}]", + cause, + expectedPrimaryReplicaTerm, currentPrimaryReplicaTerm + ); } /** - * The constructor is used for creating an exception instance that is thrown from a remote server. + * The constructor. * - * @param traceId Trace id. - * @param code Error code. - * @param message Error message. - * @param cause Cause exception. + * @param expectedLeaseholder Expected leaseholder. + * @param currentLeaseholder Current leaseholder. */ - public PrimaryReplicaMissException(UUID traceId, int code, String message, Throwable cause) { - super(traceId, code, message, cause); + public PrimaryReplicaMissException(String expectedLeaseholder, String currentLeaseholder) { + super( + REPLICA_MISS_ERR, + "The primary replica has changed [expectedLeaseholder={}, currentLeaseholder={}]", + expectedLeaseholder, currentLeaseholder + ); } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java index 4dfb279310..60aa23c679 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java @@ -56,16 +56,13 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -/** - * Integration test of index building. - */ -@Disabled("https://issues.apache.org/jira/browse/IGNITE-20096") +/** Integration test of index building. */ public class ItBuildIndexTest extends ClusterPerClassIntegrationTest { - private static final String ZONE_NAME = "zone_table"; + private static final String ZONE_NAME = "ZONE_TABLE"; - private static final String TABLE_NAME = "test_table"; + private static final String TABLE_NAME = "TEST_TABLE"; - private static final String INDEX_NAME = "test_index"; + private static final String INDEX_NAME = "TEST_INDEX"; @AfterEach void tearDown() { @@ -89,7 +86,7 @@ public class ItBuildIndexTest extends ClusterPerClassIntegrationTest { checkIndexBuild(partitions, replicas, INDEX_NAME); assertQuery(IgniteStringFormatter.format("SELECT * FROM {} WHERE i1 > 0", TABLE_NAME)) - .matches(containsIndexScan("PUBLIC", TABLE_NAME.toUpperCase(), INDEX_NAME.toUpperCase())) + .matches(containsIndexScan("PUBLIC", TABLE_NAME, INDEX_NAME)) .returns(1, 1) .returns(2, 2) .returns(3, 3) @@ -99,6 +96,7 @@ public class ItBuildIndexTest extends ClusterPerClassIntegrationTest { } @Test + @Disabled("https://issues.apache.org/jira/browse/IGNITE-20330") void testChangePrimaryReplicaOnMiddleBuildIndex() throws Exception { prepareBuildIndexToChangePrimaryReplica(); @@ -174,7 +172,7 @@ public class ItBuildIndexTest extends ClusterPerClassIntegrationTest { sql(IgniteStringFormatter.format( "CREATE TABLE {} (i0 INTEGER PRIMARY KEY, i1 INTEGER) WITH PRIMARY_ZONE='{}'", - TABLE_NAME, ZONE_NAME.toUpperCase() + TABLE_NAME, ZONE_NAME )); sql(IgniteStringFormatter.format( diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java index d7752fd878..9c25269b54 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java @@ -410,18 +410,22 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti RocksDbStorageEngine storageEngine = new RocksDbStorageEngine("test", engineConfig, path); storageEngine.start(); + int tableId = 1; + MvTableStorage mvTableStorage = storageEngine.createMvTable( - new StorageTableDescriptor(1, 1, DEFAULT_DATA_REGION_NAME), + new StorageTableDescriptor(tableId, 1, DEFAULT_DATA_REGION_NAME), new StorageIndexDescriptorSupplier(mock(CatalogService.class)) ); mvTableStorage.start(); mvTableStorages.put(index, mvTableStorage); - MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(mvTableStorage, 0); + int partitionId = 0; + + MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(mvTableStorage, partitionId); mvPartitionStorages.put(index, mvPartitionStorage); - PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(mvPartitionStorage); + PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(tableId, partitionId, mvPartitionStorage); PendingComparableValuesTracker<HybridTimestamp, Void> safeTime = new PendingComparableValuesTracker<>( new HybridTimestamp(1, 0) @@ -432,7 +436,7 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti ); StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler( - 0, + partitionId, partitionDataStorage, gcConfig, mock(LowWatermark.class), diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 4d41718012..6830a03fab 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -488,7 +488,7 @@ public class TableManager extends AbstractEventProducer<TableEvent, TableEventPa lowWatermark = new LowWatermark(nodeName, gcConfig.lowWatermark(), clock, txManager, vaultManager, mvGc); - indexBuilder = new IndexBuilder(nodeName, cpus); + indexBuilder = new IndexBuilder(nodeName, cpus, replicaSvc); raftCommandsMarshaller = new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java index 2eff68eb00..78604d0c17 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse; import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage; +import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowPkReplicaRequest; @@ -161,8 +162,13 @@ public interface TableMessageGroup { */ short RW_MULTI_ROW_PK_REPLICA_REQUEST = 20; + /** Message type for {@link BuildIndexReplicaRequest}. */ + short BUILD_INDEX_REPLICA_REQUEST = 21; + /** * Message types for Table module RAFT commands. + * + * <p>NOTE: Commands must be immutable because they will be stored in the replication log.</p> */ interface Commands { /** Message type for {@link FinishTxCommand}. */ diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java index 8ebd616916..8e17d1d827 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java @@ -23,28 +23,15 @@ import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.network.annotations.Transferable; -/** - * State machine command to build a table index. - */ +/** State machine command to build a table index. */ @Transferable(TableMessageGroup.Commands.BUILD_INDEX) public interface BuildIndexCommand extends WriteCommand { - /** - * Returns ID of table partition. - */ - TablePartitionIdMessage tablePartitionId(); - - /** - * Returns index ID. - */ + /** Returns index ID. */ int indexId(); - /** - * Returns row IDs for which to build indexes. - */ + /** Returns row IDs for which to build indexes. */ List<UUID> rowIds(); - /** - * Returns {@code true} if this batch is the last one. - */ + /** Returns {@code true} if this batch is the last one. */ boolean finish(); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java index 71098c2b31..a8a1c8fb8c 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java @@ -31,17 +31,17 @@ import java.util.function.Function; import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; -import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.index.IndexStorage; import org.apache.ignite.internal.table.distributed.TableMessagesFactory; -import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand; +import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest; import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.network.ClusterNode; -/** - * Task of building a table index. - */ +/** Task of building a table index. */ class IndexBuildTask { private static final IgniteLogger LOG = Loggers.forClass(IndexBuildTask.class); @@ -53,7 +53,7 @@ class IndexBuildTask { private final MvPartitionStorage partitionStorage; - private final RaftGroupService raftClient; + private final ReplicaService replicaService; private final ExecutorService executor; @@ -61,6 +61,8 @@ class IndexBuildTask { private final int batchSize; + private final ClusterNode node; + private final IgniteSpinBusyLock taskBusyLock = new IgniteSpinBusyLock(); private final AtomicBoolean taskStopGuard = new AtomicBoolean(); @@ -71,23 +73,23 @@ class IndexBuildTask { IndexBuildTaskId taskId, IndexStorage indexStorage, MvPartitionStorage partitionStorage, - RaftGroupService raftClient, + ReplicaService replicaService, ExecutorService executor, IgniteSpinBusyLock busyLock, - int batchSize + int batchSize, + ClusterNode node ) { this.taskId = taskId; this.indexStorage = indexStorage; this.partitionStorage = partitionStorage; - this.raftClient = raftClient; + this.replicaService = replicaService; this.executor = executor; this.busyLock = busyLock; this.batchSize = batchSize; + this.node = node; } - /** - * Starts building the index. - */ + /** Starts building the index. */ void start() { if (!enterBusy()) { taskFuture.complete(null); @@ -120,9 +122,7 @@ class IndexBuildTask { } } - /** - * Stops index building. - */ + /** Stops index building. */ void stop() { if (!taskStopGuard.compareAndSet(false, true)) { return; @@ -131,9 +131,7 @@ class IndexBuildTask { taskBusyLock.block(); } - /** - * Returns the index build future. - */ + /** Returns the index build future. */ CompletableFuture<Void> getTaskFuture() { return taskFuture; } @@ -146,7 +144,7 @@ class IndexBuildTask { try { List<RowId> batchRowIds = createBatchRowIds(); - return raftClient.run(createBuildIndexCommand(batchRowIds)) + return replicaService.invoke(node, createBuildIndexReplicaRequest(batchRowIds)) .thenComposeAsync(unused -> { if (indexStorage.getNextRowIdToBuild() == null) { // Index has been built. @@ -182,15 +180,11 @@ class IndexBuildTask { return batch; } - private BuildIndexCommand createBuildIndexCommand(List<RowId> rowIds) { + private BuildIndexReplicaRequest createBuildIndexReplicaRequest(List<RowId> rowIds) { boolean finish = rowIds.size() < batchSize; - return TABLE_MESSAGES_FACTORY.buildIndexCommand() - .tablePartitionId(TABLE_MESSAGES_FACTORY.tablePartitionIdMessage() - .tableId(taskId.getTableId()) - .partitionId(taskId.getPartitionId()) - .build() - ) + return TABLE_MESSAGES_FACTORY.buildIndexReplicaRequest() + .groupId(new TablePartitionId(taskId.getTableId(), taskId.getPartitionId())) .indexId(taskId.getIndexId()) .rowIds(rowIds.stream().map(RowId::uuid).collect(toList())) .finish(finish) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java index 150b7edf08..8e23f1a956 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.table.distributed.index; +import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe; + import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; @@ -29,13 +31,15 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.internal.close.ManuallyCloseable; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; -import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.index.IndexStorage; import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand; +import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.network.ClusterNode; /** * Class for managing the building of table indexes. @@ -47,6 +51,8 @@ public class IndexBuilder implements ManuallyCloseable { private final ExecutorService executor; + private final ReplicaService replicaService; + private final Map<IndexBuildTaskId, IndexBuildTask> indexBuildTaskById = new ConcurrentHashMap<>(); private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); @@ -58,8 +64,11 @@ public class IndexBuilder implements ManuallyCloseable { * * @param nodeName Node name. * @param threadCount Number of threads to build indexes. + * @param replicaService Replica service. */ - public IndexBuilder(String nodeName, int threadCount) { + public IndexBuilder(String nodeName, int threadCount, ReplicaService replicaService) { + this.replicaService = replicaService; + executor = new ThreadPoolExecutor( threadCount, threadCount, @@ -73,16 +82,17 @@ public class IndexBuilder implements ManuallyCloseable { /** * Starts building the index if it is not already built or is not yet in progress. * - * <p>Index is built in batches using {@link BuildIndexCommand} (via raft), batches are sent sequentially. + * <p>Index is built in batches using {@link BuildIndexReplicaRequest}, which are then transformed into {@link BuildIndexCommand} on the + * replica, batches are sent sequentially.</p> * - * <p>It is expected that the index building is triggered by the leader of the raft group. + * <p>It is expected that the index building is triggered by the primary replica.</p> * * @param tableId Table ID. * @param partitionId Partition ID. * @param indexId Index ID. * @param indexStorage Index storage to build. * @param partitionStorage Multi-versioned partition storage. - * @param raftClient Raft client. + * @param node Node to which requests to build the index will be sent. */ // TODO: IGNITE-19498 Perhaps we need to start building the index only once public void startBuildIndex( @@ -91,16 +101,25 @@ public class IndexBuilder implements ManuallyCloseable { int indexId, IndexStorage indexStorage, MvPartitionStorage partitionStorage, - RaftGroupService raftClient + ClusterNode node ) { - inBusyLock(() -> { + inBusyLockSafe(busyLock, () -> { if (indexStorage.getNextRowIdToBuild() == null) { return; } IndexBuildTaskId taskId = new IndexBuildTaskId(tableId, partitionId, indexId); - IndexBuildTask newTask = new IndexBuildTask(taskId, indexStorage, partitionStorage, raftClient, executor, busyLock, BATCH_SIZE); + IndexBuildTask newTask = new IndexBuildTask( + taskId, + indexStorage, + partitionStorage, + replicaService, + executor, + busyLock, + BATCH_SIZE, + node + ); IndexBuildTask previousTask = indexBuildTaskById.putIfAbsent(taskId, newTask); @@ -123,7 +142,7 @@ public class IndexBuilder implements ManuallyCloseable { * @param indexId Index ID. */ public void stopBuildIndex(int tableId, int partitionId, int indexId) { - inBusyLock(() -> { + inBusyLockSafe(busyLock, () -> { IndexBuildTask removed = indexBuildTaskById.remove(new IndexBuildTaskId(tableId, partitionId, indexId)); if (removed != null) { @@ -170,16 +189,4 @@ public class IndexBuilder implements ManuallyCloseable { IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS); } - - private void inBusyLock(Runnable runnable) { - if (!busyLock.enterBusy()) { - return; - } - - try { - runnable.run(); - } finally { - busyLock.leaveBusy(); - } - } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java index 945e414282..9b8bdfc4cb 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java @@ -35,7 +35,7 @@ import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; /** - * Provides access to MV (multi-version) data of a partition. + * Provides access to MV (multi-version) data of a table partition. * * <p>Methods writing to MV storage ({@link #addWrite(RowId, BinaryRow, UUID, int, int)}, {@link #abortWrite(RowId)} * and {@link #commitWrite(RowId, HybridTimestamp)}) and TX data storage MUST be invoked under a lock acquired using @@ -47,6 +47,12 @@ import org.jetbrains.annotations.TestOnly; * @see MvPartitionStorage */ public interface PartitionDataStorage extends ManuallyCloseable { + /** Returns table ID. */ + int tableId(); + + /** Returns partition ID. */ + int partitionId(); + /** * Executes {@link WriteClosure} atomically, meaning that partial result of an incomplete closure will never be written to the * physical device, thus guaranteeing data consistency after restart. Simply runs the closure in case of a volatile storage. diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 78ca782a92..13eba978e8 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -489,7 +489,7 @@ public class PartitionListener implements RaftGroupListener { if (cmd.finish()) { LOG.info( "Finish building the index: [tableId={}, partitionId={}, indexId={}]", - cmd.tablePartitionId().tableId(), cmd.tablePartitionId().partitionId(), cmd.indexId() + storage.tableId(), storage.partitionId(), cmd.indexId() ); } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java index f1dbcc4404..f46d0a01c0 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java @@ -61,6 +61,16 @@ public class SnapshotAwarePartitionDataStorage implements PartitionDataStorage { this.partitionKey = partitionKey; } + @Override + public int tableId() { + return partitionKey.tableId(); + } + + @Override + public int partitionId() { + return partitionKey.partitionId(); + } + @Override public <V> V runConsistently(WriteClosure<V> closure) throws StorageException { return partitionStorage.runConsistently(closure); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java similarity index 64% copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java index 8ebd616916..83e895f592 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java @@ -15,36 +15,23 @@ * limitations under the License. */ -package org.apache.ignite.internal.table.distributed.command; +package org.apache.ignite.internal.table.distributed.replication.request; import java.util.List; import java.util.UUID; -import org.apache.ignite.internal.raft.WriteCommand; +import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.network.annotations.Transferable; -/** - * State machine command to build a table index. - */ -@Transferable(TableMessageGroup.Commands.BUILD_INDEX) -public interface BuildIndexCommand extends WriteCommand { - /** - * Returns ID of table partition. - */ - TablePartitionIdMessage tablePartitionId(); - - /** - * Returns index ID. - */ +/** Replica request to build a table index. */ +@Transferable(TableMessageGroup.BUILD_INDEX_REPLICA_REQUEST) +public interface BuildIndexReplicaRequest extends ReplicaRequest { + /** Returns index ID. */ int indexId(); - /** - * Returns row IDs for which to build indexes. - */ + /** Returns row IDs for which to build indexes. */ List<UUID> rowIds(); - /** - * Returns {@code true} if this batch is the last one. - */ + /** Returns {@code true} if this batch is the last one. */ boolean finish(); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index daf7223079..744518e1d1 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -108,6 +108,7 @@ import org.apache.ignite.internal.table.distributed.SortedIndexLocker; import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; import org.apache.ignite.internal.table.distributed.TableMessagesFactory; import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage; +import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand; import org.apache.ignite.internal.table.distributed.command.FinishTxCommandBuilder; import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage; import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand; @@ -117,6 +118,7 @@ import org.apache.ignite.internal.table.distributed.command.UpdateCommandBuilder import org.apache.ignite.internal.table.distributed.index.IndexBuilder; import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage; +import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.CommittableTxRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyReplicaRequest; @@ -406,6 +408,8 @@ public class PartitionReplicaListener implements ReplicaListener { return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request, isPrimary); } else if (request instanceof ReplicaSafeTimeSyncRequest) { return processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request, isPrimary); + } else if (request instanceof BuildIndexReplicaRequest) { + return raftClient.run(toBuildIndexCommand((BuildIndexReplicaRequest) request)); } else { throw new UnsupportedReplicaRequestException(request.getClass()); } @@ -2366,7 +2370,7 @@ public class PartitionReplicaListener implements ReplicaListener { * Ensure that the primary replica was not changed. * * @param request Replica request. - * @return Future. The result is not null only for {@link ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. + * @return Future. The result is not {@code null} only for {@link ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. */ private CompletableFuture<Boolean> ensureReplicaIsPrimary(ReplicaRequest request) { Long expectedTerm; @@ -2410,6 +2414,17 @@ public class PartitionReplicaListener implements ReplicaListener { } else if (request instanceof ReadOnlyReplicaRequest || request instanceof ReplicaSafeTimeSyncRequest) { return placementDriver.getPrimaryReplica(replicationGroupId, now) .thenApply(primaryReplica -> (primaryReplica != null && isLocalPeer(primaryReplica.getLeaseholder()))); + } else if (request instanceof BuildIndexReplicaRequest) { + // TODO: IGNITE-20330 Possibly replaced by placementDriver#getPrimaryReplica and should also be added to the documentation + // about PrimaryReplicaMissException + return placementDriver.awaitPrimaryReplica(replicationGroupId, now) + .thenCompose(replicaMeta -> { + if (isLocalPeer(replicaMeta.getLeaseholder())) { + return completedFuture(null); + } else { + return failedFuture(new PrimaryReplicaMissException(localNode.name(), replicaMeta.getLeaseholder())); + } + }); } else { return completedFuture(null); } @@ -2827,7 +2842,7 @@ public class PartitionReplicaListener implements ReplicaListener { // TODO: IGNITE-19112 We only need to create the index storage once IndexStorage indexStorage = mvTableStorage.getOrCreateIndex(partId(), indexDescriptor); - indexBuilder.startBuildIndex(tableId(), partId(), indexDescriptor.id(), indexStorage, mvDataStorage, raftClient); + indexBuilder.startBuildIndex(tableId(), partId(), indexDescriptor.id(), indexStorage, mvDataStorage, localNode); } private int partId() { @@ -2920,4 +2935,12 @@ public class PartitionReplicaListener implements ReplicaListener { return tableDescriptor; } + + private static BuildIndexCommand toBuildIndexCommand(BuildIndexReplicaRequest request) { + return MSG_FACTORY.buildIndexCommand() + .indexId(request.indexId()) + .rowIds(request.rowIds()) + .finish(request.finish()) + .build(); + } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java index ad2d717695..b604afee6b 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java @@ -97,9 +97,10 @@ public abstract class IndexBaseTest extends BaseMvStoragesTest { @BeforeEach void setUp(@InjectConfiguration GcConfiguration gcConfig) { - int pkIndexId = 1; - int sortedIndexId = 2; - int hashIndexId = 3; + int tableId = 1; + int pkIndexId = 2; + int sortedIndexId = 3; + int hashIndexId = 4; pkInnerStorage = new TestHashIndexStorage(PARTITION_ID, new StorageHashIndexDescriptor(pkIndexId, List.of( new StorageHashIndexColumnDescriptor("INTKEY", NativeTypes.INT32, false), @@ -142,7 +143,7 @@ public abstract class IndexBaseTest extends BaseMvStoragesTest { hashIndexId, hashIndexStorage ); - TestPartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(storage); + TestPartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(tableId, PARTITION_ID, storage); IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler(DummyInternalTableImpl.createTableIndexStoragesSupplier(indexes)); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java index fa845fc6d1..22d981e0f4 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java @@ -48,6 +48,8 @@ import org.junit.jupiter.api.extension.ExtendWith; */ @ExtendWith(ConfigurationExtension.class) public class StorageUpdateHandlerTest extends BaseIgniteAbstractTest { + private static final int TABLE_ID = 1; + private static final int PARTITION_ID = 0; @InjectConfiguration @@ -110,7 +112,7 @@ public class StorageUpdateHandlerTest extends BaseIgniteAbstractTest { storageUpdateHandler.handleUpdate( UUID.randomUUID(), UUID.randomUUID(), - new TablePartitionId(1, PARTITION_ID), + new TablePartitionId(TABLE_ID, PARTITION_ID), null, false, null, @@ -133,7 +135,7 @@ public class StorageUpdateHandlerTest extends BaseIgniteAbstractTest { storageUpdateHandler.handleUpdateAll( UUID.randomUUID(), Map.of(), - new TablePartitionId(1, PARTITION_ID), + new TablePartitionId(TABLE_ID, PARTITION_ID), false, null, null @@ -156,7 +158,9 @@ public class StorageUpdateHandlerTest extends BaseIgniteAbstractTest { } private static PartitionDataStorage createPartitionDataStorage() { - PartitionDataStorage partitionStorage = spy(new TestPartitionDataStorage(new TestMvPartitionStorage(PARTITION_ID))); + PartitionDataStorage partitionStorage = spy( + new TestPartitionDataStorage(TABLE_ID, PARTITION_ID, new TestMvPartitionStorage(PARTITION_ID)) + ); return partitionStorage; } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java index 9fddeb950a..98ef38737e 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java @@ -54,6 +54,8 @@ abstract class AbstractGcUpdateHandlerTest extends BaseMvStoragesTest { /** To be used in a loop. {@link RepeatedTest} has a smaller failure rate due to recreating the storage every time. */ private static final int REPEATS = 1000; + protected static final int TABLE_ID = 1; + private static final int PARTITION_ID = 0; private MvTableStorage tableStorage; @@ -193,7 +195,7 @@ abstract class AbstractGcUpdateHandlerTest extends BaseMvStoragesTest { } private TestPartitionDataStorage createPartitionDataStorage() { - return new TestPartitionDataStorage(getOrCreateMvPartition(tableStorage, PARTITION_ID)); + return new TestPartitionDataStorage(TABLE_ID, PARTITION_ID, getOrCreateMvPartition(tableStorage, PARTITION_ID)); } private static IndexUpdateHandler createIndexUpdateHandler() { diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java index 17c9a60bfa..dc4ff2bfbf 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java @@ -70,7 +70,7 @@ class PersistentPageMemoryGcUpdateHandlerTest extends AbstractGcUpdateHandlerTes engine.start(); table = engine.createMvTable( - new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, DEFAULT_DATA_REGION_NAME), + new StorageTableDescriptor(TABLE_ID, DEFAULT_PARTITION_COUNT, DEFAULT_DATA_REGION_NAME), new StorageIndexDescriptorSupplier(mock(CatalogService.class)) ); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java index 20a3a28cbe..aa96468c1b 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java @@ -56,7 +56,7 @@ class RocksDbGcUpdateHandlerTest extends AbstractGcUpdateHandlerTest { engine.start(); table = engine.createMvTable( - new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, DEFAULT_DATA_REGION_NAME), + new StorageTableDescriptor(TABLE_ID, DEFAULT_PARTITION_COUNT, DEFAULT_DATA_REGION_NAME), new StorageIndexDescriptorSupplier(mock(CatalogService.class)) ); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/TestGcUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/TestGcUpdateHandlerTest.java index 3fd4e21872..57ff84b89b 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/TestGcUpdateHandlerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/TestGcUpdateHandlerTest.java @@ -25,6 +25,6 @@ import org.junit.jupiter.api.BeforeEach; class TestGcUpdateHandlerTest extends AbstractGcUpdateHandlerTest { @BeforeEach void setUp() { - initialize(new TestMvTableStorage(1, DEFAULT_PARTITION_COUNT)); + initialize(new TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT)); } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/VolatilePageMemoryGcUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/VolatilePageMemoryGcUpdateHandlerTest.java index d97b61e2b5..630e7c77a8 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/VolatilePageMemoryGcUpdateHandlerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/VolatilePageMemoryGcUpdateHandlerTest.java @@ -55,7 +55,7 @@ class VolatilePageMemoryGcUpdateHandlerTest extends AbstractGcUpdateHandlerTest engine.start(); table = engine.createMvTable( - new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, DEFAULT_DATA_REGION_NAME), + new StorageTableDescriptor(TABLE_ID, DEFAULT_PARTITION_COUNT, DEFAULT_DATA_REGION_NAME), new StorageIndexDescriptorSupplier(mock(CatalogService.class)) ); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java index c9aa2201c8..855808b23f 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java @@ -126,26 +126,22 @@ import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) @ExtendWith(ConfigurationExtension.class) public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { - /** Key count. */ private static final int KEY_COUNT = 100; - /** Partition id. */ + private static final int TABLE_ID = 1; + private static final int PARTITION_ID = 0; - /** Schema. */ private static final SchemaDescriptor SCHEMA = new SchemaDescriptor( 1, new Column[]{new Column("key", NativeTypes.INT32, false)}, new Column[]{new Column("value", NativeTypes.INT32, false)} ); - /** Table command listener. */ private PartitionListener commandListener; - /** RAFT index. */ private final AtomicLong raftIndex = new AtomicLong(); - /** Primary index. */ private final TableSchemaAwareIndexStorage pkStorage = new TableSchemaAwareIndexStorage( 1, new TestHashIndexStorage( @@ -155,28 +151,19 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { BinaryRowConverter.keyExtractor(SCHEMA) ); - /** Partition storage. */ private final MvPartitionStorage mvPartitionStorage = spy(new TestMvPartitionStorage(PARTITION_ID)); - private final PartitionDataStorage partitionDataStorage = spy(new TestPartitionDataStorage(mvPartitionStorage)); + private final PartitionDataStorage partitionDataStorage = spy(new TestPartitionDataStorage(TABLE_ID, PARTITION_ID, mvPartitionStorage)); - /** Transaction meta storage. */ private final TxStateStorage txStateStorage = spy(new TestTxStateStorage()); - /** Work directory. */ @WorkDirectory private Path workDir; - /** Factory for command messages. */ private final TableMessagesFactory msgFactory = new TableMessagesFactory(); - /** Factory for replica messages. */ - private final ReplicaMessagesFactory replicaMessagesFactory = new ReplicaMessagesFactory(); - - /** Hybrid clock. */ private final HybridClock hybridClock = new HybridClockImpl(); - /** Safe time tracker. */ private PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker; @Captor @@ -300,7 +287,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { */ @Test public void testOnSnapshotSavePropagateLastAppliedIndexAndTerm(@InjectConfiguration GcConfiguration gcConfig) { - TestPartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(mvPartitionStorage); + TestPartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(TABLE_ID, PARTITION_ID, mvPartitionStorage); IndexUpdateHandler indexUpdateHandler1 = new IndexUpdateHandler( DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.id(), pkStorage)) @@ -524,12 +511,6 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { private BuildIndexCommand createBuildIndexCommand(int indexId, List<UUID> rowUuids, boolean finish) { return msgFactory.buildIndexCommand() - .tablePartitionId( - msgFactory.tablePartitionIdMessage() - .tableId(1) - .partitionId(PARTITION_ID) - .build() - ) .indexId(indexId) .rowIds(rowUuids) .finish(finish) @@ -636,7 +617,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { private void insertAll() { Map<UUID, BinaryRowMessage> rows = new HashMap<>(KEY_COUNT); UUID txId = TestTransactionIds.newTransactionId(); - var commitPartId = new TablePartitionId(1, PARTITION_ID); + var commitPartId = new TablePartitionId(TABLE_ID, PARTITION_ID); for (int i = 0; i < KEY_COUNT; i++) { rows.put(TestTransactionIds.newTransactionId(), getTestRow(i, i)); @@ -672,7 +653,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { */ private void updateAll(Function<Integer, Integer> keyValueMapper) { UUID txId = TestTransactionIds.newTransactionId(); - var commitPartId = new TablePartitionId(1, PARTITION_ID); + var commitPartId = new TablePartitionId(TABLE_ID, PARTITION_ID); Map<UUID, BinaryRowMessage> rows = new HashMap<>(KEY_COUNT); for (int i = 0; i < KEY_COUNT; i++) { @@ -709,7 +690,7 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { */ private void deleteAll() { UUID txId = TestTransactionIds.newTransactionId(); - var commitPartId = new TablePartitionId(1, PARTITION_ID); + var commitPartId = new TablePartitionId(TABLE_ID, PARTITION_ID); Map<UUID, BinaryRowMessage> keyRows = new HashMap<>(KEY_COUNT); for (int i = 0; i < KEY_COUNT; i++) { diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java index 7d7b4215bf..6c2432cb90 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java @@ -201,7 +201,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get())) ); - TestPartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(TEST_MV_PARTITION_STORAGE); + TestPartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(TABLE_ID, PART_ID, TEST_MV_PARTITION_STORAGE); CatalogTables catalogTables = mock(CatalogTables.class); when(catalogTables.table(anyInt(), anyLong())).thenReturn(mock(CatalogTableDescriptor.class)); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index 2d4b1f4fe5..f5dda8b3a4 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -275,7 +275,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { private TransactionStateResolver transactionStateResolver; - private final PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(testMvPartitionStorage); + private final PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(TABLE_ID, PART_ID, testMvPartitionStorage); @Mock private RaftGroupService mockRaftClient; diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index ece7c2926a..615dd012f0 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -448,7 +448,7 @@ public class ItTxTestCluster { new PendingComparableValuesTracker<>(clocks.get(assignment).now()); PendingComparableValuesTracker<Long, Void> storageIndexTracker = new PendingComparableValuesTracker<>(0L); - PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(mvPartStorage); + PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(tableId, partId, mvPartStorage); IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler( DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get())) diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java index 6a18c727ec..2b8a78aa63 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java @@ -41,16 +41,37 @@ import org.jetbrains.annotations.Nullable; * Test implementation of {@link PartitionDataStorage}. */ public class TestPartitionDataStorage implements PartitionDataStorage { + private final int tableId; + + private final int partitionId; + private final MvPartitionStorage partitionStorage; private final Lock partitionSnapshotsLock = new ReentrantLock(); private final RaftGroupConfigurationConverter configurationConverter = new RaftGroupConfigurationConverter(); - public TestPartitionDataStorage(MvPartitionStorage partitionStorage) { + /** Constructor. */ + public TestPartitionDataStorage( + int tableId, + int partitionId, + MvPartitionStorage partitionStorage + ) { + this.tableId = tableId; + this.partitionId = partitionId; this.partitionStorage = partitionStorage; } + @Override + public int tableId() { + return tableId; + } + + @Override + public int partitionId() { + return partitionId; + } + @Override public <V> V runConsistently(WriteClosure<V> closure) throws StorageException { return partitionStorage.runConsistently(closure); diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index 349b66dc50..726610e2f2 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -323,7 +323,7 @@ public class DummyInternalTableImpl extends InternalTableImpl { safeTime = new PendingIndependentComparableValuesTracker<>(HybridTimestamp.MIN_VALUE); - PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(mvPartStorage); + PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(tableId, PART_ID, mvPartStorage); TableIndexStoragesSupplier indexes = createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get())); GcConfiguration gcConfig = mock(GcConfiguration.class); @@ -378,7 +378,7 @@ public class DummyInternalTableImpl extends InternalTableImpl { partitionListener = new PartitionListener( this.txManager, - new TestPartitionDataStorage(mvPartStorage), + new TestPartitionDataStorage(tableId, PART_ID, mvPartStorage), storageUpdateHandler, txStateStorage().getOrCreateTxStateStorage(PART_ID), safeTime,