This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 5a826a0f01 IGNITE-20637 Implement recovery of distributed index building (#2756) 5a826a0f01 is described below commit 5a826a0f01c368e327a666fe05e89cb62adbbb22 Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Tue Oct 31 19:25:02 2023 +0300 IGNITE-20637 Implement recovery of distributed index building (#2756) --- .../index/IndexAvailabilityController.java | 187 +++-------- .../index/IndexAvailabilityControllerRestorer.java | 254 ++++++++++++++ .../internal/index/IndexManagementUtils.java | 266 +++++++++++++++ .../apache/ignite/internal/index/IndexManager.java | 3 + .../IndexAvailabilityControllerRestorerTest.java | 374 +++++++++++++++++++++ .../index/IndexAvailabilityControllerTest.java | 35 +- .../internal/index/IndexBuildControllerTest.java | 46 +-- .../internal/index/IndexManagementUtilsTest.java | 123 +++++++ .../ignite/internal/index/IndexManagerTest.java | 26 +- .../internal/index/TestIndexManagementUtils.java | 126 +++++++ .../index/IndexBuildCompletionListener.java | 2 +- .../table/distributed/index/IndexBuilder.java | 15 +- .../table/distributed/index/IndexBuilderTest.java | 2 +- 13 files changed, 1236 insertions(+), 223 deletions(-) diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java index 8f135af85a..0c790f63f8 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java @@ -17,36 +17,37 @@ package org.apache.ignite.internal.index; -import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; -import static java.util.function.Predicate.not; import static 
java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.index.IndexManagementUtils.PARTITION_BUILD_INDEX_KEY_PREFIX; +import static org.apache.ignite.internal.index.IndexManagementUtils.extractIndexIdFromPartitionBuildIndexKey; +import static org.apache.ignite.internal.index.IndexManagementUtils.getPartitionCountFromCatalog; +import static org.apache.ignite.internal.index.IndexManagementUtils.inProgressBuildIndexMetastoreKey; +import static org.apache.ignite.internal.index.IndexManagementUtils.isAnyMetastoreKeyPresentLocally; +import static org.apache.ignite.internal.index.IndexManagementUtils.isMetastoreKeyAbsentLocally; +import static org.apache.ignite.internal.index.IndexManagementUtils.makeIndexAvailableInCatalogWithoutFuture; +import static org.apache.ignite.internal.index.IndexManagementUtils.partitionBuildIndexMetastoreKey; +import static org.apache.ignite.internal.index.IndexManagementUtils.partitionBuildIndexMetastoreKeyPrefix; +import static org.apache.ignite.internal.index.IndexManagementUtils.putBuildIndexMetastoreKeysIfAbsent; +import static org.apache.ignite.internal.index.IndexManagementUtils.removeMetastoreKeyIfPresent; +import static org.apache.ignite.internal.index.IndexManagementUtils.toPartitionBuildIndexMetastoreKeyString; import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists; -import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; import static org.apache.ignite.internal.metastorage.dsl.Operations.noop; -import static org.apache.ignite.internal.metastorage.dsl.Operations.put; import static org.apache.ignite.internal.metastorage.dsl.Operations.remove; -import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; import static org.apache.ignite.internal.util.CollectionUtils.concat; import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static 
org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; -import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; -import org.apache.ignite.internal.catalog.CatalogCommand; import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.catalog.CatalogService; -import org.apache.ignite.internal.catalog.IndexAlreadyAvailableValidationException; -import org.apache.ignite.internal.catalog.IndexNotFoundValidationException; import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand; import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; -import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; -import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters; import org.apache.ignite.internal.catalog.events.DropIndexEventParameters; @@ -64,7 +65,6 @@ import org.apache.ignite.internal.metastorage.dsl.Operation; import org.apache.ignite.internal.metastorage.dsl.Operations; import org.apache.ignite.internal.table.distributed.index.IndexBuildCompletionListener; import org.apache.ignite.internal.table.distributed.index.IndexBuilder; -import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.internal.util.IgniteSpinBusyLock; /** @@ -73,37 +73,38 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock; * * <p>An approximate algorithm for making an index available:</p> * <ul> - * <li>On {@link CatalogEvent#INDEX_CREATE}, keys are created in the metastore: {@code indexBuild.inProgress.<indexId>} and - * {@code indexBuild.partition.<indexId>.<partitionId_0>}...{@code indexBuild.partition.<indexId>.<partitionId_N>}.</li> + * <li>On {@link CatalogEvent#INDEX_CREATE}, + * {@link 
IndexManagementUtils#putBuildIndexMetastoreKeysIfAbsent(MetaStorageManager, int, int) index building keys} are created in the + * metastore.</li> * <li>Then it is expected that the distributed index building event will be triggered for all partitions via - * {@link IndexBuildCompletionListener} (from {@link IndexBuilder#listen}); as a result of each of these events, the corresponding key - * {@code indexBuild.partition.<indexId>.<partitionId>} will be deleted from metastore.</li> - * <li>When all the {@code indexBuild.partition.<indexId>.<partitionId>} keys in the metastore are deleted, - * {@link MakeIndexAvailableCommand} will be executed for the corresponding index.</li> - * <li>At {@link CatalogEvent#INDEX_AVAILABLE}, key {@code indexBuild.inProgress.<indexId>} in the metastore will be deleted.</li> + * {@link IndexBuildCompletionListener} (from {@link IndexBuilder#listen}); as a result of each of these events, the corresponding + * {@link IndexManagementUtils#partitionBuildIndexMetastoreKey(int, int) partition index building key} will be deleted from + * metastore.</li> + * <li>When <b>all</b> the {@link IndexManagementUtils#partitionBuildIndexMetastoreKey(int, int) partition index building keys} in the + * metastore are deleted, {@link MakeIndexAvailableCommand} will be executed for the corresponding index.</li> + * <li>At {@link CatalogEvent#INDEX_AVAILABLE}, + * {@link IndexManagementUtils#inProgressBuildIndexMetastoreKey(int) in progress index building key} in the metastore will be + * deleted.</li> * </ul> * * <p>Notes:</p> * <ul> - * <li>At {@link CatalogEvent#INDEX_DROP}, the keys in the metastore are deleted: {@code indexBuild.inProgress.<indexId>} and - * {@code indexBuild.partition.<indexId>.<partitionId_0>}...{@code indexBuild.partition.<indexId>.<partitionId_N>}.</li> + * <li>At {@link CatalogEvent#INDEX_DROP}, + * {@link IndexManagementUtils#putBuildIndexMetastoreKeysIfAbsent(MetaStorageManager, int, int) index building keys} in the metastore + * 
are deleted.</li> * <li>Handling of {@link CatalogEvent#INDEX_CREATE}, {@link CatalogEvent#INDEX_DROP}, {@link CatalogEvent#INDEX_AVAILABLE} and watch - * prefix {@link #PARTITION_BUILD_INDEX_KEY_PREFIX} is made by the whole cluster (and only one node makes a write to the metastore) as - * these events are global, but only one node (a primary replica owning a partition) handles + * prefix {@link IndexManagementUtils#PARTITION_BUILD_INDEX_KEY_PREFIX} is made by the whole cluster (and only one node makes a write to + * the metastore) as these events are global, but only one node (a primary replica owning a partition) handles * {@link IndexBuildCompletionListener#onBuildCompletion} (form {@link IndexBuilder#listen}) event.</li> + * <li>Restoring index availability occurs in {@link IndexAvailabilityControllerRestorer}.</li> * </ul> * * @see CatalogIndexDescriptor#available() */ -// TODO: IGNITE-20637 Recovery needs to be implemented -// TODO: IGNITE-20637 Need integration with the IgniteImpl +// TODO: IGNITE-20638 Need integration with the IgniteImpl public class IndexAvailabilityController implements ManuallyCloseable { private static final IgniteLogger LOG = Loggers.forClass(IndexAvailabilityController.class); - private static final String IN_PROGRESS_BUILD_INDEX_KEY_PREFIX = "indexBuild.inProgress."; - - private static final String PARTITION_BUILD_INDEX_KEY_PREFIX = "indexBuild.partition."; - private final CatalogManager catalogManager; private final MetaStorageManager metaStorageManager; @@ -173,18 +174,9 @@ public class IndexAvailabilityController implements ManuallyCloseable { return inBusyLockAsync(busyLock, () -> { int indexId = parameters.indexDescriptor().id(); - int partitions = getPartitionCountFromCatalog(indexId, parameters.catalogVersion()); - - ByteArray inProgressBuildIndexKey = inProgressBuildIndexKey(indexId); + int partitions = getPartitionCountFromCatalog(catalogManager, indexId, parameters.catalogVersion()); - return metaStorageManager.invoke( - 
notExists(inProgressBuildIndexKey), - concat( - List.of(put(inProgressBuildIndexKey, BYTE_EMPTY_ARRAY)), - putPartitionBuildIndexOperations(indexId, partitions) - ), - List.of(noop()) - ); + return putBuildIndexMetastoreKeysIfAbsent(metaStorageManager, indexId, partitions); }); } @@ -192,15 +184,20 @@ public class IndexAvailabilityController implements ManuallyCloseable { return inBusyLockAsync(busyLock, () -> { int indexId = parameters.indexId(); - int partitions = getPartitionCountFromCatalog(indexId, parameters.catalogVersion() - 1); + int partitions = getPartitionCountFromCatalog(catalogManager, indexId, parameters.catalogVersion() - 1); + + ByteArray inProgressBuildIndexKey = inProgressBuildIndexMetastoreKey(indexId); - ByteArray inProgressBuildIndexKey = inProgressBuildIndexKey(indexId); + List<Operation> removePartitionBuildIndexMetastoreKeyOperations = IntStream.range(0, partitions) + .mapToObj(partitionId -> partitionBuildIndexMetastoreKey(indexId, partitionId)) + .map(Operations::remove) + .collect(toList()); return metaStorageManager.invoke( exists(inProgressBuildIndexKey), concat( List.of(remove(inProgressBuildIndexKey)), - removePartitionBuildIndexOperations(indexId, partitions) + removePartitionBuildIndexMetastoreKeyOperations ), List.of(noop()) ); @@ -209,9 +206,9 @@ public class IndexAvailabilityController implements ManuallyCloseable { private CompletableFuture<?> onIndexAvailable(MakeIndexAvailableEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { - ByteArray inProgressBuildIndexKey = inProgressBuildIndexKey(parameters.indexId()); + ByteArray inProgressBuildIndexMetastoreKey = inProgressBuildIndexMetastoreKey(parameters.indexId()); - return metaStorageManager.invoke(exists(inProgressBuildIndexKey), remove(inProgressBuildIndexKey), noop()); + return removeMetastoreKeyIfPresent(metaStorageManager, inProgressBuildIndexMetastoreKey); }); } @@ -224,39 +221,25 @@ public class IndexAvailabilityController implements ManuallyCloseable { 
Entry entry = event.entryEvent().newEntry(); - if (!entry.tombstone()) { + if (entry.value() != null) { // In case an index was created when there was only one partition. return completedFuture(null); } - String partitionBuildIndexKey = new String(entry.key(), UTF_8); + String partitionBuildIndexKey = toPartitionBuildIndexMetastoreKeyString(entry.key()); - int indexId = parseIndexIdFromPartitionBuildIndexKey(partitionBuildIndexKey); - - ByteArray inProgressBuildIndexKey = inProgressBuildIndexKey(indexId); + int indexId = extractIndexIdFromPartitionBuildIndexKey(partitionBuildIndexKey); long metastoreRevision = entry.revision(); - if (isRemainingPartitionBuildIndexKeys(indexId, metastoreRevision) - || isMetastoreKeyAbsent(inProgressBuildIndexKey, metastoreRevision)) { + if (isAnyMetastoreKeyPresentLocally(metaStorageManager, partitionBuildIndexMetastoreKeyPrefix(indexId), metastoreRevision) + || isMetastoreKeyAbsentLocally(metaStorageManager, inProgressBuildIndexMetastoreKey(indexId), metastoreRevision)) { return completedFuture(null); } // We will not wait for the command to be executed, since we will then find ourselves in a dead lock since we will not be able // to free the metastore thread. 
- catalogManager - .execute(buildMakeIndexAvailableCommand(indexId)) - .whenComplete((unused, throwable) -> { - if (throwable != null) { - Throwable unwrapCause = unwrapCause(throwable); - - if (!(unwrapCause instanceof IndexNotFoundValidationException) - && !(unwrapCause instanceof IndexAlreadyAvailableValidationException) - && !(unwrapCause instanceof NodeStoppingException)) { - LOG.error("Error processing the command to make the index available: {}", unwrapCause, indexId); - } - } - }); + makeIndexAvailableInCatalogWithoutFuture(catalogManager, indexId, LOG); return completedFuture(null); }); @@ -264,7 +247,7 @@ public class IndexAvailabilityController implements ManuallyCloseable { private void onIndexBuildCompletionForPartition(int indexId, int partitionId) { inBusyLock(busyLock, () -> { - ByteArray partitionBuildIndexKey = partitionBuildIndexKey(indexId, partitionId); + ByteArray partitionBuildIndexKey = partitionBuildIndexMetastoreKey(indexId, partitionId); // Intentionally not waiting for the operation to complete or returning the future because it is not necessary. 
metaStorageManager @@ -281,76 +264,4 @@ public class IndexAvailabilityController implements ManuallyCloseable { }); }); } - - private int getPartitionCountFromCatalog(int indexId, int catalogVersion) { - CatalogIndexDescriptor indexDescriptor = getIndexDescriptorStrict(indexId, catalogVersion); - - CatalogTableDescriptor tableDescriptor = catalogManager.table(indexDescriptor.tableId(), catalogVersion); - - assert tableDescriptor != null : "tableId=" + indexDescriptor.tableId() + ", catalogVersion=" + catalogVersion; - - CatalogZoneDescriptor zoneDescriptor = catalogManager.zone(tableDescriptor.zoneId(), catalogVersion); - - assert zoneDescriptor != null : "zoneId=" + tableDescriptor.zoneId() + ", catalogVersion=" + catalogVersion; - - return zoneDescriptor.partitions(); - } - - private CatalogIndexDescriptor getIndexDescriptorStrict(int indexId, int catalogVersion) { - CatalogIndexDescriptor indexDescriptor = catalogManager.index(indexId, catalogVersion); - - assert indexDescriptor != null : "indexId=" + indexId + ", catalogVersion=" + catalogVersion; - - return indexDescriptor; - } - - private boolean isRemainingPartitionBuildIndexKeys(int indexId, long metastoreRevision) { - try (Cursor<Entry> cursor = metaStorageManager.prefixLocally(partitionBuildIndexKeyPrefix(indexId), metastoreRevision)) { - return cursor.stream().anyMatch(not(Entry::tombstone)); - } - } - - private boolean isMetastoreKeyAbsent(ByteArray key, long metastoreRevision) { - return metaStorageManager.getLocally(key, metastoreRevision).value() == null; - } - - private static ByteArray inProgressBuildIndexKey(int indexId) { - return ByteArray.fromString(IN_PROGRESS_BUILD_INDEX_KEY_PREFIX + indexId); - } - - private static ByteArray partitionBuildIndexKeyPrefix(int indexId) { - return ByteArray.fromString(PARTITION_BUILD_INDEX_KEY_PREFIX + indexId); - } - - private static ByteArray partitionBuildIndexKey(int indexId, int partitionId) { - return ByteArray.fromString(PARTITION_BUILD_INDEX_KEY_PREFIX + 
indexId + '.' + partitionId); - } - - private static Collection<Operation> putPartitionBuildIndexOperations(int indexId, int partitions) { - return IntStream.range(0, partitions) - .mapToObj(partitionId -> partitionBuildIndexKey(indexId, partitionId)) - .map(key -> put(key, BYTE_EMPTY_ARRAY)) - .collect(toList()); - } - - private static Collection<Operation> removePartitionBuildIndexOperations(int indexId, int partitions) { - return IntStream.range(0, partitions) - .mapToObj(partitionId -> partitionBuildIndexKey(indexId, partitionId)) - .map(Operations::remove) - .collect(toList()); - } - - private static int parseIndexIdFromPartitionBuildIndexKey(String key) { - assert key.startsWith(PARTITION_BUILD_INDEX_KEY_PREFIX) : key; - - int indexIdFromIndex = PARTITION_BUILD_INDEX_KEY_PREFIX.length(); - - int indexIdToIndex = key.indexOf('.', indexIdFromIndex); - - return Integer.parseInt(key.substring(indexIdFromIndex, indexIdToIndex)); - } - - private static CatalogCommand buildMakeIndexAvailableCommand(int indexId) { - return MakeIndexAvailableCommand.builder().indexId(indexId).build(); - } } diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorer.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorer.java new file mode 100644 index 0000000000..429369fe04 --- /dev/null +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorer.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.index; + +import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.index.IndexManagementUtils.getPartitionCountFromCatalog; +import static org.apache.ignite.internal.index.IndexManagementUtils.inProgressBuildIndexMetastoreKey; +import static org.apache.ignite.internal.index.IndexManagementUtils.isAnyMetastoreKeyPresentLocally; +import static org.apache.ignite.internal.index.IndexManagementUtils.isMetastoreKeyAbsentLocally; +import static org.apache.ignite.internal.index.IndexManagementUtils.isPrimaryReplica; +import static org.apache.ignite.internal.index.IndexManagementUtils.makeIndexAvailableInCatalogWithoutFuture; +import static org.apache.ignite.internal.index.IndexManagementUtils.partitionBuildIndexMetastoreKey; +import static org.apache.ignite.internal.index.IndexManagementUtils.partitionBuildIndexMetastoreKeyPrefix; +import static org.apache.ignite.internal.index.IndexManagementUtils.putBuildIndexMetastoreKeysIfAbsent; +import static org.apache.ignite.internal.index.IndexManagementUtils.removeMetastoreKeyIfPresent; +import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.internal.catalog.CatalogManager; +import 
org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand; +import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; +import org.apache.ignite.internal.close.ManuallyCloseable; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.lang.ByteArray; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.metastorage.Entry; +import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.storage.index.IndexStorage; +import org.apache.ignite.internal.util.Cursor; +import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterService; + +/** + * Component responsible for restoring the algorithm from {@link IndexAvailabilityController} if a node fails at some step. + * + * <p>Approximate recovery algorithm:</p> + * <ul> + * <li>For registered indexes: <ul> + * <li>If the new index did not have time to add + * {@link IndexManagementUtils#putBuildIndexMetastoreKeysIfAbsent(MetaStorageManager, int, int) index building keys}, then add them + * to the metastore if they are <b>absent</b>.</li> + * <li>If there are no {@link IndexManagementUtils#partitionBuildIndexMetastoreKey(int, int) partition index building keys} left for + * the index in the metastore, then we {@link MakeIndexAvailableCommand make the index available} in the catalog.</li> + * <li>For partitions for which index building has not completed, we will wait until the primary replica is elected (which will make + * sure it has applied all the commands from the replication log). 
If after this we find out that the index has been built, we will + * remove the {@link IndexManagementUtils#partitionBuildIndexMetastoreKey(int, int) partition index building key} from the metastore + * if it is <b>present</b>.</li> + * </ul></li> + * <li>For available indexes: <ul> + * <li>Delete the {@link IndexManagementUtils#inProgressBuildIndexMetastoreKey(int) “index building in progress” key} in the + * metastore if it is <b>present</b>.</li> + * </ul></li> + * </ul> + */ +public class IndexAvailabilityControllerRestorer implements ManuallyCloseable { + private static final IgniteLogger LOG = Loggers.forClass(IndexAvailabilityControllerRestorer.class); + + private final CatalogManager catalogManager; + + private final MetaStorageManager metaStorageManager; + + private final IndexManager indexManager; + + private final PlacementDriver placementDriver; + + private final ClusterService clusterService; + + private final HybridClock clock; + + private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); + + private final AtomicBoolean closeGuard = new AtomicBoolean(); + + /** Constructor. */ + public IndexAvailabilityControllerRestorer( + CatalogManager catalogManager, + MetaStorageManager metaStorageManager, + IndexManager indexManager, + PlacementDriver placementDriver, + ClusterService clusterService, + HybridClock clock + ) { + this.catalogManager = catalogManager; + this.metaStorageManager = metaStorageManager; + this.indexManager = indexManager; + this.placementDriver = placementDriver; + this.clusterService = clusterService; + this.clock = clock; + } + + /** + * Recovers index availability. + * + * <p>NOTE: Should only be executed on node recovery.</p> + * + * @param recoveryRevision Metastore revision on recovery. + * @return Future of recovery execution. 
+ */ + public CompletableFuture<Void> recover(long recoveryRevision) { + return inBusyLockAsync(busyLock, () -> { + // It is expected that the method will only be called on recovery, when the deploy of metastore watches has not yet occurred. + int catalogVersion = catalogManager.latestCatalogVersion(); + + List<CompletableFuture<?>> futures = catalogManager.indexes(catalogVersion).stream() + .map(indexDescriptor -> { + if (indexDescriptor.available()) { + return recoveryForAvailableIndexBusy(indexDescriptor, recoveryRevision); + } else { + return recoveryForRegisteredIndexBusy(indexDescriptor, recoveryRevision, catalogVersion); + } + }) + .collect(toList()); + + return allOf(futures.toArray(CompletableFuture[]::new)); + }); + } + + @Override + public void close() throws Exception { + if (!closeGuard.compareAndSet(false, true)) { + return; + } + + busyLock.block(); + } + + private CompletableFuture<?> recoveryForAvailableIndexBusy(CatalogIndexDescriptor indexDescriptor, long recoveryRevision) { + assert indexDescriptor.available() : indexDescriptor.id(); + + int indexId = indexDescriptor.id(); + + ByteArray inProgressBuildIndexMetastoreKey = inProgressBuildIndexMetastoreKey(indexId); + + if (isMetastoreKeyAbsentLocally(metaStorageManager, inProgressBuildIndexMetastoreKey, recoveryRevision)) { + return completedFuture(null); + } + + return removeMetastoreKeyIfPresent(metaStorageManager, inProgressBuildIndexMetastoreKey); + } + + private CompletableFuture<?> recoveryForRegisteredIndexBusy( + CatalogIndexDescriptor indexDescriptor, + long recoveryRevision, + int catalogVersion + ) { + assert !indexDescriptor.available() : indexDescriptor.id(); + + int indexId = indexDescriptor.id(); + + if (isMetastoreKeyAbsentLocally(metaStorageManager, inProgressBuildIndexMetastoreKey(indexId), recoveryRevision)) { + // After creating the index, we did not have time to create the keys for building the index in the metastore. 
+ return putBuildIndexMetastoreKeysIfAbsent( + metaStorageManager, + indexId, + getPartitionCountFromCatalog(catalogManager, indexId, catalogVersion) + ); + } + + if (!isAnyMetastoreKeyPresentLocally(metaStorageManager, partitionBuildIndexMetastoreKeyPrefix(indexId), recoveryRevision)) { + // Without wait, since the metastore watches deployment will be only after the start of the components is completed and this + // will cause a dead lock. + makeIndexAvailableInCatalogWithoutFuture(catalogManager, indexId, LOG); + + return completedFuture(null); + } + + return recoveryForRemainingPartitionsOfRegisteredIndexBusy(indexDescriptor, recoveryRevision); + } + + private CompletableFuture<?> recoveryForRemainingPartitionsOfRegisteredIndexBusy( + CatalogIndexDescriptor indexDescriptor, + long recoveryRevision + ) { + assert !indexDescriptor.available() : indexDescriptor.id(); + + int indexId = indexDescriptor.id(); + + try (Cursor<Entry> cursor = metaStorageManager.prefixLocally(partitionBuildIndexMetastoreKeyPrefix(indexId), recoveryRevision)) { + CompletableFuture<?>[] futures = cursor.stream() + .filter(entry -> entry.value() != null) + .map(Entry::key) + .map(IndexManagementUtils::toPartitionBuildIndexMetastoreKeyString) + .mapToInt(IndexManagementUtils::extractPartitionIdFromPartitionBuildIndexKey) + .mapToObj(partitionId -> recoveryForPartitionOfRegisteredIndexBusy(indexDescriptor, partitionId, recoveryRevision)) + .toArray(CompletableFuture[]::new); + + return allOf(futures); + } + } + + private CompletableFuture<?> recoveryForPartitionOfRegisteredIndexBusy( + CatalogIndexDescriptor indexDescriptor, + int partitionId, + long recoveryRevision + ) { + int indexId = indexDescriptor.id(); + int tableId = indexDescriptor.tableId(); + + return indexManager.getMvTableStorage(recoveryRevision, tableId) + .thenCompose(mvTableStorage -> inBusyLockAsync(busyLock, () -> { + var replicationGroupId = new TablePartitionId(tableId, partitionId); + + return 
placementDriver.getPrimaryReplica(replicationGroupId, clock.now()) + .thenCompose(primaryReplicaMeta -> inBusyLockAsync(busyLock, () -> { + ClusterNode localNode = clusterService.topologyService().localMember(); + + if (primaryReplicaMeta == null || !isPrimaryReplica(primaryReplicaMeta, localNode, clock.now())) { + // Local node is not the primary replica, so we expect a primary replica to be elected (which will make + // sure it has applied all the commands from the replication log). If a local node is elected, then + // IndexAvailabilityController will get rid of the partitionBuildIndexMetastoreKey from the metastore on + // its own by IndexBuildCompletionListener.onBuildCompletion event. + return completedFuture(null); + } + + IndexStorage indexStorage = mvTableStorage.getIndex(partitionId, indexId); + + assert indexStorage != null : "indexId=" + indexId + ", partitionId=" + partitionId; + + if (indexStorage.getNextRowIdToBuild() != null) { + // Building of the index has not yet been completed, so we have nothing to do yet. + return completedFuture(null); + } + + return removeMetastoreKeyIfPresent( + metaStorageManager, + partitionBuildIndexMetastoreKey(indexId, partitionId) + ); + })); + })); + } +} diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManagementUtils.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManagementUtils.java new file mode 100644 index 0000000000..7d0af76413 --- /dev/null +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManagementUtils.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.index; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists; +import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; +import static org.apache.ignite.internal.metastorage.dsl.Operations.noop; +import static org.apache.ignite.internal.metastorage.dsl.Operations.put; +import static org.apache.ignite.internal.metastorage.dsl.Operations.remove; +import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; +import static org.apache.ignite.internal.util.CollectionUtils.concat; +import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; +import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.catalog.IndexAlreadyAvailableValidationException; +import org.apache.ignite.internal.catalog.IndexNotFoundValidationException; +import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand; +import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import 
org.apache.ignite.internal.lang.ByteArray; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.metastorage.Entry; +import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.dsl.Operation; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.util.Cursor; +import org.apache.ignite.network.ClusterNode; + +/** Helper class for index management. */ +class IndexManagementUtils { + /** Metastore key prefix for the "index in the process of building" in the format: {@code "indexBuild.inProgress.<indexId>"}. */ + static final String IN_PROGRESS_BUILD_INDEX_KEY_PREFIX = "indexBuild.inProgress."; + + /** + * Metastore key prefix for the "index in the process of building for partition" in the format: + * {@code "indexBuild.partition.<indexId>.<partitionId>"}. + */ + static final String PARTITION_BUILD_INDEX_KEY_PREFIX = "indexBuild.partition."; + + /** + * Returns {@code true} if the {@code key} is <b>absent</b> in the metastore locally. + * + * @param metastore Metastore manager. + * @param key Key to check. + * @param revUpperBound Upper bound of metastore revision. + */ + static boolean isMetastoreKeyAbsentLocally(MetaStorageManager metastore, ByteArray key, long revUpperBound) { + return metastore.getLocally(key, revUpperBound).value() == null; + } + + /** + * Returns {@code true} if at least one key by prefix is <b>present</b> in the metastore locally. + * + * @param metastore Metastore manager. + * @param keyPrefix Key prefix to check. + * @param revUpperBound Upper bound of metastore revision. 
+ */ + static boolean isAnyMetastoreKeyPresentLocally(MetaStorageManager metastore, ByteArray keyPrefix, long revUpperBound) { + /* The cursor is closed by try-with-resources once the stream has been evaluated; entries with a null value are not counted. */ + try (Cursor<Entry> cursor = metastore.prefixLocally(keyPrefix, revUpperBound)) { + return cursor.stream().map(Entry::value).anyMatch(Objects::nonNull); + } + } + + /** + * Removes a {@code key} from the metastore if <b>present</b>. + * + * <p>Issued as one conditional metastore invoke: {@code remove(key)} when {@code exists(key)} holds, otherwise a no-op.</p> + * + * @param metaStorageManager Metastore manager. + * @param key Key to remove. + * @return Future result {@code true} if actual removal happened, otherwise {@code false}. + */ + static CompletableFuture<Boolean> removeMetastoreKeyIfPresent(MetaStorageManager metaStorageManager, ByteArray key) { + return metaStorageManager.invoke(exists(key), remove(key), noop()); + } + + /** + * Puts index building keys into the metastore if they are absent. + * + * <p>NOTES: Presence of keys is determined by {@value #IN_PROGRESS_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>"}.</p> + * + * <p>Keys: </p> + * <ul> + * <li>{@value #IN_PROGRESS_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>"}.</li> + * <li>{@value #PARTITION_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>.0"} ... + * {@value #PARTITION_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>.<partitions-1>"}.</li> + * </ul> + * + * @param metastore Metastore manager. + * @param indexId Index ID. + * @param partitions Partition count. + * @return Future result {@code true} if success update was applied, otherwise {@code false}. 
+ */ + static CompletableFuture<Boolean> putBuildIndexMetastoreKeysIfAbsent(MetaStorageManager metastore, int indexId, int partitions) { + ByteArray inProgressBuildIndexMetastoreKey = inProgressBuildIndexMetastoreKey(indexId); + + /* One put per partition ID in the range [0, partitions). */ + List<Operation> putPartitionBuildIndexMetastoreKeyOperations = IntStream.range(0, partitions) + .mapToObj(partitionId -> put(partitionBuildIndexMetastoreKey(indexId, partitionId), BYTE_EMPTY_ARRAY)) + .collect(toList()); + + /* All puts go into a single conditional invoke that is guarded by the absence of the in-progress key. */ + return metastore.invoke( + notExists(inProgressBuildIndexMetastoreKey), + concat( + List.of(put(inProgressBuildIndexMetastoreKey, BYTE_EMPTY_ARRAY)), + putPartitionBuildIndexMetastoreKeyOperations + ), + List.of(noop()) + ); + } + + /** + * Returns the "index in the process of building" metastore key, format: + * {@value #IN_PROGRESS_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>"}. + * + * @param indexId Index ID. + */ + static ByteArray inProgressBuildIndexMetastoreKey(int indexId) { + return ByteArray.fromString(IN_PROGRESS_BUILD_INDEX_KEY_PREFIX + indexId); + } + + /** + * Returns the "building an index for the partition" metastore prefix key, format: + * {@value #PARTITION_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>."}. + * + * <p>The trailing {@code '.'} is part of the prefix so that a prefix scan for one index does not also match the partition keys of + * another index whose ID merely starts with the same digits (for example {@code 1} and {@code 10}).</p> + * + * @param indexId Index ID. + */ + static ByteArray partitionBuildIndexMetastoreKeyPrefix(int indexId) { + return ByteArray.fromString(PARTITION_BUILD_INDEX_KEY_PREFIX + indexId + '.'); + } + + /** + * Returns the "building an index for the partition" metastore key, format: + * {@value #PARTITION_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>.<partitionId>"}. + * + * @param indexId Index ID. + * @param partitionId Partition ID. + */ + static ByteArray partitionBuildIndexMetastoreKey(int indexId, int partitionId) { + return ByteArray.fromString(PARTITION_BUILD_INDEX_KEY_PREFIX + indexId + '.' + partitionId); + } + + /** + * Converts bytes to string key: {@value PARTITION_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>.<partitionId>"}. + * + * @param bytes Bytes to convert. 
+ */ + static String toPartitionBuildIndexMetastoreKeyString(byte[] bytes) { + String keyStr = new String(bytes, UTF_8); + + /* Sanity check only: with assertions disabled the decoded string is returned unchecked. */ + assert keyStr.startsWith(PARTITION_BUILD_INDEX_KEY_PREFIX) : keyStr; + + return keyStr; + } + + /** + * Returns the partition count of the zone of the table that the index belongs to, resolving the index, table and zone descriptors at + * the given catalog version. Missing descriptors are only guarded by {@code assert}. + * + * @param catalogService Catalog service. + * @param indexId Index ID. + * @param catalogVersion Catalog version. + * @return Value of {@code partitions()} of the resolved zone descriptor. + */ + static int getPartitionCountFromCatalog(CatalogService catalogService, int indexId, int catalogVersion) { + CatalogIndexDescriptor indexDescriptor = catalogService.index(indexId, catalogVersion); + + assert indexDescriptor != null : "indexId=" + indexId + ", catalogVersion=" + catalogVersion; + + CatalogTableDescriptor tableDescriptor = catalogService.table(indexDescriptor.tableId(), catalogVersion); + + assert tableDescriptor != null : "tableId=" + indexDescriptor.tableId() + ", catalogVersion=" + catalogVersion; + + CatalogZoneDescriptor zoneDescriptor = catalogService.zone(tableDescriptor.zoneId(), catalogVersion); + + assert zoneDescriptor != null : "zoneId=" + tableDescriptor.zoneId() + ", catalogVersion=" + catalogVersion; + + return zoneDescriptor.partitions(); + } + + /** + * Makes the index available in the catalog. Deliberately does not return the future of the operation, so that callers cannot create + * deadlocks by waiting on an operation that cannot complete because it is executed in the metastore thread or during recovery (the + * metastore watches will not be deployed yet). Logs errors unless the cause is {@link IndexNotFoundValidationException}, + * {@link IndexAlreadyAvailableValidationException} or {@link NodeStoppingException}. + * + * @param catalogManager Catalog manager. + * @param indexId Index ID. + * @param log Logger. 
+ */ + static void makeIndexAvailableInCatalogWithoutFuture(CatalogManager catalogManager, int indexId, IgniteLogger log) { + catalogManager + .execute(MakeIndexAvailableCommand.builder().indexId(indexId).build()) + .whenComplete((unused, throwable) -> { + if (throwable != null) { + Throwable unwrapCause = unwrapCause(throwable); + + /* These three causes are deliberately not logged; any other failure is reported together with the index ID. */ + if (!(unwrapCause instanceof IndexNotFoundValidationException) + && !(unwrapCause instanceof IndexAlreadyAvailableValidationException) + && !(unwrapCause instanceof NodeStoppingException)) { + log.error("Error processing the command to make the index available: {}", unwrapCause, indexId); + } + } + }); + } + + /** + * Extracts a partition ID from the key: {@value #PARTITION_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>.<partitionId>"}. + * + * <p>The key is split on {@code '.'} into {@code "indexBuild"}, {@code "partition"}, {@code "<indexId>"} and {@code "<partitionId>"}; + * the partition ID is the element at index 3.</p> + * + * @param key Key. + * @return Partition ID. + */ + static int extractPartitionIdFromPartitionBuildIndexKey(String key) { + assert key.startsWith(PARTITION_BUILD_INDEX_KEY_PREFIX) : key; + + String[] strings = key.split("\\."); + + return Integer.parseInt(strings[3]); + } + + /** + * Extracts an index ID from the key: {@value #PARTITION_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>.<partitionId>"}. + * + * <p>The key is split on {@code '.'} into {@code "indexBuild"}, {@code "partition"}, {@code "<indexId>"} and {@code "<partitionId>"}; + * the index ID is the element at index 2.</p> + * + * @param key Key. + * @return Index ID. + */ + static int extractIndexIdFromPartitionBuildIndexKey(String key) { + assert key.startsWith(PARTITION_BUILD_INDEX_KEY_PREFIX) : key; + + String[] strings = key.split("\\."); + + return Integer.parseInt(strings[2]); + } + + /** + * Returns {@code true} if the local node is the primary replica at the timestamp of interest. + * + * @param primaryReplicaMeta Primary replica meta. + * @param localNode Local node. + * @param timestamp Timestamp of interest. 
+ */ + static boolean isPrimaryReplica(ReplicaMeta primaryReplicaMeta, ClusterNode localNode, HybridTimestamp timestamp) { + /* True only when the local node ID equals the leaseholder ID and the timestamp is strictly before the expiration time. */ + return localNode.id().equals(primaryReplicaMeta.getLeaseholderId()) + && timestamp.compareTo(primaryReplicaMeta.getExpirationTime()) < 0; + } +} diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java index 6d61eeb2f1..d54da07b72 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java @@ -167,6 +167,9 @@ public class IndexManager implements IgniteComponent { * <p>Example: when we start building an index, we will need {@link IndexStorage} (as well as storage {@link MvPartitionStorage}) to * build it and we can get them in {@link CatalogEvent#INDEX_CREATE} using this method.</p> * + * <p>During recovery, it is important to wait until the local node becomes a primary replica so that all index building commands are + * applied from the replication log.</p> + * * @param causalityToken Causality token. * @param tableId Table ID. * @return Future with multi-version table storage, completes with {@code null} if the table does not exist according to the passed diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java new file mode 100644 index 0000000000..b103055c58 --- /dev/null +++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.index; + +import static java.util.Collections.emptyIterator; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.index.IndexManagementUtils.inProgressBuildIndexMetastoreKey; +import static org.apache.ignite.internal.index.IndexManagementUtils.partitionBuildIndexMetastoreKey; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.INDEX_NAME; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.LOCAL_NODE; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_ID; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_NAME; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.TABLE_NAME; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.assertMetastoreKeyAbsent; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.assertMetastoreKeyPresent; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.awaitTillGlobalMetastoreRevisionIsApplied; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.createIndex; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.createTable; +import static 
org.apache.ignite.internal.index.TestIndexManagementUtils.indexDescriptor; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.indexId; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.makeIndexAvailable; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.newPrimaryReplicaMeta; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.tableId; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; +import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.nio.file.Path; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.CatalogTestUtils; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; +import org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager; +import 
org.apache.ignite.internal.metastorage.server.KeyValueStorage; +import org.apache.ignite.internal.metastorage.server.TestRocksDbKeyValueStorage; +import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.storage.engine.MvTableStorage; +import org.apache.ignite.internal.storage.index.IndexStorage; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.testframework.WorkDirectory; +import org.apache.ignite.internal.testframework.WorkDirectoryExtension; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.vault.VaultManager; +import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterNodeImpl; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.TopologyService; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +/** For {@link IndexAvailabilityControllerRestorer} testing. 
*/ +@ExtendWith(WorkDirectoryExtension.class) +public class IndexAvailabilityControllerRestorerTest extends BaseIgniteAbstractTest { + private static final int PARTITION_ID = 0; + + @WorkDirectory + private Path workDir; + + private final HybridClock clock = new HybridClockImpl(); + + private final PlacementDriver placementDriver = mock(PlacementDriver.class); + + private final ClusterService clusterService = mock(ClusterService.class); + + private final IndexManager indexManager = mock(IndexManager.class); + + private final VaultManager vaultManager = new VaultManager(new InMemoryVaultService()); + + private KeyValueStorage keyValueStorage; + + private MetaStorageManagerImpl metaStorageManager; + + private CatalogManager catalogManager; + + private IndexAvailabilityControllerRestorer restorer; + + @BeforeEach + void setUp() throws Exception { + keyValueStorage = new TestRocksDbKeyValueStorage(NODE_NAME, workDir); + + metaStorageManager = StandaloneMetaStorageManager.create(vaultManager, keyValueStorage); + + catalogManager = CatalogTestUtils.createTestCatalogManager(NODE_NAME, clock, metaStorageManager); + + Stream.of(vaultManager, metaStorageManager, catalogManager).forEach(IgniteComponent::start); + + deployWatches(); + + createTable(catalogManager, TABLE_NAME, COLUMN_NAME); + } + + @AfterEach + void tearDown() throws Exception { + IgniteUtils.closeAll( + restorer == null ? null : restorer::close, + catalogManager == null ? null : catalogManager::stop, + metaStorageManager == null ? 
null : metaStorageManager::stop, + vaultManager::stop + ); + } + + @Test + void testRemoveInProgressBuildIndexMetastoreKeyForAvailableIndexes() throws Exception { + createIndex(catalogManager, TABLE_NAME, INDEX_NAME + 0, COLUMN_NAME); + createIndex(catalogManager, TABLE_NAME, INDEX_NAME + 1, COLUMN_NAME); + + int indexId0 = indexId(catalogManager, INDEX_NAME + 0, clock); + int indexId1 = indexId(catalogManager, INDEX_NAME + 1, clock); + + makeIndexAvailable(catalogManager, indexId0); + makeIndexAvailable(catalogManager, indexId1); + + // Let's put the inProgressBuildIndexMetastoreKey for only one index in the metastore. + putInProgressBuildIndexMetastoreKeyInMetastore(indexId0); + + restartComponentsAndPerformRecovery(); + + // Let's do checks. + assertMetastoreKeyAbsent(metaStorageManager, inProgressBuildIndexMetastoreKey(indexId0)); + assertMetastoreKeyAbsent(metaStorageManager, inProgressBuildIndexMetastoreKey(indexId1)); + + assertTrue(indexDescriptor(catalogManager, INDEX_NAME + 0, clock).available()); + assertTrue(indexDescriptor(catalogManager, INDEX_NAME + 1, clock).available()); + } + + @Test + void testMakeIndexAvailableIfNoLeftKeysBuildingIndexForPartitionInMetastore() throws Exception { + createIndex(catalogManager, TABLE_NAME, INDEX_NAME, COLUMN_NAME); + + int indexId = indexId(catalogManager, INDEX_NAME, clock); + + putInProgressBuildIndexMetastoreKeyInMetastore(indexId); + + restartComponentsAndPerformRecovery(); + + // Let's do checks. 
+ assertMetastoreKeyPresent(metaStorageManager, inProgressBuildIndexMetastoreKey(indexId)); + assertTrue(indexDescriptor(catalogManager, INDEX_NAME, clock).available()); + } + + @Test + void testRemovePartitionBuildIndexMetastoreKeyForRegisteredIndex() throws Exception { + createIndexWithIndexBuildingKeys(INDEX_NAME, PARTITION_ID); + + int indexId = indexId(catalogManager, INDEX_NAME, clock); + TablePartitionId replicaGroupId = new TablePartitionId(tableId(catalogManager, TABLE_NAME, clock), PARTITION_ID); + ReplicaMeta primaryReplicaMeta = createPrimaryReplicaMetaThatExpireInOneDay(LOCAL_NODE, replicaGroupId); + + prepareToRestartNode(replicaGroupId, indexId, primaryReplicaMeta); + + restartComponentsAndPerformRecovery(); + + // Let's do checks. + assertMetastoreKeyPresent(metaStorageManager, inProgressBuildIndexMetastoreKey(indexId)); + assertMetastoreKeyAbsent(metaStorageManager, partitionBuildIndexMetastoreKey(indexId, PARTITION_ID)); + + assertFalse(indexDescriptor(catalogManager, INDEX_NAME, clock).available()); + } + + @Test + void testNotRemovePartitionBuildIndexMetastoreKeyForRegisteredIndexIfBuildingIndexNotComplete() throws Exception { + createIndexWithIndexBuildingKeys(INDEX_NAME, PARTITION_ID); + + int indexId = indexId(catalogManager, INDEX_NAME, clock); + TablePartitionId replicaGroupId = new TablePartitionId(tableId(catalogManager, TABLE_NAME, clock), PARTITION_ID); + ReplicaMeta primaryReplicaMeta = createPrimaryReplicaMetaThatExpireInOneDay(LOCAL_NODE, replicaGroupId); + + prepareToRestartNode(replicaGroupId, indexId, primaryReplicaMeta, new RowId(PARTITION_ID)); + + restartComponentsAndPerformRecovery(); + + // Let's do checks. 
+ assertMetastoreKeyPresent(metaStorageManager, inProgressBuildIndexMetastoreKey(indexId)); + assertMetastoreKeyPresent(metaStorageManager, partitionBuildIndexMetastoreKey(indexId, PARTITION_ID)); + + assertFalse(indexDescriptor(catalogManager, INDEX_NAME, clock).available()); + } + + @Test + void testNotRemovePartitionBuildIndexMetastoreKeyForRegisteredIndexIfPrimaryReplicaMetaNull() throws Exception { + createIndexWithIndexBuildingKeys(INDEX_NAME, PARTITION_ID); + + int indexId = indexId(catalogManager, INDEX_NAME, clock); + TablePartitionId replicaGroupId = new TablePartitionId(tableId(catalogManager, TABLE_NAME, clock), PARTITION_ID); + + prepareToRestartNode(replicaGroupId, indexId, null); + + restartComponentsAndPerformRecovery(); + + // Let's do checks. + assertMetastoreKeyPresent(metaStorageManager, inProgressBuildIndexMetastoreKey(indexId)); + assertMetastoreKeyPresent(metaStorageManager, partitionBuildIndexMetastoreKey(indexId, PARTITION_ID)); + + assertFalse(indexDescriptor(catalogManager, INDEX_NAME, clock).available()); + } + + @Test + void testNotRemovePartitionBuildIndexMetastoreKeyForRegisteredIndexIfPrimaryReplicaMetaChanges() throws Exception { + createIndexWithIndexBuildingKeys(INDEX_NAME, PARTITION_ID); + + int indexId = indexId(catalogManager, INDEX_NAME, clock); + TablePartitionId replicaGroupId = new TablePartitionId(tableId(catalogManager, TABLE_NAME, clock), PARTITION_ID); + + ReplicaMeta primaryReplicaMeta = createPrimaryReplicaMetaThatExpireInOneDay( + new ClusterNodeImpl(NODE_ID + "_ID_OLD", NODE_NAME + "_OLD", mock(NetworkAddress.class)), + replicaGroupId + ); + + prepareToRestartNode(replicaGroupId, indexId, primaryReplicaMeta); + + restartComponentsAndPerformRecovery(); + + // Let's do checks. 
+ assertMetastoreKeyPresent(metaStorageManager, inProgressBuildIndexMetastoreKey(indexId)); + assertMetastoreKeyPresent(metaStorageManager, partitionBuildIndexMetastoreKey(indexId, PARTITION_ID)); + + assertFalse(indexDescriptor(catalogManager, INDEX_NAME, clock).available()); + } + + private void createIndexWithIndexBuildingKeys(String indexName, int partitionId) { + createIndex(catalogManager, TABLE_NAME, indexName, COLUMN_NAME); + + int indexId = indexId(catalogManager, indexName, clock); + + putInProgressBuildIndexMetastoreKeyInMetastore(indexId); + putPartitionBuildIndexMetastoreKeyInMetastore(indexId, partitionId); + } + + private ReplicaMeta createPrimaryReplicaMetaThatExpireInOneDay(ClusterNode clusterNode, TablePartitionId replicaGroupId) { + HybridTimestamp startTime = clock.now(); + HybridTimestamp expirationTime = startTime.addPhysicalTime(TimeUnit.DAYS.toMillis(1)); + + return newPrimaryReplicaMeta(clusterNode, replicaGroupId, startTime, expirationTime); + } + + private void prepareToRestartNode( + TablePartitionId replicaGroupId, + int indexId, + @Nullable ReplicaMeta primaryReplicaMeta, + RowId... 
rowIdsToBuild + ) { + setIndexStorageToIndexManager(replicaGroupId, indexId, rowIdsToBuild); + setLocalNodeToClusterService(LOCAL_NODE); + setPrimaryReplicaMetaToPlacementDriver(replicaGroupId, primaryReplicaMeta); + } + + private void putInProgressBuildIndexMetastoreKeyInMetastore(int indexId) { + assertThat(metaStorageManager.put(inProgressBuildIndexMetastoreKey(indexId), BYTE_EMPTY_ARRAY), willCompleteSuccessfully()); + } + + private void putPartitionBuildIndexMetastoreKeyInMetastore(int indexId, int partitionId) { + assertThat( + metaStorageManager.put(partitionBuildIndexMetastoreKey(indexId, partitionId), BYTE_EMPTY_ARRAY), + willCompleteSuccessfully() + ); + } + + private void restartComponentsAndPerformRecovery() throws Exception { + stopAndRestartComponentsNoDeployWatches(); + + assertThat(recoveryRestorer(), willCompleteSuccessfully()); + + deployWatches(); + } + + private void stopAndRestartComponentsNoDeployWatches() throws Exception { + awaitTillGlobalMetastoreRevisionIsApplied(metaStorageManager); + + IgniteUtils.closeAll( + catalogManager == null ? null : catalogManager::stop, + metaStorageManager == null ? 
null : metaStorageManager::stop + ); + + keyValueStorage = new TestRocksDbKeyValueStorage(NODE_NAME, workDir); + + metaStorageManager = StandaloneMetaStorageManager.create(vaultManager, keyValueStorage); + + catalogManager = spy(CatalogTestUtils.createTestCatalogManager(NODE_NAME, clock, metaStorageManager)); + + Stream.of(metaStorageManager, catalogManager).forEach(IgniteComponent::start); + } + + private void deployWatches() throws Exception { + assertThat(metaStorageManager.deployWatches(), willCompleteSuccessfully()); + + awaitTillGlobalMetastoreRevisionIsApplied(metaStorageManager); + } + + private CompletableFuture<Void> recoveryRestorer() throws Exception { + if (restorer != null) { + restorer.close(); + } + + restorer = new IndexAvailabilityControllerRestorer( + catalogManager, + metaStorageManager, + indexManager, + placementDriver, + clusterService, + clock + ); + + CompletableFuture<Long> metastoreRecoveryFuture = metaStorageManager.recoveryFinishedFuture(); + + assertThat(metastoreRecoveryFuture, willBe(greaterThan(0L))); + + return restorer.recover(metastoreRecoveryFuture.join()); + } + + private void setIndexStorageToIndexManager(TablePartitionId replicaGroupId, int indexId, RowId... rowIdsToBuild) { + MvTableStorage mvTableStorage = mock(MvTableStorage.class); + IndexStorage indexStorage = mock(IndexStorage.class); + + Iterator<RowId> it = nullOrEmpty(rowIdsToBuild) ? emptyIterator() : List.of(rowIdsToBuild).iterator(); + + when(indexStorage.getNextRowIdToBuild()).then(invocation -> it.hasNext() ? 
it.next() : null); + + when(mvTableStorage.getIndex(replicaGroupId.partitionId(), indexId)).thenReturn(indexStorage); + when(indexManager.getMvTableStorage(anyLong(), eq(replicaGroupId.tableId()))).thenReturn(completedFuture(mvTableStorage)); + } + + private void setLocalNodeToClusterService(ClusterNode clusterNode) { + TopologyService topologyService = mock(TopologyService.class, invocation -> clusterNode); + + when(clusterService.topologyService()).thenReturn(topologyService); + } + + private void setPrimaryReplicaMetaToPlacementDriver(TablePartitionId replicaGroupId, @Nullable ReplicaMeta primaryReplicaMeta) { + when(placementDriver.getPrimaryReplica(eq(replicaGroupId), any())).thenReturn(completedFuture(primaryReplicaMeta)); + } +} diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java index 8dcd6b7bc9..281b714cf4 100644 --- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java +++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java @@ -20,11 +20,14 @@ package org.apache.ignite.internal.index; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME; -import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.INDEX_NAME; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_NAME; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.TABLE_NAME; +import static 
org.apache.ignite.internal.index.TestIndexManagementUtils.createTable; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; -import static org.apache.ignite.sql.ColumnType.INT32; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.nullValue; @@ -40,7 +43,6 @@ import java.util.stream.Stream; import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.catalog.CatalogTestUtils; import org.apache.ignite.internal.catalog.commands.AlterZoneParams; -import org.apache.ignite.internal.catalog.commands.ColumnParams; import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.hlc.HybridClock; @@ -67,14 +69,6 @@ import org.junit.jupiter.api.Test; /** For {@link IndexAvailabilityController} testing. 
*/ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest { - private static final String NODE_NAME = "test-node"; - - private static final String TABLE_NAME = "test-table"; - - private static final String COLUMN_NAME = "test-column"; - - private static final String INDEX_NAME = "test-index"; - private static final long ANY_ENLISTMENT_CONSISTENCY_TOKEN = 100500; private final HybridClock clock = new HybridClockImpl(); @@ -113,14 +107,7 @@ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest { assertThat(partitions, greaterThan(4)); - TableTestUtils.createTable( - catalogManager, - DEFAULT_SCHEMA_NAME, - DEFAULT_ZONE_NAME, - TABLE_NAME, - List.of(ColumnParams.builder().name(COLUMN_NAME).type(INT32).build()), - List.of(COLUMN_NAME) - ); + createTable(catalogManager, TABLE_NAME, COLUMN_NAME); } @AfterEach @@ -287,15 +274,7 @@ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest { } private void awaitTillGlobalMetastoreRevisionIsApplied() throws Exception { - assertTrue( - waitForCondition(() -> { - CompletableFuture<Long> currentRevisionFuture = metaStorageManager.getService().currentRevision(); - - assertThat(currentRevisionFuture, willCompleteSuccessfully()); - - return currentRevisionFuture.join() == metaStorageManager.appliedRevision(); - }, 1_000) - ); + TestIndexManagementUtils.awaitTillGlobalMetastoreRevisionIsApplied(metaStorageManager); } private void createIndex(String indexName) { diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java index c3ce885949..759bdc3dca 100644 --- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java +++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java @@ -19,13 +19,18 @@ package org.apache.ignite.internal.index; import static 
java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME; -import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.INDEX_NAME; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.LOCAL_NODE; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_ID; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_NAME; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.TABLE_NAME; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.createTable; import static org.apache.ignite.internal.table.TableTestUtils.createHashIndex; import static org.apache.ignite.internal.table.TableTestUtils.getIndexIdStrict; import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; -import static org.apache.ignite.sql.ColumnType.INT32; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -44,7 +49,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.catalog.CatalogTestUtils; -import org.apache.ignite.internal.catalog.commands.ColumnParams; import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand; import org.apache.ignite.internal.event.AbstractEventProducer; import org.apache.ignite.internal.hlc.HybridClock; @@ -64,10 +68,7 @@ import 
org.apache.ignite.internal.table.TableTestUtils; import org.apache.ignite.internal.table.distributed.index.IndexBuilder; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.util.IgniteUtils; -import org.apache.ignite.network.ClusterNode; -import org.apache.ignite.network.ClusterNodeImpl; import org.apache.ignite.network.ClusterService; -import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.network.TopologyService; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -75,20 +76,8 @@ import org.junit.jupiter.api.Test; /** For {@link IndexBuildController} testing. */ public class IndexBuildControllerTest extends BaseIgniteAbstractTest { - private static final String NODE_NAME = "test_node"; - - private static final String NODE_ID = "test_node_id"; - - private static final String TABLE_NAME = "test_table"; - - private static final String COLUMN_NAME = "test_column"; - - private static final String INDEX_NAME = "test_index"; - private static final int PARTITION_ID = 10; - private final ClusterNode localNode = new ClusterNodeImpl(NODE_ID, NODE_NAME, mock(NetworkAddress.class)); - private IndexBuilder indexBuilder; private CatalogManager catalogManager; @@ -114,19 +103,12 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest { return completedFuture(mvTableStorage); }); - ClusterService clusterService = mock(ClusterService.class, invocation -> mock(TopologyService.class, invocation1 -> localNode)); + ClusterService clusterService = mock(ClusterService.class, invocation -> mock(TopologyService.class, invocation1 -> LOCAL_NODE)); catalogManager = CatalogTestUtils.createTestCatalogManager(NODE_NAME, clock); catalogManager.start(); - TableTestUtils.createTable( - catalogManager, - DEFAULT_SCHEMA_NAME, - DEFAULT_ZONE_NAME, - TABLE_NAME, - List.of(ColumnParams.builder().name(COLUMN_NAME).type(INT32).build()), - List.of(COLUMN_NAME) - ); + 
createTable(catalogManager, TABLE_NAME, COLUMN_NAME); indexBuildController = new IndexBuildController(indexBuilder, indexManager, catalogManager, clusterService, placementDriver, clock); } @@ -150,7 +132,7 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest { eq(indexId(INDEX_NAME)), any(), any(), - eq(localNode), + eq(LOCAL_NODE), anyLong() ); } @@ -167,7 +149,7 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest { eq(indexId(INDEX_NAME)), any(), any(), - eq(localNode), + eq(LOCAL_NODE), anyLong() ); @@ -177,7 +159,7 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest { eq(indexId(pkIndexName(TABLE_NAME))), any(), any(), - eq(localNode), + eq(LOCAL_NODE), anyLong() ); } @@ -217,7 +199,7 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest { eq(indexId(INDEX_NAME)), any(), any(), - eq(localNode), + eq(LOCAL_NODE), anyLong() ); @@ -227,7 +209,7 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest { eq(indexId(pkIndexName(TABLE_NAME))), any(), any(), - eq(localNode), + eq(LOCAL_NODE), anyLong() ); } diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagementUtilsTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagementUtilsTest.java new file mode 100644 index 0000000000..58a73a325c --- /dev/null +++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagementUtilsTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.index; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.ignite.internal.index.IndexManagementUtils.extractIndexIdFromPartitionBuildIndexKey; +import static org.apache.ignite.internal.index.IndexManagementUtils.extractPartitionIdFromPartitionBuildIndexKey; +import static org.apache.ignite.internal.index.IndexManagementUtils.inProgressBuildIndexMetastoreKey; +import static org.apache.ignite.internal.index.IndexManagementUtils.isPrimaryReplica; +import static org.apache.ignite.internal.index.IndexManagementUtils.partitionBuildIndexMetastoreKey; +import static org.apache.ignite.internal.index.IndexManagementUtils.partitionBuildIndexMetastoreKeyPrefix; +import static org.apache.ignite.internal.index.IndexManagementUtils.toPartitionBuildIndexMetastoreKeyString; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.LOCAL_NODE; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_ID; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_NAME; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.newPrimaryReplicaMeta; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import 
org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.lang.ByteArray; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterNodeImpl; +import org.apache.ignite.network.NetworkAddress; +import org.junit.jupiter.api.Test; + +/** For {@link IndexManagementUtils} testing. */ +public class IndexManagementUtilsTest extends BaseIgniteAbstractTest { + private final HybridClock clock = new HybridClockImpl(); + + @Test + void testPartitionBuildIndexMetastoreKey() { + assertEquals(ByteArray.fromString("indexBuild.partition.1.2"), partitionBuildIndexMetastoreKey(1, 2)); + assertEquals(ByteArray.fromString("indexBuild.partition.7.9"), partitionBuildIndexMetastoreKey(7, 9)); + } + + @Test + void testInProgressBuildIndexMetastoreKey() { + assertEquals(ByteArray.fromString("indexBuild.inProgress.1"), inProgressBuildIndexMetastoreKey(1)); + assertEquals(ByteArray.fromString("indexBuild.inProgress.7"), inProgressBuildIndexMetastoreKey(7)); + } + + @Test + void testPartitionBuildIndexMetastoreKeyPrefix() { + assertEquals(ByteArray.fromString("indexBuild.partition.1"), partitionBuildIndexMetastoreKeyPrefix(1)); + assertEquals(ByteArray.fromString("indexBuild.partition.7"), partitionBuildIndexMetastoreKeyPrefix(7)); + } + + @Test + void tesToPartitionBuildIndexMetastoreKeyString() { + assertEquals("indexBuild.partition.1.2", toPartitionBuildIndexMetastoreKeyString("indexBuild.partition.1.2".getBytes(UTF_8))); + assertEquals("indexBuild.partition.7.9", toPartitionBuildIndexMetastoreKeyString("indexBuild.partition.7.9".getBytes(UTF_8))); + } + + @Test + void testExtractPartitionIdFromPartitionBuildIndexKey() { + assertEquals(2, extractPartitionIdFromPartitionBuildIndexKey("indexBuild.partition.1.2")); + assertEquals(9, 
extractPartitionIdFromPartitionBuildIndexKey("indexBuild.partition.7.9")); + } + + @Test + void testExtractIndexIdFromPartitionBuildIndexKey() { + assertEquals(1, extractIndexIdFromPartitionBuildIndexKey("indexBuild.partition.1.2")); + assertEquals(7, extractIndexIdFromPartitionBuildIndexKey("indexBuild.partition.7.9")); + } + + @Test + void testIsPrimaryReplicaTrue() { + TablePartitionId replicaGroupId = new TablePartitionId(1, 0); + + HybridTimestamp startTime = clock.now(); + long dayInMillis = TimeUnit.DAYS.toMillis(1); + + ReplicaMeta replicaMeta = newPrimaryReplicaMeta(LOCAL_NODE, replicaGroupId, startTime, startTime.addPhysicalTime(dayInMillis)); + + assertTrue(isPrimaryReplica(replicaMeta, LOCAL_NODE, clock.now())); + } + + @Test + void testIsPrimaryReplicaFalse() { + TablePartitionId replicaGroupId = new TablePartitionId(1, 0); + + ClusterNode otherNode = new ClusterNodeImpl(NODE_ID + "-other", NODE_NAME + "-other", mock(NetworkAddress.class)); + + HybridTimestamp now = clock.now(); + long dayInMillis = TimeUnit.DAYS.toMillis(1); + long hourInMillis = TimeUnit.HOURS.toMillis(1); + + HybridTimestamp startTime0 = now; + HybridTimestamp startTime1 = now.addPhysicalTime(-dayInMillis); + + ReplicaMeta replicaMeta0 = newPrimaryReplicaMeta(otherNode, replicaGroupId, startTime0, startTime0.addPhysicalTime(dayInMillis)); + ReplicaMeta replicaMeta1 = newPrimaryReplicaMeta(LOCAL_NODE, replicaGroupId, startTime1, startTime1.addPhysicalTime(hourInMillis)); + ReplicaMeta replicaMeta2 = newPrimaryReplicaMeta(LOCAL_NODE, replicaGroupId, now, now); + + assertFalse(isPrimaryReplica(replicaMeta0, LOCAL_NODE, clock.now())); + assertFalse(isPrimaryReplica(replicaMeta1, LOCAL_NODE, clock.now())); + assertFalse(isPrimaryReplica(replicaMeta2, LOCAL_NODE, now)); + } +} diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java index 9c5c440446..0b5e9834ea 
100644 --- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java +++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java @@ -21,13 +21,15 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_DATA_REGION; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_NAME; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.TABLE_NAME; +import static org.apache.ignite.internal.index.TestIndexManagementUtils.createTable; import static org.apache.ignite.internal.table.TableTestUtils.createHashIndex; -import static org.apache.ignite.internal.table.TableTestUtils.createTable; import static org.apache.ignite.internal.table.TableTestUtils.dropTable; import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; -import static org.apache.ignite.sql.ColumnType.STRING; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -44,7 +46,6 @@ import java.util.function.LongFunction; import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.catalog.CatalogManagerImpl; import org.apache.ignite.internal.catalog.ClockWaiter; -import org.apache.ignite.internal.catalog.commands.ColumnParams; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import 
org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.storage.UpdateLogImpl; @@ -75,10 +76,6 @@ import org.junit.jupiter.api.Test; * Test class to verify {@link IndexManager}. */ public class IndexManagerTest extends BaseIgniteAbstractTest { - private static final String TABLE_NAME = "tName"; - - private static final String COLUMN_NAME = "c"; - private final HybridClock clock = new HybridClockImpl(); private VaultManager vaultManager; @@ -105,13 +102,11 @@ public class IndexManagerTest extends BaseIgniteAbstractTest { when(schManager.schemaRegistry(anyLong(), anyInt())).thenReturn(completedFuture(null)); - String nodeName = "test"; - vaultManager = new VaultManager(new InMemoryVaultService()); - metaStorageManager = StandaloneMetaStorageManager.create(vaultManager, new SimpleInMemoryKeyValueStorage(nodeName)); + metaStorageManager = StandaloneMetaStorageManager.create(vaultManager, new SimpleInMemoryKeyValueStorage(NODE_NAME)); - clockWaiter = new ClockWaiter(nodeName, clock); + clockWaiter = new ClockWaiter(NODE_NAME, clock); catalogManager = new CatalogManagerImpl(new UpdateLogImpl(metaStorageManager), clockWaiter); @@ -129,14 +124,7 @@ public class IndexManagerTest extends BaseIgniteAbstractTest { assertThat(metaStorageManager.notifyRevisionUpdateListenerOnStart(), willCompleteSuccessfully()); assertThat(metaStorageManager.deployWatches(), willCompleteSuccessfully()); - createTable( - catalogManager, - DEFAULT_SCHEMA_NAME, - DEFAULT_ZONE_NAME, - TABLE_NAME, - List.of(ColumnParams.builder().name(COLUMN_NAME).length(100).type(STRING).build()), - List.of(COLUMN_NAME) - ); + createTable(catalogManager, TABLE_NAME, COLUMN_NAME); } @AfterEach diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java b/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java new file mode 100644 index 0000000000..8b1339f288 --- /dev/null +++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.index; + +import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME; +import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.sql.ColumnType.INT32; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.CatalogService; +import 
org.apache.ignite.internal.catalog.commands.ColumnParams; +import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand; +import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.lang.ByteArray; +import org.apache.ignite.internal.metastorage.Entry; +import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.placementdriver.leases.Lease; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterNodeImpl; +import org.apache.ignite.network.NetworkAddress; + +/** Helper class for testing index management. 
*/ +class TestIndexManagementUtils { + static final String NODE_NAME = "test-node"; + + static final String NODE_ID = "test-node-id"; + + static final String TABLE_NAME = "test-table"; + + static final String COLUMN_NAME = "test-column"; + + static final String INDEX_NAME = "test-index"; + + static final ClusterNode LOCAL_NODE = new ClusterNodeImpl(NODE_ID, NODE_NAME, mock(NetworkAddress.class)); + + static void createTable(CatalogManager catalogManager, String tableName, String columnName) { + TableTestUtils.createTable( + catalogManager, + DEFAULT_SCHEMA_NAME, + DEFAULT_ZONE_NAME, + tableName, + List.of(ColumnParams.builder().name(columnName).type(INT32).build()), + List.of(columnName) + ); + } + + static void createIndex(CatalogManager catalogManager, String tableName, String indexName, String columnName) { + TableTestUtils.createHashIndex(catalogManager, DEFAULT_SCHEMA_NAME, tableName, indexName, List.of(columnName), false); + } + + static int indexId(CatalogService catalogService, String indexName, HybridClock clock) { + return TableTestUtils.getIndexIdStrict(catalogService, indexName, clock.nowLong()); + } + + static CatalogIndexDescriptor indexDescriptor(CatalogService catalogService, String indexId, HybridClock clock) { + return TableTestUtils.getIndexStrict(catalogService, indexId, clock.nowLong()); + } + + static int tableId(CatalogService catalogService, String tableName, HybridClock clock) { + return TableTestUtils.getTableIdStrict(catalogService, tableName, clock.nowLong()); + } + + static void makeIndexAvailable(CatalogManager catalogManager, int indexId) { + assertThat(catalogManager.execute(MakeIndexAvailableCommand.builder().indexId(indexId).build()), willCompleteSuccessfully()); + } + + static void awaitTillGlobalMetastoreRevisionIsApplied(MetaStorageManagerImpl metaStorageManager) throws Exception { + assertTrue( + waitForCondition(() -> { + CompletableFuture<Long> currentRevisionFuture = metaStorageManager.getService().currentRevision(); + + 
assertThat(currentRevisionFuture, willCompleteSuccessfully()); + + return currentRevisionFuture.join() == metaStorageManager.appliedRevision(); + }, 1_000) + ); + } + + static void assertMetastoreKeyAbsent(MetaStorageManager metaStorageManager, ByteArray key) { + assertThat(metaStorageManager.get(key).thenApply(Entry::value), willBe(nullValue())); + } + + static void assertMetastoreKeyPresent(MetaStorageManager metaStorageManager, ByteArray key) { + assertThat(metaStorageManager.get(key).thenApply(Entry::value), willBe(notNullValue())); + } + + static ReplicaMeta newPrimaryReplicaMeta( + ClusterNode clusterNode, + TablePartitionId replicaGroupId, + HybridTimestamp startTime, + HybridTimestamp expirationTime + ) { + return new Lease(clusterNode.name(), clusterNode.id(), startTime, expirationTime, replicaGroupId); + } +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildCompletionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildCompletionListener.java index e2c98a3df5..8044e4bee2 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildCompletionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildCompletionListener.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.table.distributed.index; -/** Index build completion listener, , will be called when a distributed build of an index for a specific partition completes. */ +/** Index build completion listener, will be called when a distributed build of an index for a specific partition completes. */ @FunctionalInterface public interface IndexBuildCompletionListener { /** Handles the index build completion event. 
*/ diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java index 2f9e19574f..b7c9409d69 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java @@ -102,10 +102,13 @@ public class IndexBuilder implements ManuallyCloseable { /** * Schedules building the index if it is not already built or is not yet in progress. * - * <p>Index is built in batches using {@link BuildIndexReplicaRequest}, which are then transformed into {@link BuildIndexCommand} on the - * replica, batches are sent sequentially.</p> - * - * <p>It is expected that the index building is triggered by the primary replica.</p> + * <p>Notes:</p> + * <ul> + * <li>Index is built in batches using {@link BuildIndexReplicaRequest}, which are then transformed into {@link BuildIndexCommand} + * on the replica, batches are sent sequentially.</li> + * <li>It is expected that the index building is triggered by the primary replica.</li> + * <li>If the index has already been built, {@link IndexBuildCompletionListener} will be notified.</li> + * </ul> * * @param tableId Table ID. * @param partitionId Partition ID. 
@@ -128,6 +131,10 @@ public class IndexBuilder implements ManuallyCloseable { ) { inBusyLockSafe(busyLock, () -> { if (indexStorage.getNextRowIdToBuild() == null) { + for (IndexBuildCompletionListener listener : listeners) { + listener.onBuildCompletion(indexId, tableId, partitionId); + } + return; } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexBuilderTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexBuilderTest.java index b106aa1c68..c1ffc35255 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexBuilderTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexBuilderTest.java @@ -113,7 +113,7 @@ public class IndexBuilderTest extends BaseIgniteAbstractTest { scheduleBuildIndex(INDEX_ID, TABLE_ID, PARTITION_ID, List.of()); - assertThat(listenCompletionIndexBuildingFuture, willTimeoutFast()); + assertThat(listenCompletionIndexBuildingFuture, willCompleteSuccessfully()); } private void scheduleBuildIndex(int indexId, int tableId, int partitionId, Collection<RowId> nextRowIdsToBuild) {