This is an automated email from the ASF dual-hosted git repository. timoninmaxim pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 8be2e2cd54f IGNITE-22412: Unify snapshot validation jobs (#11368) 8be2e2cd54f is described below commit 8be2e2cd54f7a7a8930086f5476c7bca0663d4ed Author: Vladimir Steshin <vlads...@gmail.com> AuthorDate: Thu Jun 6 09:11:52 2024 +0300 IGNITE-22412: Unify snapshot validation jobs (#11368) --- .../snapshot/AbstractSnapshotVerificationTask.java | 86 ++++++++++++++++------ .../IncrementalSnapshotVerificationTask.java | 51 ++----------- .../snapshot/SnapshotHandlerContext.java | 6 +- .../snapshot/SnapshotHandlerRestoreTask.java | 60 ++------------- .../snapshot/SnapshotPartitionsVerifyTask.java | 50 ++----------- .../snapshot/SnapshotPartitionsVerifyTaskArg.java | 10 +-- 6 files changed, 89 insertions(+), 174 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java index cff0b0e110d..e8d348fc6da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotVerificationTask.java @@ -24,14 +24,17 @@ import java.util.List; import java.util.Map; import java.util.Set; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeJobAdapter; import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.compute.ComputeJobResultPolicy; import org.apache.ignite.compute.ComputeTaskAdapter; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; import org.jetbrains.annotations.Nullable; /** @@ -49,6 +52,10 @@ public abstract class AbstractSnapshotVerificationTask extends @IgniteInstanceResource protected IgniteEx ignite; + /** Injected logger. */ + @LoggerResource + protected IgniteLogger log; + /** {@inheritDoc} */ @Override public Map<ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, SnapshotPartitionsVerifyTaskArg arg) { Map<ClusterNode, List<SnapshotMetadata>> clusterMetas = arg.clusterMetadata(); @@ -72,17 +79,7 @@ public abstract class AbstractSnapshotVerificationTask extends if (meta == null) continue; - jobs.put( - createJob( - meta.snapshotName(), - arg.snapshotPath(), - arg.incrementIndex(), - meta.consistentId(), - arg.cacheGroupNames(), - arg.check() - ), - e.getKey() - ); + jobs.put(createJob(meta.snapshotName(), meta.consistentId(), arg), e.getKey()); if (allMetas.isEmpty()) break; @@ -100,19 +97,60 @@ public abstract class AbstractSnapshotVerificationTask extends /** * @param name Snapshot name. - * @param path Snapshot directory path. - * @param incIdx Incremental snapshot index. - * @param constId Snapshot metadata file name. - * @param groups Cache groups to be restored from the snapshot. May be empty if all cache groups are being restored. - * @param check If {@code true} check snapshot before restore. + * @param consId Consistent id of the related node. + * @param args Check snapshot parameters. + * * @return Compute job. */ - protected abstract ComputeJob createJob( - String name, - @Nullable String path, - int incIdx, - String constId, - Collection<String> groups, - boolean check - ); + protected abstract AbstractSnapshotVerificationJob createJob(String name, String consId, SnapshotPartitionsVerifyTaskArg args); + + /** */ + protected abstract static class AbstractSnapshotVerificationJob extends ComputeJobAdapter { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Ignite instance. */ + @IgniteInstanceResource + protected IgniteEx ignite; + + /** Injected logger. */ + @LoggerResource + protected IgniteLogger log; + + /** Snapshot name. */ + protected final String snpName; + + /** Snapshot directory path. */ + @Nullable protected final String snpPath; + + /** Consistent id of the related node. */ + protected final String consId; + + /** Set of cache groups to be checked in the snapshot. {@code Null} or empty to check everything. */ + @Nullable protected final Collection<String> rqGrps; + + /** If {@code true}, calculates and compares partition hashes. Otherwise, only basic snapshot validation is launched. */ + protected final boolean check; + + /** + * @param snpName Snapshot name. + * @param snpPath Snapshot directory path. + * @param consId Consistent id of the related node. + * @param rqGrps Set of cache groups to be checked in the snapshot. {@code Null} or empty to check everything. + * @param check If {@code true}, calculates and compares partition hashes. Otherwise, only basic snapshot validation is launched. + */ + protected AbstractSnapshotVerificationJob( + String snpName, + @Nullable String snpPath, + String consId, + @Nullable Collection<String> rqGrps, + boolean check + ) { + this.snpName = snpName; + this.snpPath = snpPath; + this.consId = consId; + this.rqGrps = rqGrps; + this.check = check; + } + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java index 6cde9cfff44..a6779e8e7e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java @@ -36,13 +36,9 @@ import java.util.function.Function; import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.compute.ComputeJob; -import org.apache.ignite.compute.ComputeJobAdapter; import org.apache.ignite.compute.ComputeJobResult; -import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.management.cache.IdleVerifyResultV2; import org.apache.ignite.internal.management.cache.PartitionKeyV2; import org.apache.ignite.internal.processors.cache.GridCacheOperation; @@ -57,8 +53,6 @@ import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.marshaller.MarshallerUtils; -import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; @@ -71,14 +65,6 @@ public class IncrementalSnapshotVerificationTask extends AbstractSnapshotVerific /** Serial version uid. */ private static final long serialVersionUID = 0L; - /** Ignite instance. */ - @IgniteInstanceResource - private IgniteEx ignite; - - /** Injected logger. */ - @LoggerResource - private IgniteLogger log; - /** {@inheritDoc} */ @Override public SnapshotPartitionsVerifyTaskResult reduce(List<ComputeJobResult> results) throws IgniteException { Map<Object, Map<Object, TransactionsHashRecord>> nodeTxHashMap = new HashMap<>(); @@ -147,42 +133,18 @@ public class IncrementalSnapshotVerificationTask extends AbstractSnapshotVerific } /** {@inheritDoc} */ - @Override protected ComputeJob createJob( - String name, - @Nullable String path, - int incIdx, - String constId, - Collection<String> groups, - boolean check - ) { - return new VerifyIncrementalSnapshotJob(name, path, incIdx, constId); + @Override protected VerifyIncrementalSnapshotJob createJob(String name, String consId, SnapshotPartitionsVerifyTaskArg args) { + return new VerifyIncrementalSnapshotJob(name, args.snapshotPath(), args.incrementIndex(), consId); } /** */ - private static class VerifyIncrementalSnapshotJob extends ComputeJobAdapter { + private static class VerifyIncrementalSnapshotJob extends AbstractSnapshotVerificationJob { /** Serial version uid. */ private static final long serialVersionUID = 0L; - /** Ignite instance. */ - @IgniteInstanceResource - private IgniteEx ignite; - - /** Injected logger. */ - @LoggerResource - private IgniteLogger log; - - /** Snapshot name to validate. */ - private final String snpName; - - /** Snapshot directory path. */ - private final String snpPath; - /** Incremental snapshot index. */ private final int incIdx; - /** Consistent ID. */ - private final String consId; - /** */ private LongAdder procEntriesCnt; @@ -190,7 +152,7 @@ public class IncrementalSnapshotVerificationTask extends AbstractSnapshotVerific * @param snpName Snapshot name. * @param snpPath Snapshot directory path. * @param incIdx Incremental snapshot index. - * @param consId Consistent ID. + * @param consId Consistent id of the related node. */ public VerifyIncrementalSnapshotJob( String snpName, @@ -198,10 +160,9 @@ public class IncrementalSnapshotVerificationTask extends AbstractSnapshotVerific int incIdx, String consId ) { - this.snpName = snpName; - this.snpPath = snpPath; + super(snpName, snpPath, consId, null, true); + this.incIdx = incIdx; - this.consId = consId; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java index ceceb3c785b..07b9946dfcc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java @@ -41,7 +41,7 @@ public class SnapshotHandlerContext { /** Warning flag of concurrent inconsistent-by-nature streamer updates. */ private final boolean streamerWrn; - /** If {@code true} check snapshot integrity. */ + /** If {@code true}, calculates and compares partition hashes. Otherwise, only basic snapshot validation is launched.*/ private final boolean check; /** @@ -51,7 +51,7 @@ public class SnapshotHandlerContext { * @param locNode Local node. * @param snpDir The full path to the snapshot files. * @param streamerWrn {@code True} if concurrent streaming updates occurred during snapshot operation. - * @param check If {@code true} check snapshot integrity. + * @param check If {@code true}, calculates and compares partition hashes. Otherwise, only basic snapshot validation is launched. */ public SnapshotHandlerContext( SnapshotMetadata metadata, @@ -105,7 +105,7 @@ public class SnapshotHandlerContext { return streamerWrn; } - /** @return If {@code true} check snapshot integrity. */ + /** @return If {@code true}, calculates and compares partition hashes. Otherwise, only basic snapshot validation is launched. */ public boolean check() { return check; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java index 5c34511ce27..c2db78f2093 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java @@ -27,14 +27,8 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.compute.ComputeJob; -import org.apache.ignite.compute.ComputeJobAdapter; import org.apache.ignite.compute.ComputeJobResult; -import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.resources.LoggerResource; import org.jetbrains.annotations.Nullable; /** @@ -44,20 +38,9 @@ public class SnapshotHandlerRestoreTask extends AbstractSnapshotVerificationTask /** Serial version uid. */ private static final long serialVersionUID = 0L; - /** Injected ignite logger. */ - @LoggerResource - private IgniteLogger log; - /** {@inheritDoc} */ - @Override protected ComputeJob createJob( - String name, - @Nullable String path, - int incIdx, - String constId, - Collection<String> groups, - boolean check - ) { - return new SnapshotHandlerRestoreJob(name, path, constId, groups, check); + @Override protected SnapshotHandlerRestoreJob createJob(String name, String consId, SnapshotPartitionsVerifyTaskArg args) { + return new SnapshotHandlerRestoreJob(name, args.snapshotPath(), consId, args.cacheGroupNames(), args.check()); } /** {@inheritDoc} */ @@ -100,52 +83,25 @@ public class SnapshotHandlerRestoreTask extends AbstractSnapshotVerificationTask } /** Invokes all {@link SnapshotHandlerType#RESTORE} handlers locally. */ - private static class SnapshotHandlerRestoreJob extends ComputeJobAdapter { + private static class SnapshotHandlerRestoreJob extends AbstractSnapshotVerificationJob { /** Serial version uid. */ private static final long serialVersionUID = 0L; - /** Ignite instance. */ - @IgniteInstanceResource - private IgniteEx ignite; - - /** Injected logger. */ - @LoggerResource - private IgniteLogger log; - - /** Snapshot name. */ - private final String snpName; - - /** String representation of the consistent node ID. */ - private final String consistentId; - - /** Cache group names. */ - private final Collection<String> grps; - - /** Snapshot directory path. */ - private final String snpPath; - - /** If {@code true} check snapshot before restore. */ - private final boolean check; - /** * @param snpName Snapshot name. * @param snpPath Snapshot directory path. - * @param consistentId String representation of the consistent node ID. + * @param consId Consistent id of the related node. * @param grps Cache group names. * @param check If {@code true} check snapshot before restore. */ public SnapshotHandlerRestoreJob( String snpName, @Nullable String snpPath, - String consistentId, + String consId, Collection<String> grps, boolean check ) { - this.snpName = snpName; - this.snpPath = snpPath; - this.consistentId = consistentId; - this.grps = grps; - this.check = check; + super(snpName, snpPath, consId, grps, check); } /** {@inheritDoc} */ @@ -153,10 +109,10 @@ public class SnapshotHandlerRestoreTask extends AbstractSnapshotVerificationTask try { IgniteSnapshotManager snpMgr = ignite.context().cache().context().snapshotMgr(); File snpDir = snpMgr.snapshotLocalDir(snpName, snpPath); - SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpDir, consistentId); + SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpDir, consId); return snpMgr.handlers().invokeAll(SnapshotHandlerType.RESTORE, - new SnapshotHandlerContext(meta, grps, ignite.localNode(), snpDir, false, check)); + new SnapshotHandlerContext(meta, rqGrps, ignite.localNode(), snpDir, false, check)); } catch (IgniteCheckedException | IOException e) { throw new IgniteException(e); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java index 40fa00fcedf..b08c6dc6619 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java @@ -25,18 +25,12 @@ import java.util.Map; import java.util.Objects; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.compute.ComputeJob; -import org.apache.ignite.compute.ComputeJobAdapter; import org.apache.ignite.compute.ComputeJobResult; -import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.management.cache.PartitionKeyV2; import org.apache.ignite.internal.management.cache.VerifyBackupPartitionsTaskV2; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; import org.apache.ignite.internal.processors.task.GridInternal; -import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.resources.LoggerResource; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.management.cache.VerifyBackupPartitionsTaskV2.reduce0; @@ -52,15 +46,8 @@ public class SnapshotPartitionsVerifyTask extends AbstractSnapshotVerificationTa private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override protected ComputeJob createJob( - String name, - String path, - int incIdx, - String constId, - Collection<String> groups, - boolean check - ) { - return new VerifySnapshotPartitionsJob(name, path, constId, groups, check); + @Override protected VerifySnapshotPartitionsJob createJob(String name, String consId, SnapshotPartitionsVerifyTaskArg args) { + return new VerifySnapshotPartitionsJob(name, args.snapshotPath(), consId, args.cacheGroupNames(), args.check()); } /** {@inheritDoc} */ @@ -69,36 +56,13 @@ public class SnapshotPartitionsVerifyTask extends AbstractSnapshotVerificationTa } /** Job that collects update counters of snapshot partitions on the node it executes. */ - private static class VerifySnapshotPartitionsJob extends ComputeJobAdapter { + private static class VerifySnapshotPartitionsJob extends AbstractSnapshotVerificationJob { /** Serial version uid. */ private static final long serialVersionUID = 0L; - /** Ignite instance. */ - @IgniteInstanceResource - private IgniteEx ignite; - - /** Injected logger. */ - @LoggerResource - private IgniteLogger log; - - /** Snapshot name to validate. */ - private final String snpName; - - /** Snapshot directory path. */ - private final String snpPath; - - /** Consistent snapshot metadata file name. */ - private final String consId; - - /** Set of cache groups to be checked in the snapshot or {@code empty} to check everything. */ - private final Collection<String> rqGrps; - - /** If {@code true} check snapshot before restore. */ - private final boolean check; - /** * @param snpName Snapshot name to validate. - * @param consId Consistent snapshot metadata file name. + * @param consId Consistent id of the related node. * @param rqGrps Set of cache groups to be checked in the snapshot or {@code empty} to check everything. * @param snpPath Snapshot directory path. * @param check If {@code true} check snapshot before restore. @@ -110,11 +74,7 @@ public class SnapshotPartitionsVerifyTask extends AbstractSnapshotVerificationTa Collection<String> rqGrps, boolean check ) { - this.snpName = snpName; - this.consId = consId; - this.rqGrps = rqGrps; - this.snpPath = snpPath; - this.check = check; + super(snpName, snpPath, consId, rqGrps, check); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskArg.java index 90624642654..c3c660d94b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskArg.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskArg.java @@ -36,13 +36,13 @@ public class SnapshotPartitionsVerifyTaskArg extends VisorDataTransferObject { private static final long serialVersionUID = 0L; /** Cache group names to be verified. */ - private Collection<String> grpNames; + @Nullable private Collection<String> grpNames; /** The map of distribution of snapshot metadata pieces across the cluster. */ private Map<ClusterNode, List<SnapshotMetadata>> clusterMetas; /** Snapshot directory path. */ - private String snpPath; + @Nullable private String snpPath; /** If {@code true} check snapshot integrity. */ private boolean check; @@ -63,7 +63,7 @@ public class SnapshotPartitionsVerifyTaskArg extends VisorDataTransferObject { * @param check If {@code true} check snapshot integrity. */ public SnapshotPartitionsVerifyTaskArg( - Collection<String> grpNames, + @Nullable Collection<String> grpNames, Map<ClusterNode, List<SnapshotMetadata>> clusterMetas, @Nullable String snpPath, int incIdx, @@ -79,7 +79,7 @@ public class SnapshotPartitionsVerifyTaskArg extends VisorDataTransferObject { /** * @return Cache group names to be verified. */ - public Collection<String> cacheGroupNames() { + @Nullable public Collection<String> cacheGroupNames() { return grpNames; } @@ -93,7 +93,7 @@ public class SnapshotPartitionsVerifyTaskArg extends VisorDataTransferObject { /** * @return Snapshot directory path. */ - public String snapshotPath() { + @Nullable public String snapshotPath() { return snpPath; }