This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 1f3ef87aa4 IGNITE-21549 Add a wait for safeTime(partition) before starting the index building process (#3242) 1f3ef87aa4 is described below commit 1f3ef87aa4426a9dd378d608f5e0255554e9f132 Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Thu Feb 22 09:06:23 2024 +0300 IGNITE-21549 Add a wait for safeTime(partition) before starting the index building process (#3242) --- .../internal/table/distributed/TableUtils.java | 28 ++++++++ .../replicator/PartitionReplicaListener.java | 14 ++++ .../replication/PartitionReplicaListenerTest.java | 78 +++++++++++++++++----- 3 files changed, 105 insertions(+), 15 deletions(-) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java index 5b0fe734fb..3f1b8ca61e 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java @@ -17,12 +17,14 @@ package org.apache.ignite.internal.table.distributed; +import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.BUILDING; import static org.apache.ignite.internal.util.CollectionUtils.view; import java.util.List; import java.util.UUID; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus; import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.tx.TransactionIds; @@ -53,4 +55,30 @@ public class TableUtils { return view(indexes, CatalogObjectDescriptor::id); } + + /** + * Returns the catalog version in which the index got status {@link
CatalogIndexStatus#BUILDING}. + * + * @param catalogService Catalog service. + * @param indexId Index ID of interest. + * @param fromCatalogVersionIncluded Catalog version with which the search will begin (inclusive). + */ + public static int findStartBuildingIndexCatalogVersion(CatalogService catalogService, int indexId, int fromCatalogVersionIncluded) { + int latestCatalogVersion = catalogService.latestCatalogVersion(); + + for (int catalogVersion = fromCatalogVersionIncluded; catalogVersion <= latestCatalogVersion; catalogVersion++) { + CatalogIndexDescriptor index = catalogService.index(indexId, catalogVersion); + + assert index != null : "indexId=" + indexId + ", catalogVersion=" + catalogVersion; + + if (index.status() == BUILDING) { + return catalogVersion; + } + } + + throw new AssertionError(String.format( + "Could not find index in status %s: [indexId=%s, fromCatalogVersionIncluded=%s, latestCatalogVersion=%s]", + BUILDING, indexId, fromCatalogVersionIncluded, latestCatalogVersion + )); + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 879c551822..ac157fa270 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -22,8 +22,10 @@ import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.stream.Collectors.toMap; +import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong; import static 
org.apache.ignite.internal.lang.IgniteStringFormatter.format; +import static org.apache.ignite.internal.table.distributed.TableUtils.findStartBuildingIndexCatalogVersion; import static org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.beginRwTxTs; import static org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.latestIndexDescriptorInBuildingStatus; import static org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.rwTxActiveCatalogVersion; @@ -70,6 +72,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import java.util.function.Supplier; import org.apache.ignite.internal.binarytuple.BinaryTupleCommon; +import org.apache.ignite.internal.catalog.Catalog; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; import org.apache.ignite.internal.catalog.events.CatalogEvent; @@ -3805,10 +3808,21 @@ public class PartitionReplicaListener implements ReplicaListener { private CompletableFuture<?> processBuildIndexReplicaRequest(BuildIndexReplicaRequest request) { return txRwOperationTracker.awaitCompleteTxRwOperations(request.creationCatalogVersion()) + .thenCompose(unused -> safeTime.waitFor(indexStartBuildingActivationTs(request))) .thenCompose(unused -> raftClient.run(toBuildIndexCommand(request))); } private List<Integer> indexIdsAtRwTxBeginTs(UUID txId) { return TableUtils.indexIdsAtRwTxBeginTs(catalogService, txId, tableId()); } + + private HybridTimestamp indexStartBuildingActivationTs(BuildIndexReplicaRequest request) { + int catalogVersion = findStartBuildingIndexCatalogVersion(catalogService, request.indexId(), request.creationCatalogVersion()); + + Catalog catalog = catalogService.catalog(catalogVersion); + + assert catalog != null : "indexId=" + request.indexId() + ", catalogVersion=" + catalogVersion; + + return hybridTimestamp(catalog.time()); + } } diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index 67e40a345a..7858d3ccc1 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -21,7 +21,9 @@ import static java.util.Collections.singletonList; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.BUILDING; +import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.REGISTERED; import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_BUILDING; +import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong; import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; @@ -85,6 +87,7 @@ import org.apache.ignite.distributed.TestPartitionDataStorage; import org.apache.ignite.distributed.replicator.action.RequestTypes; import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder; import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder; +import org.apache.ignite.internal.catalog.Catalog; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.commands.DefaultValue; import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor; @@ -2802,7 +2805,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { @ParameterizedTest(name = "readOnly = 
{0}") @ValueSource(booleans = {true, false}) void testStaleTxOperationAfterIndexStartBuilding(boolean readOnly) { - fireHashIndexStartBuildingEventForStaleTxOperation(); + fireHashIndexStartBuildingEventForStaleTxOperation(hashIndexStorage.id(), 1); UUID txId = newTxId(); long beginTs = beginTimestamp(txId).longValue(); @@ -2820,14 +2823,20 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { @Test void testBuildIndexReplicaRequestWithoutRwTxOperations() { - CompletableFuture<?> invokeBuildIndexReplicaRequestFuture = invokeBuildIndexReplicaRequestAsync(1, 1); + int indexId = hashIndexStorage.id(); + int indexCreationCatalogVersion = 1; + + CompletableFuture<?> invokeBuildIndexReplicaRequestFuture = invokeBuildIndexReplicaRequestAsync( + indexId, + indexCreationCatalogVersion + ); assertFalse(invokeBuildIndexReplicaRequestFuture.isDone()); - fireHashIndexStartBuildingEventForStaleTxOperation(); + fireHashIndexStartBuildingEventForStaleTxOperation(indexId, indexCreationCatalogVersion); assertThat(invokeBuildIndexReplicaRequestFuture, willCompleteSuccessfully()); - assertThat(invokeBuildIndexReplicaRequestAsync(1, 1), willCompleteSuccessfully()); + assertThat(invokeBuildIndexReplicaRequestAsync(indexId, indexCreationCatalogVersion), willCompleteSuccessfully()); } @ParameterizedTest(name = "failCmd = {0}") @@ -2853,9 +2862,16 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { BinaryRow row = binaryRow(0); CompletableFuture<ReplicaResult> upsertFuture = upsertAsync(txId, row, true); - CompletableFuture<?> invokeBuildIndexReplicaRequestFuture = invokeBuildIndexReplicaRequestAsync(1, 1); - fireHashIndexStartBuildingEventForStaleTxOperation(); + int indexId = hashIndexStorage.id(); + int indexCreationCatalogVersion = 1; + + CompletableFuture<?> invokeBuildIndexReplicaRequestFuture = invokeBuildIndexReplicaRequestAsync( + indexId, + indexCreationCatalogVersion + ); + + fireHashIndexStartBuildingEventForStaleTxOperation(indexId, 
indexCreationCatalogVersion); assertFalse(upsertFuture.isDone()); assertFalse(invokeBuildIndexReplicaRequestFuture.isDone()); @@ -2871,22 +2887,54 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } assertThat(invokeBuildIndexReplicaRequestFuture, willCompleteSuccessfully()); + + HybridTimestamp startBuildingIndexActivationTs = hybridTimestamp(catalogService.catalog(indexCreationCatalogVersion + 1).time()); + + verify(safeTimeClock).waitFor(eq(startBuildingIndexActivationTs)); } - private void fireHashIndexStartBuildingEventForStaleTxOperation() { - CatalogHashIndexDescriptor hashIndexDescriptor = mock(CatalogHashIndexDescriptor.class); + private void fireHashIndexStartBuildingEventForStaleTxOperation(int indexId, int creationIndexCatalogVersion) { + var registeredIndexDescriptor = mock(CatalogHashIndexDescriptor.class); + var buildingIndexDescriptor = mock(CatalogHashIndexDescriptor.class); - int indexId = hashIndexStorage.id(); + when(registeredIndexDescriptor.id()).thenReturn(indexId); + when(buildingIndexDescriptor.id()).thenReturn(indexId); - when(hashIndexDescriptor.id()).thenReturn(indexId); - when(hashIndexDescriptor.tableId()).thenReturn(TABLE_ID); - when(hashIndexDescriptor.status()).thenReturn(BUILDING); - when(hashIndexDescriptor.txWaitCatalogVersion()).thenReturn(1); + when(buildingIndexDescriptor.tableId()).thenReturn(TABLE_ID); + when(registeredIndexDescriptor.tableId()).thenReturn(TABLE_ID); - when(catalogService.index(eq(indexId), anyInt())).thenReturn(hashIndexDescriptor); + when(registeredIndexDescriptor.status()).thenReturn(REGISTERED); + when(buildingIndexDescriptor.status()).thenReturn(BUILDING); + + when(buildingIndexDescriptor.txWaitCatalogVersion()).thenReturn(creationIndexCatalogVersion); + + int startBuildingIndexCatalogVersion = creationIndexCatalogVersion + 1; + + when(catalogService.index(eq(indexId), eq(creationIndexCatalogVersion))).thenReturn(registeredIndexDescriptor); + 
when(catalogService.index(eq(indexId), eq(startBuildingIndexCatalogVersion))).thenReturn(buildingIndexDescriptor); + + when(catalogService.latestCatalogVersion()).thenReturn(startBuildingIndexCatalogVersion); + + var registeredIndexCatalog = mock(Catalog.class); + var buildingIndexCatalog = mock(Catalog.class); + + when(registeredIndexCatalog.version()).thenReturn(creationIndexCatalogVersion); + when(buildingIndexCatalog.version()).thenReturn(startBuildingIndexCatalogVersion); + + long registeredIndexActivationTs = clock.now().addPhysicalTime(-100).longValue(); + long buildingIndexActivationTs = clock.nowLong(); + + when(registeredIndexCatalog.time()).thenReturn(registeredIndexActivationTs); + when(buildingIndexCatalog.time()).thenReturn(buildingIndexActivationTs); + + when(catalogService.catalog(eq(creationIndexCatalogVersion))).thenReturn(registeredIndexCatalog); + when(catalogService.catalog(eq(startBuildingIndexCatalogVersion))).thenReturn(buildingIndexCatalog); assertThat( - catalogServiceEventProducer.fireEvent(INDEX_BUILDING, new StartBuildingIndexEventParameters(0L, 2, indexId)), + catalogServiceEventProducer.fireEvent( + INDEX_BUILDING, + new StartBuildingIndexEventParameters(0L, startBuildingIndexCatalogVersion, indexId) + ), willCompleteSuccessfully() ); }