This is an automated email from the ASF dual-hosted git repository. nizhikov pushed a commit to branch IGNITE-17177_inc_snapshots in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/IGNITE-17177_inc_snapshots by this push: new f13743d8be6 IGNITE-17645 Write lastSnpSeg into SnapshotMetadata (#10247) f13743d8be6 is described below commit f13743d8be60e0023c04430ff2bf6cafbe3dd06a Author: Maksim Timonin <timonin.ma...@gmail.com> AuthorDate: Fri Sep 23 16:42:57 2022 +0300 IGNITE-17645 Write lastSnpSeg into SnapshotMetadata (#10247) --- .../systemview/walker/SnapshotViewWalker.java | 4 +- .../snapshot/IgniteSnapshotManager.java | 7 +- .../persistence/snapshot/SnapshotFutureTask.java | 12 ++- .../snapshot/SnapshotFutureTaskResult.java | 52 +++++++++++++ .../persistence/snapshot/SnapshotMetadata.java | 15 ++++ .../ignite/spi/systemview/view/SnapshotView.java | 35 ++++++--- .../ignite/internal/metric/SystemViewSelfTest.java | 1 + .../IgniteClusterSnapshotWalRecordTest.java | 91 +++++++++++++++++++--- 8 files changed, 188 insertions(+), 29 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SnapshotViewWalker.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SnapshotViewWalker.java index bee5fd35b21..9a513f10699 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SnapshotViewWalker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SnapshotViewWalker.java @@ -33,6 +33,7 @@ public class SnapshotViewWalker implements SystemViewRowAttributeWalker<Snapshot v.accept(1, "consistentId", String.class); v.accept(2, "baselineNodes", String.class); v.accept(3, "cacheGroups", String.class); + v.accept(4, "snapshotRecordSegment", Long.class); } /** {@inheritDoc} */ @@ -41,10 +42,11 @@ public class SnapshotViewWalker implements SystemViewRowAttributeWalker<Snapshot v.accept(1, "consistentId", String.class, row.consistentId()); v.accept(2, "baselineNodes", String.class, row.baselineNodes()); v.accept(3, "cacheGroups", String.class, 
row.cacheGroups()); + v.accept(4, "snapshotRecordSegment", Long.class, row.snapshotRecordSegment()); } /** {@inheritDoc} */ @Override public int count() { - return 4; + return 5; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index e19e5c38a75..e27d6161d53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -822,6 +822,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter snpDir.mkdirs(); + SnapshotFutureTaskResult res = (SnapshotFutureTaskResult)fut.result(); + SnapshotMetadata meta = new SnapshotMetadata(req.requestId(), req.snapshotName(), cctx.localNode().consistentId().toString(), @@ -829,7 +831,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter cctx.gridConfig().getDataStorageConfiguration().getPageSize(), grpIds, blts, - (Set<GroupPartitionId>)fut.result(), + res.parts(), + res.snapshotPointer(), cctx.gridConfig().getEncryptionSpi().masterKeyDigest() ); @@ -2183,7 +2186,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter List<File> dirs = snapshotCacheDirectories(meta.snapshotName(), null, meta.folderName(), name -> true); Collection<String> cacheGrps = F.viewReadOnly(dirs, FilePageStoreManager::cacheGroupName); - return new SnapshotView(meta.snapshotName(), meta.consistentId(), F.concat(meta.baselineNodes(), ","), F.concat(cacheGrps, ",")); + return new SnapshotView(meta, cacheGrps); } /** @return Snapshot handlers. 
*/ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java index 579da37ee8e..aa5326aa9af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java @@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc; import org.apache.ignite.internal.processors.marshaller.MappedName; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl; @@ -94,7 +95,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.snapshot.I * If partitions for particular cache group are not provided that they will be collected and added * on checkpoint under the write-lock. */ -class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId>> implements CheckpointListener { +class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskResult> implements CheckpointListener { /** File page store manager for accessing cache group associated files. */ private final FilePageStoreManager pageStore; @@ -140,6 +141,9 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId /** Future which will be completed when task requested to be closed. 
Will be executed on system pool. */ private volatile CompletableFuture<Void> closeFut; + /** Pointer to {@link ClusterSnapshotRecord}. */ + private volatile @Nullable WALPointer snpPtr; + /** Flag indicates that task already scheduled on checkpoint. */ private final AtomicBoolean started = new AtomicBoolean(); @@ -209,7 +213,7 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId } /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable Set<GroupPartitionId> res, @Nullable Throwable err) { + @Override public boolean onDone(@Nullable SnapshotFutureTaskResult res, @Nullable Throwable err) { for (PageStoreSerialWriter writer : partDeltaWriters.values()) U.closeQuiet(writer); @@ -349,7 +353,7 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId // 1. Checkpoint holds write acquire lock and Snapshot holds PME. Then there are not any concurrent updates. // 2. This record is written before the related CheckpointRecord, and is flushed with CheckpointRecord or instead it. 
if (cctx.wal() != null) { - cctx.wal().log(new ClusterSnapshotRecord(snpName)); + snpPtr = cctx.wal().log(new ClusterSnapshotRecord(snpName)); ctx.walFlush(true); } @@ -614,7 +618,7 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId .map(Map.Entry::getKey) .collect(Collectors.toSet()); - closeFut = CompletableFuture.runAsync(() -> onDone(taken, err0), + closeFut = CompletableFuture.runAsync(() -> onDone(new SnapshotFutureTaskResult(taken, snpPtr), err0), cctx.kernalContext().pools().getSystemExecutorService()); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTaskResult.java new file mode 100644 index 00000000000..0f2acac059e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTaskResult.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.util.Collections; +import java.util.Set; +import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord; +import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; +import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; +import org.jetbrains.annotations.Nullable; + +/** + * Represents result of {@link SnapshotFutureTask}. + */ +class SnapshotFutureTaskResult { + /** Partitions for which snapshot was created. */ + private final Set<GroupPartitionId> parts; + + /** Pointer to {@link ClusterSnapshotRecord} in WAL. */ + private final @Nullable WALPointer snpPtr; + + /** */ + SnapshotFutureTaskResult(Set<GroupPartitionId> parts, @Nullable WALPointer snpPtr) { + this.parts = Collections.unmodifiableSet(parts); + this.snpPtr = snpPtr; + } + + /** */ + Set<GroupPartitionId> parts() { + return parts; + } + + /** */ + @Nullable WALPointer snapshotPointer() { + return snpPtr; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java index 009c0b8849b..e1f8e4e2e6c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java @@ -29,7 +29,9 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.UUID; +import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord; import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; +import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import 
org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -68,6 +70,9 @@ public class SnapshotMetadata implements Serializable { @GridToStringInclude private final Set<String> bltNodes; + /** WAL pointer to {@link ClusterSnapshotRecord} if exists. */ + private final @Nullable WALPointer snpRecPtr; + /** * Map of cache group partitions from which snapshot has been taken on the local node. This map can be empty * since for instance, due to the node filter there is no cache data on node. @@ -86,6 +91,7 @@ public class SnapshotMetadata implements Serializable { * @param pageSize Page size of stored snapshot data. * @param grpIds The list of cache groups ids which were included into snapshot. * @param bltNodes The set of affected by snapshot baseline nodes. + * @param snpRecPtr WAL pointer to {@link ClusterSnapshotRecord} if exists. * @param masterKeyDigest Master key digest for encrypted caches. */ public SnapshotMetadata( @@ -97,6 +103,7 @@ public class SnapshotMetadata implements Serializable { List<Integer> grpIds, Set<String> bltNodes, Set<GroupPartitionId> pairs, + @Nullable WALPointer snpRecPtr, @Nullable byte[] masterKeyDigest ) { this.rqId = rqId; @@ -106,6 +113,7 @@ public class SnapshotMetadata implements Serializable { this.pageSize = pageSize; this.grpIds = grpIds; this.bltNodes = bltNodes; + this.snpRecPtr = snpRecPtr; this.masterKeyDigest = masterKeyDigest; pairs.forEach(p -> @@ -170,6 +178,13 @@ public class SnapshotMetadata implements Serializable { return Collections.unmodifiableMap(locParts); } + /** + * @return WAL pointer to {@link ClusterSnapshotRecord} if exists. + */ + public @Nullable WALPointer snapshotRecordPointer() { + return snpRecPtr; + } + /** Save the state of this <tt>HashMap</tt> partitions and cache groups to a stream. 
*/ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SnapshotView.java b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SnapshotView.java index 3b17ca28bdd..6519a76a796 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SnapshotView.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SnapshotView.java @@ -17,7 +17,11 @@ package org.apache.ignite.spi.systemview.view; +import java.util.Collection; import org.apache.ignite.internal.managers.systemview.walker.Order; +import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata; +import org.apache.ignite.internal.util.typedef.F; /** * Snapshot representation for a {@link SystemView}. @@ -41,22 +45,23 @@ public class SnapshotView { /** Cache group names that were included in the snapshot. */ private final String cacheGrps; + /** WAL segment that contains {@link ClusterSnapshotRecord} if exists. */ + private final Long snpRecSeg; + /** - * @param name Snapshot name. - * @param consistentId Node consistent ID. - * @param baselineNodes Baseline nodes affected by the snapshot. + * @param meta Snapshot metadata. * @param cacheGrps Cache group names that were included in the snapshot. */ public SnapshotView( - String name, - String consistentId, - String baselineNodes, - String cacheGrps + SnapshotMetadata meta, + Collection<String> cacheGrps ) { - this.name = name; - this.consistentId = consistentId; - this.baselineNodes = baselineNodes; - this.cacheGrps = cacheGrps; + name = meta.snapshotName(); + consistentId = meta.consistentId(); + baselineNodes = F.concat(meta.baselineNodes(), ","); + snpRecSeg = meta.snapshotRecordPointer() == null ? 
null : meta.snapshotRecordPointer().index(); + + this.cacheGrps = F.concat(cacheGrps, ","); } /** @@ -90,4 +95,12 @@ public class SnapshotView { public String cacheGroups() { return cacheGrps; } + + /** + * @return WAL segment that contains {@link ClusterSnapshotRecord} if exists. + */ + @Order(4) + public Long snapshotRecordSegment() { + return snpRecSeg; + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewSelfTest.java index d391435979f..1db5a941956 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewSelfTest.java @@ -2101,6 +2101,7 @@ public class SystemViewSelfTest extends GridCommonAbstractTest { assertEquals(testSnap0, view.name()); assertEquals(ignite.localNode().consistentId().toString(), view.consistentId()); + assertNotNull(view.snapshotRecordSegment()); Collection<?> constIds = F.nodeConsistentIds(ignite.cluster().nodes()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWalRecordTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWalRecordTest.java index cbd70523f13..6db4f0ad006 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWalRecordTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWalRecordTest.java @@ -21,9 +21,12 @@ import java.io.File; import java.nio.file.Path; import java.nio.file.Paths; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; +import 
java.util.stream.StreamSupport; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.configuration.CacheConfiguration; @@ -42,9 +45,13 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.spi.systemview.view.SnapshotView; +import org.apache.ignite.spi.systemview.view.SystemView; import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; +import static org.apache.ignite.spi.systemview.view.SnapshotView.SNAPSHOT_SYS_VIEW; + /** */ public class IgniteClusterSnapshotWalRecordTest extends AbstractSnapshotSelfTest { /** @@ -58,23 +65,33 @@ public class IgniteClusterSnapshotWalRecordTest extends AbstractSnapshotSelfTest CountDownLatch loadStopLatch = new CountDownLatch(1); // Start changing data concurrently with performing the ClusterSnapshot operation. 
- IgniteInternalFuture<?> loadFut = GridTestUtils.runMultiThreadedAsync(() -> { - Random r = new Random(); + IgniteInternalFuture<?> loadFut = null; - while (loadStopLatch.getCount() > 0) { - int key = r.nextInt(CACHE_KEYS_RANGE); + try { + loadFut = GridTestUtils.runMultiThreadedAsync(() -> { + Random r = new Random(); - Account acc = new Account(r.nextInt(), r.nextInt()); + while (loadStopLatch.getCount() > 0 && !Thread.interrupted()) { + int key = r.nextInt(CACHE_KEYS_RANGE); - ign.cache(DEFAULT_CACHE_NAME).put(key, acc); - } - }, 5, "cache-loader-"); + Account acc = new Account(r.nextInt(), r.nextInt()); - snp(ign).createSnapshot(SNAPSHOT_NAME).get(); + ign.cache(DEFAULT_CACHE_NAME).put(key, acc); + } + }, 5, "cache-loader-"); - loadStopLatch.countDown(); + snp(ign).createSnapshot(SNAPSHOT_NAME).get(); - loadFut.get(); + loadStopLatch.countDown(); + + loadFut.get(); + } + catch (Throwable err) { + if (loadFut != null) + loadFut.cancel(); + + throw err; + } T2<Map<Integer, Account>, Map<Integer, Account>> data = parseWalCacheState(ign, SNAPSHOT_NAME); @@ -93,6 +110,58 @@ public class IgniteClusterSnapshotWalRecordTest extends AbstractSnapshotSelfTest assertCacheKeys(snpIgn.cache(DEFAULT_CACHE_NAME), snpData); } + /** */ + @Test + public void testClusterSnapshotRecordIsWrittenToSnapshotMetadata() throws Exception { + int nodes = 3; + int snapshots = 10; + + startGridsWithCache(nodes, 1, key -> new Account(key, key), + new CacheConfiguration<>(DEFAULT_CACHE_NAME) + ); + + for (int i = 0; i < snapshots; i++) { + // Start changing data concurrently with performing the ClusterSnapshot operation. 
+ snp(grid(0)).createSnapshot(SNAPSHOT_NAME + i).get(); + } + + for (int i = 0; i < nodes; i++) { + IgniteEx ign = grid(i); + + ign.context().cache().context().wal().flush(null, true); + + WALIterator walIt = wal(grid(i)); + + long snpCnt = 0; + + SystemView<SnapshotView> snpView = ign.context().systemView().view(SNAPSHOT_SYS_VIEW); + + for (IgniteBiTuple<WALPointer, WALRecord> tuple: walIt) { + WALRecord rec = tuple.getValue(); + + if (rec.type() == WALRecord.RecordType.CLUSTER_SNAPSHOT) { + SnapshotMetadata metadata = snp(grid(i)).readSnapshotMetadata( + snp(grid(i)).snapshotLocalDir(SNAPSHOT_NAME + snpCnt), + (String)grid(i).configuration().getConsistentId()); + + assertEquals(tuple.getKey(), metadata.snapshotRecordPointer()); + + List<SnapshotView> snpNodesView = StreamSupport.stream(snpView.spliterator(), false) + .filter(v -> v.name().equals(metadata.snapshotName())) + .filter(v -> v.consistentId().equals(ign.localNode().consistentId())) + .filter(v -> v.snapshotRecordSegment().equals(tuple.getKey().index())) + .collect(Collectors.toList()); + + assertEquals(1, snpNodesView.size()); + + snpCnt++; + } + } + + assertEquals(snapshots, snpCnt); + } + } + /** * Parsing WAL files and dumping cache states: first is before {@link ClusterSnapshotRecord} was written, and second * is after all load operations stopped.