This is an automated email from the ASF dual-hosted git repository. mmuzaf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 2b4e6ee IGNITE-14305 Fix snapshot check command for indexes (#8874) 2b4e6ee is described below commit 2b4e6ee63d475840e160298b04e967e0d71bd577 Author: Maxim Muzafarov <mmu...@apache.org> AuthorDate: Wed Mar 17 00:06:18 2021 +0300 IGNITE-14305 Fix snapshot check command for indexes (#8874) --- .../persistence/file/FilePageStoreManager.java | 2 +- .../persistence/snapshot/SnapshotFutureTask.java | 20 +++- .../persistence/snapshot/SnapshotMetadata.java | 5 +- .../snapshot/SnapshotPartitionsVerifyTask.java | 9 ++ .../snapshot/AbstractSnapshotSelfTest.java | 3 + .../IgniteClusterSnapshotCheckWithIndexesTest.java | 127 +++++++++++++++++++++ .../testsuites/IgnitePdsWithIndexingTestSuite.java | 2 + 7 files changed, 159 insertions(+), 9 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index 7110dcd..a7c682d 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -1043,7 +1043,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen return Arrays.stream(files) .filter(File::isFile) - .filter(f -> f.getName().startsWith(PART_FILE_PREFIX)) + .filter(f -> f.getName().endsWith(FILE_SUFFIX)) .collect(Collectors.toList()); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java index 89484f7..978d146 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java @@ -45,6 +45,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; import java.util.function.BooleanSupplier; +import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.binary.BinaryType; @@ -57,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; @@ -389,19 +389,19 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem int grpId = e.getKey(); Set<Integer> grpParts = e.getValue(); - GridDhtPartitionTopology top = cctx.cache().cacheGroup(grpId).topology(); + CacheGroupContext gctx = cctx.cache().cacheGroup(grpId); Iterator<GridDhtLocalPartition> iter; if (grpParts == null) - iter = top.currentLocalPartitions().iterator(); + iter = gctx.topology().currentLocalPartitions().iterator(); else { if (grpParts.contains(INDEX_PARTITION)) { throw new IgniteCheckedException("Index partition cannot be included into snapshot if " + " set of cache group partitions has been explicitly provided [grpId=" + grpId + ']'); } - iter = F.iterator(grpParts, top::localPartition, false); + iter = F.iterator(grpParts, gctx.topology()::localPartition, false); } Set<Integer> owning = new HashSet<>(); @@ -420,6 +420,8 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem missed.add(part.id()); } + boolean affNode = gctx.nodeFilter() == null || gctx.nodeFilter().apply(cctx.localNode()); + if (grpParts != null) { // Partition has been provided for cache group, but some of them are not in OWNING state. // Exit with an error. @@ -437,7 +439,7 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem "Partitions which have different states skipped. Index partitions has also been skipped " + "[snpName=" + snpName + ", grpId=" + grpId + ", missed=" + missed + ']'); } - else if (missed.isEmpty() && cctx.kernalContext().query().moduleEnabled()) + else if (affNode && missed.isEmpty() && cctx.kernalContext().query().moduleEnabled()) owning.add(INDEX_PARTITION); } @@ -620,7 +622,13 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem if (closeFut == null) { Throwable err0 = err.get(); - closeFut = CompletableFuture.runAsync(() -> onDone(partFileLengths.keySet(), err0), + // Zero partitions haven't to be written on disk. + Set<GroupPartitionId> taken = partFileLengths.entrySet().stream() + .filter(e -> e.getValue() > 0) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + + closeFut = CompletableFuture.runAsync(() -> onDone(taken, err0), cctx.kernalContext().getSystemExecutorService()); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java index d2a8918..f5fb6cba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java @@ -137,7 +137,7 @@ public class SnapshotMetadata implements Serializable { } /** - * @return The list of cache groups ids which were included into snapshot. + * @return The list of cache group IDs which were included into the snapshot globally. */ public List<Integer> cacheGroupIds() { return grpIds; @@ -151,7 +151,8 @@ public class SnapshotMetadata implements Serializable { } /** - * @return Map of cache group partitions from which snapshot has been taken on local node. + * @return Map of cache group partitions from which snapshot has been taken on the local node (which is actually + * saved on the local node because some of them may be skipped due to cache node filter). */ public Map<Integer, Set<Integer>> partitions() { return locParts; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java index e717a45..42ef810 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java @@ -57,6 +57,8 @@ import org.apache.ignite.resources.LoggerResource; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX; +import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName; @@ -64,6 +66,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId; import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId; import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash; +import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum; /** * Task for checking snapshot partitions consistency the same way as {@link VerifyBackupPartitionsTaskV2} does. @@ -233,6 +236,12 @@ public class SnapshotPartitionsVerifyTask val -> { }) ) { + if (partId == INDEX_PARTITION) { + checkPartitionsPageCrcSum(() -> pageStore, INDEX_PARTITION, FLAG_IDX); + + return null; + } + ByteBuffer pageBuff = buff.get(); pageBuff.clear(); pageStore.read(0, pageBuff, true); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java index 809a23d..74976b1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java @@ -104,6 +104,9 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest { /** Number of cache keys to pre-create at node start. */ protected static final int CACHE_KEYS_RANGE = 1024; + /** Timeout in milliseconds to await for snapshot operation being completed. */ + protected static final int SNAPSHOT_AWAIT_TIMEOUT_MS = 15_000; + /** List of collected snapshot test events. */ protected final List<Integer> locEvts = new CopyOnWriteArrayList<>(); diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java new file mode 100644 index 0000000..e0aee42e --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.util.UUID; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgnitePredicate; +import org.junit.Test; + +import static java.util.Collections.singletonList; +import static org.apache.ignite.testframework.GridTestUtils.assertContains; + +/** + * Cluster-wide snapshot test check command with indexes. + */ +public class IgniteClusterSnapshotCheckWithIndexesTest extends AbstractSnapshotSelfTest { + /** @throws Exception If fails. */ + @Test + public void testClusterSnapshotCheckEmptyCache() throws Exception { + IgniteEx ignite = startGridsWithCache(3, 0, key -> new Account(key, key), + txFilteredCache("indexed")); + + ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(SNAPSHOT_AWAIT_TIMEOUT_MS); + + IdleVerifyResultV2 res = ignite.context().cache().context().snapshotMgr().checkSnapshot(SNAPSHOT_NAME).get(); + + StringBuilder b = new StringBuilder(); + res.print(b::append, true); + + assertTrue("Exceptions: " + b, F.isEmpty(res.exceptions())); + assertTrue(F.isEmpty(res.exceptions())); + } + + /** @throws Exception If fails. */ + @Test + public void testClusterSnapshotCheckWithIndexes() throws Exception { + IgniteEx ignite = startGridsWithCache(3, CACHE_KEYS_RANGE, key -> new Account(key, key), + txFilteredCache("indexed")); + + ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(SNAPSHOT_AWAIT_TIMEOUT_MS); + + IdleVerifyResultV2 res = ignite.context().cache().context().snapshotMgr().checkSnapshot(SNAPSHOT_NAME).get(); + + StringBuilder b = new StringBuilder(); + res.print(b::append, true); + + assertTrue("Exceptions: " + b, F.isEmpty(res.exceptions())); + assertContains(log, b.toString(), "The check procedure has finished, no conflicts have been found."); + } + + /** @throws Exception If failed. */ + @Test + public void testClusterSnapshotCheckWithNodeFilter() throws Exception { + startGridsWithoutCache(2); + + IgniteCache<Integer, Account> cache1 = grid(0).createCache(txFilteredCache("cache0") + .setNodeFilter(new SelfNodeFilter(grid(0).localNode().id()))); + IgniteCache<Integer, Account> cache2 = grid(1).createCache(txFilteredCache("cache1") + .setNodeFilter(new SelfNodeFilter(grid(1).localNode().id()))); + + for (int i = 0; i < CACHE_KEYS_RANGE; i++) { + cache1.put(i, new Account(i, i)); + cache2.put(i, new Account(i, i)); + } + + grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get(SNAPSHOT_AWAIT_TIMEOUT_MS); + + IdleVerifyResultV2 res = grid(0).context().cache().context().snapshotMgr().checkSnapshot(SNAPSHOT_NAME).get(); + + StringBuilder b = new StringBuilder(); + res.print(b::append, true); + + assertTrue("Exceptions: " + b, F.isEmpty(res.exceptions())); + assertContains(log, b.toString(), "The check procedure has finished, no conflicts have been found."); + } + + /** Node filter to run cache on single node. */ + private static class SelfNodeFilter implements IgnitePredicate<ClusterNode> { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Node id to run cache at. */ + private final UUID nodeId; + + /** @param nodeId Node id to run cache at. */ + public SelfNodeFilter(UUID nodeId) { + this.nodeId = nodeId; + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode node) { + return node.id().equals(nodeId); + } + } + + /** + * @param cacheName Cache name. + * @return Cache configuration. + */ + private static CacheConfiguration<Integer, Account> txFilteredCache(String cacheName) { + return txCacheConfig(new CacheConfiguration<Integer, Account>(cacheName)) + .setCacheMode(CacheMode.REPLICATED) + .setQueryEntities(singletonList(new QueryEntity(Integer.class.getName(), Account.class.getName()))); + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java index d018457..239d293 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgniteTcBotIni import org.apache.ignite.internal.processors.cache.persistence.db.IndexingMultithreadedLoadContinuousRestartTest; import org.apache.ignite.internal.processors.cache.persistence.db.LongDestroyDurableBackgroundTaskTest; import org.apache.ignite.internal.processors.cache.persistence.db.MultipleParallelCacheDeleteDeadlockTest; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotCheckWithIndexesTest; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotWithIndexesTest; import org.apache.ignite.internal.processors.database.IgniteDbMultiNodeWithIndexingPutGetTest; import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeWithIndexingPutGetTest; @@ -57,6 +58,7 @@ import org.junit.runners.Suite; RebuildIndexTest.class, RebuildIndexWithMVCCTest.class, IgniteClusterSnapshotWithIndexesTest.class, + IgniteClusterSnapshotCheckWithIndexesTest.class, ClientReconnectWithSqlTableConfiguredTest.class, MultipleParallelCacheDeleteDeadlockTest.class, CacheGroupReencryptionTest.class,