timoninmaxim commented on code in PR #11391: URL: https://github.com/apache/ignite/pull/11391#discussion_r1718304586
########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java: ########## @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.management.cache.IdleVerifyResultV2; +import org.apache.ignite.internal.management.cache.PartitionKeyV2; +import 
org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.SNAPSHOT_CHECK_METAS; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.SNAPSHOT_VALIDATE_PARTS; + +/** Distributed process of snapshot checking (with the partition hashes). */ +public class SnapshotCheckProcess { + /** */ + private final IgniteLogger log; + + /** */ + private final GridKernalContext kctx; + + /** Operation contexts by name. */ + private final Map<String, SnapshotCheckContext> contexts = new ConcurrentHashMap<>(); + + /** Cluster-wide operation futures per snapshot called from current node. */ + private final Map<UUID, GridFutureAdapter<SnapshotPartitionsVerifyTaskResult>> clusterOpFuts = new ConcurrentHashMap<>(); + + /** Check metas first phase subprocess. */ + private final DistributedProcess<SnapshotCheckProcessRequest, SnapshotCheckResponse> phase1CheckMetas; + + /** Partition hashes second phase subprocess. 
*/ + private final DistributedProcess<SnapshotCheckProcessRequest, SnapshotCheckResponse> phase2PartsHashes; + + /** */ + public SnapshotCheckProcess(GridKernalContext kctx) { + this.kctx = kctx; + + log = kctx.log(getClass()); + + phase1CheckMetas = new DistributedProcess<>(kctx, SNAPSHOT_CHECK_METAS, this::prepareAndCheckMetas, + this::reducePreparationAndMetasCheck); + + phase2PartsHashes = new DistributedProcess<>(kctx, SNAPSHOT_VALIDATE_PARTS, this::validateParts, + this::reduceValidatePartsAndFinish); + + kctx.event().addLocalEventListener(evt -> onNodeLeft(((DiscoveryEvent)evt).eventNode().id()), EVT_NODE_FAILED, EVT_NODE_LEFT); + } + + /** Expected to run in a discovery-managed thread. */ + private void onNodeLeft(UUID nodeId) { + Throwable err = new ClusterTopologyCheckedException("Snapshot validation stopped. A required node left the cluster " + + "[nodeId=" + nodeId + ']'); + + contexts.values().forEach(ctx -> { + if (ctx.req.nodes().contains(nodeId)) + ctx.locProcFut.onDone(err); + }); + } + + /** */ + Map<String, SnapshotCheckContext> contexts() { + return Collections.unmodifiableMap(contexts); + } + + /** + * Stops all the processes with the passed exception. + * + * @param err The interrupt reason. + */ + void interrupt(Throwable err) { + contexts.forEach((snpName, ctx) -> ctx.locProcFut.onDone(err)); + + contexts.clear(); + + clusterOpFuts.forEach((reqId, fut) -> fut.onDone(err)); + } + + /** Phase 2 and process finish. 
*/ + private IgniteInternalFuture<?> reduceValidatePartsAndFinish( + UUID reqId, + Map<UUID, SnapshotCheckResponse> results, + Map<UUID, Throwable> errors + ) { + SnapshotCheckContext ctx = context(null, reqId); + + if (ctx == null) + return new GridFinishedFuture<>(); + + contexts.remove(ctx.req.snapshotName()); + + if (log.isInfoEnabled()) + log.info("Finished snapshot validation [req=" + ctx.req + ']'); + + GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> clusterOpFut = clusterOpFuts.get(reqId); + + if (clusterOpFut == null) + return new GridFinishedFuture<>(); + + assert results.values().stream().noneMatch(res -> res != null && res.metas != null); + + if (ctx.req.allRestoreHandlers) { + try { + if (!F.isEmpty(errors)) + throw F.firstValue(errors); + + Map<ClusterNode, Map<String, SnapshotHandlerResult<?>>> cstRes = mapCustomHandlersResults(results, ctx.req.nodes()); + + kctx.cache().context().snapshotMgr().checker().checkCustomHandlersResults(ctx.req.snapshotName(), cstRes); + + clusterOpFut.onDone(new SnapshotPartitionsVerifyTaskResult(ctx.clusterMetas, null)); + } + catch (Throwable err) { + clusterOpFut.onDone(err); + } + } + else { + Map<ClusterNode, Exception> errors0 = mapErrors(errors); + + if (!F.isEmpty(results)) { + Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>> results0 = mapPartsHashes(results, ctx.req.nodes()); + + IdleVerifyResultV2 chkRes = SnapshotChecker.reduceHashesResults(results0, errors0); + + clusterOpFut.onDone(new SnapshotPartitionsVerifyTaskResult(ctx.clusterMetas, chkRes)); + } + else { + assert !errors0.isEmpty(); + + clusterOpFut.onDone(new IgniteSnapshotVerifyException(errors0)); + } + } + + return new GridFinishedFuture<>(); + } + + /** Phase 2 beginning. 
*/ + private IgniteInternalFuture<SnapshotCheckResponse> validateParts(SnapshotCheckProcessRequest req) { + if (!req.nodes().contains(kctx.localNodeId())) + return new GridFinishedFuture<>(); + + SnapshotCheckContext ctx = context(req.snapshotName(), req.requestId()); + + assert ctx != null; + + if (ctx.locMeta == null) + return new GridFinishedFuture<>(); + + IgniteSnapshotManager snpMgr = kctx.cache().context().snapshotMgr(); + + GridFutureAdapter<SnapshotCheckResponse> phaseFut = ctx.phaseFut(); + + // Might be already finished by #onNodeLeft(). + if (!phaseFut.isDone()) { + CompletableFuture<? extends Map<?, ?>> workingFut = req.allRestoreHandlers + ? snpMgr.checker().invokeCustomHandlers(ctx.locMeta, req.snapshotPath(), req.groups(), true) + : snpMgr.checker().checkPartitions(ctx.locMeta, snpMgr.snapshotLocalDir(req.snapshotName(), req.snapshotPath()), + req.groups(), false, true, false); + + workingFut.whenComplete((res, err) -> { + if (err != null) + phaseFut.onDone(err); + else + phaseFut.onDone(new SnapshotCheckResponse(res)); + }); + } + + return phaseFut; + } + + /** */ + private Map<ClusterNode, Exception> mapErrors(@Nullable Map<UUID, Throwable> errors) { + if (F.isEmpty(errors)) + return Collections.emptyMap(); + + return errors.entrySet().stream() + .collect(Collectors.toMap(e -> kctx.cluster().get().node(e.getKey()), + e -> e.getValue() instanceof Exception ? (Exception)e.getValue() : new IgniteException(e.getValue()))); + } + + /** */ + private Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>> mapPartsHashes( + @Nullable Map<UUID, SnapshotCheckResponse> results, + Collection<UUID> requiredNodes + ) { + if (F.isEmpty(results)) + return Collections.emptyMap(); + + // A not required node can leave the cluster and its result can be null. 
+ return results.entrySet().stream() + .filter(e -> requiredNodes.contains(e.getKey()) && e.getValue() != null) + .collect(Collectors.toMap(e -> kctx.cluster().get().node(e.getKey()), e -> e.getValue().partsHashes())); + } + + /** */ + private Map<ClusterNode, Map<String, SnapshotHandlerResult<?>>> mapCustomHandlersResults( + Map<UUID, SnapshotCheckResponse> results, + Set<UUID> requiredNodes + ) { + if (F.isEmpty(results)) + return Collections.emptyMap(); + + // A not required node can leave the cluster and its result can be null. + return results.entrySet().stream() + .filter(e -> requiredNodes.contains(e.getKey()) && e.getValue() != null) + .collect(Collectors.toMap(e -> kctx.cluster().get().node(e.getKey()), e -> e.getValue().customHandlersResults())); + } + + /** + * @param snpName Snapshot name of the validation process. If {@code null}, ignored. + * @param reqId If {@code snpName} is {@code null}, is used to find the operation request. + * @return Current snapshot checking context by {@code snpName} or {@code reqId}. + */ + private @Nullable SnapshotCheckContext context(@Nullable String snpName, UUID reqId) { + SnapshotCheckContext ctx = snpName == null + ? contexts.values().stream().filter(ctx0 -> ctx0.req.reqId.equals(reqId)).findFirst().orElse(null) + : contexts.get(snpName); + + assert ctx == null || ctx.req.reqId.equals(reqId); + + return ctx; + } + + /** Phase 1 beginning: prepare, collect and check local metas. */ + private IgniteInternalFuture<SnapshotCheckResponse> prepareAndCheckMetas(SnapshotCheckProcessRequest req) { + if (!req.nodes().contains(kctx.localNodeId())) + return new GridFinishedFuture<>(); + + SnapshotCheckContext ctx = contexts.computeIfAbsent(req.snapshotName(), snpName -> new SnapshotCheckContext(req)); + + if (!ctx.req.requestId().equals(req.requestId())) { + return new GridFinishedFuture<>(new IllegalStateException("Validation of snapshot '" + req.snapshotName() + + "' has already started. 
Request=" + ctx + '.')); + } + + // Excludes non-baseline initiator. + if (!baseline(kctx.localNodeId())) + return new GridFinishedFuture<>(); + + IgniteSnapshotManager snpMgr = kctx.cache().context().snapshotMgr(); + + Collection<Integer> grpIds = F.isEmpty(req.groups()) ? null : F.viewReadOnly(req.groups(), CU::cacheId); + + GridFutureAdapter<SnapshotCheckResponse> phaseFut = ctx.phaseFut(); + + // Might be already finished by #onNodeLeft(). + if (!phaseFut.isDone()) { + snpMgr.checker().checkLocalMetas( + snpMgr.snapshotLocalDir(req.snapshotName(), req.snapshotPath()), + grpIds, + kctx.cluster().get().localNode().consistentId() + ).whenComplete((locMetas, err) -> { + if (err != null) + phaseFut.onDone(err); + else + phaseFut.onDone(new SnapshotCheckResponse(locMetas)); + }); + } + + return phaseFut; + } + + /** Phase 1 end. */ + private void reducePreparationAndMetasCheck( + UUID reqId, + Map<UUID, SnapshotCheckResponse> results, + Map<UUID, Throwable> errors + ) { + String snpName = snpName(results); + + SnapshotCheckContext ctx = context(snpName, reqId); + + // The context is not stored in the case of concurrent check of the same snapshot but the operation future is registered. + GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> clusterOpFut = clusterOpFuts.get(reqId); + + try { + if (!F.isEmpty(errors)) + throw new IgniteSnapshotVerifyException(mapErrors(errors)); + + if (ctx == null) { + assert clusterOpFut == null; + + return; + } + + if (ctx.locProcFut.error() != null) + throw ctx.locProcFut.error(); + + Map<ClusterNode, List<SnapshotMetadata>> metas = new HashMap<>(); + + results.forEach((nodeId, nodeRes) -> { + // A node might be not required. It gives null result. But a required node might have invalid empty result + // which must be validated. 
+ if (ctx.req.nodes().contains(nodeId) && baseline(nodeId)) { + assert nodeRes != null && nodeRes.partsResults == null; + + metas.put(kctx.cluster().get().node(nodeId), nodeRes.metas); + } + }); + + Map<ClusterNode, Exception> metasCheck = SnapshotChecker.reduceMetasResults(ctx.req.snapshotName(), ctx.req.snapshotPath(), + metas, null, kctx.cluster().get().localNode().consistentId()); + + if (!F.isEmpty(metasCheck)) + throw new IgniteSnapshotVerifyException(metasCheck); + + List<SnapshotMetadata> locMetas = metas.get(kctx.cluster().get().localNode()); + + ctx.locMeta = F.isEmpty(locMetas) ? null : locMetas.get(0); + + if (clusterOpFut != null) + ctx.clusterMetas = metas; + + if (U.isLocalNodeCoordinator(kctx.discovery())) + phase2PartsHashes.start(reqId, ctx.req); + } + catch (Throwable th) { + if (ctx != null) { + contexts.remove(ctx.req.snapshotName()); + + if (log.isInfoEnabled()) + log.info("Finished snapshot validation [req=" + ctx.req + ']'); + } + + if (clusterOpFut != null) + clusterOpFut.onDone(th); + } + } + + /** Finds current snapshot name from the metas. */ + private @Nullable String snpName(@Nullable Map<UUID, SnapshotCheckResponse> results) { Review Comment: `results` is never null. ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotOperationRequest.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.Serializable; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * Snapshot operation start request for {@link DistributedProcess} initiate message. + */ +abstract class AbstractSnapshotOperationRequest implements Serializable { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Request ID. */ + @GridToStringInclude + protected final UUID reqId; + + /** Snapshot name. */ + @GridToStringInclude + protected final String snpName; Review Comment: Let's use either protected fields or getters with private fields. The same applies to descendant classes. ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java: ########## @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.management.cache.IdleVerifyResultV2; +import org.apache.ignite.internal.management.cache.PartitionKeyV2; +import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static 
org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.SNAPSHOT_CHECK_METAS; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.SNAPSHOT_VALIDATE_PARTS; + +/** Distributed process of snapshot checking (with the partition hashes). */ +public class SnapshotCheckProcess { + /** */ + private final IgniteLogger log; + + /** */ + private final GridKernalContext kctx; + + /** Operation contexts by name. */ + private final Map<String, SnapshotCheckContext> contexts = new ConcurrentHashMap<>(); + + /** Cluster-wide operation futures per snapshot called from current node. */ + private final Map<UUID, GridFutureAdapter<SnapshotPartitionsVerifyTaskResult>> clusterOpFuts = new ConcurrentHashMap<>(); + + /** Check metas first phase subprocess. */ + private final DistributedProcess<SnapshotCheckProcessRequest, SnapshotCheckResponse> phase1CheckMetas; + + /** Partition hashes second phase subprocess. */ + private final DistributedProcess<SnapshotCheckProcessRequest, SnapshotCheckResponse> phase2PartsHashes; + + /** */ + public SnapshotCheckProcess(GridKernalContext kctx) { + this.kctx = kctx; + + log = kctx.log(getClass()); + + phase1CheckMetas = new DistributedProcess<>(kctx, SNAPSHOT_CHECK_METAS, this::prepareAndCheckMetas, + this::reducePreparationAndMetasCheck); + + phase2PartsHashes = new DistributedProcess<>(kctx, SNAPSHOT_VALIDATE_PARTS, this::validateParts, + this::reduceValidatePartsAndFinish); + + kctx.event().addLocalEventListener(evt -> onNodeLeft(((DiscoveryEvent)evt).eventNode().id()), EVT_NODE_FAILED, EVT_NODE_LEFT); + } + + /** Expected to run in a discovery-managed thread. */ + private void onNodeLeft(UUID nodeId) { + Throwable err = new ClusterTopologyCheckedException("Snapshot validation stopped. 
A required node left the cluster " + + "[nodeId=" + nodeId + ']'); + + contexts.values().forEach(ctx -> { + if (ctx.req.nodes().contains(nodeId)) + ctx.locProcFut.onDone(err); + }); + } + + /** */ + Map<String, SnapshotCheckContext> contexts() { + return Collections.unmodifiableMap(contexts); + } + + /** + * Stops all the processes with the passed exception. + * + * @param err The interrupt reason. + */ + void interrupt(Throwable err) { + contexts.forEach((snpName, ctx) -> ctx.locProcFut.onDone(err)); + + contexts.clear(); + + clusterOpFuts.forEach((reqId, fut) -> fut.onDone(err)); + } + + /** Phase 2 and process finish. */ + private IgniteInternalFuture<?> reduceValidatePartsAndFinish( + UUID reqId, + Map<UUID, SnapshotCheckResponse> results, + Map<UUID, Throwable> errors + ) { + SnapshotCheckContext ctx = context(null, reqId); + + if (ctx == null) + return new GridFinishedFuture<>(); + + contexts.remove(ctx.req.snapshotName()); + + if (log.isInfoEnabled()) + log.info("Finished snapshot validation [req=" + ctx.req + ']'); + + GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> clusterOpFut = clusterOpFuts.get(reqId); + + if (clusterOpFut == null) + return new GridFinishedFuture<>(); + + assert results.values().stream().noneMatch(res -> res != null && res.metas != null); + + if (ctx.req.allRestoreHandlers) { + try { + if (!F.isEmpty(errors)) + throw F.firstValue(errors); + + Map<ClusterNode, Map<String, SnapshotHandlerResult<?>>> cstRes = mapCustomHandlersResults(results, ctx.req.nodes()); + + kctx.cache().context().snapshotMgr().checker().checkCustomHandlersResults(ctx.req.snapshotName(), cstRes); + + clusterOpFut.onDone(new SnapshotPartitionsVerifyTaskResult(ctx.clusterMetas, null)); + } + catch (Throwable err) { + clusterOpFut.onDone(err); + } + } + else { + Map<ClusterNode, Exception> errors0 = mapErrors(errors); + + if (!F.isEmpty(results)) { + Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>> results0 = mapPartsHashes(results, ctx.req.nodes()); + 
+ IdleVerifyResultV2 chkRes = SnapshotChecker.reduceHashesResults(results0, errors0); + + clusterOpFut.onDone(new SnapshotPartitionsVerifyTaskResult(ctx.clusterMetas, chkRes)); + } + else { + assert !errors0.isEmpty(); + + clusterOpFut.onDone(new IgniteSnapshotVerifyException(errors0)); + } + } + + return new GridFinishedFuture<>(); + } + + /** Phase 2 beginning. */ + private IgniteInternalFuture<SnapshotCheckResponse> validateParts(SnapshotCheckProcessRequest req) { + if (!req.nodes().contains(kctx.localNodeId())) + return new GridFinishedFuture<>(); + + SnapshotCheckContext ctx = context(req.snapshotName(), req.requestId()); + + assert ctx != null; + + if (ctx.locMeta == null) + return new GridFinishedFuture<>(); + + IgniteSnapshotManager snpMgr = kctx.cache().context().snapshotMgr(); + + GridFutureAdapter<SnapshotCheckResponse> phaseFut = ctx.phaseFut(); + + // Might be already finished by #onNodeLeft(). + if (!phaseFut.isDone()) { + CompletableFuture<? extends Map<?, ?>> workingFut = req.allRestoreHandlers + ? snpMgr.checker().invokeCustomHandlers(ctx.locMeta, req.snapshotPath(), req.groups(), true) + : snpMgr.checker().checkPartitions(ctx.locMeta, snpMgr.snapshotLocalDir(req.snapshotName(), req.snapshotPath()), + req.groups(), false, true, false); + + workingFut.whenComplete((res, err) -> { + if (err != null) + phaseFut.onDone(err); + else + phaseFut.onDone(new SnapshotCheckResponse(res)); + }); + } + + return phaseFut; + } + + /** */ + private Map<ClusterNode, Exception> mapErrors(@Nullable Map<UUID, Throwable> errors) { + if (F.isEmpty(errors)) + return Collections.emptyMap(); + + return errors.entrySet().stream() + .collect(Collectors.toMap(e -> kctx.cluster().get().node(e.getKey()), + e -> e.getValue() instanceof Exception ? 
(Exception)e.getValue() : new IgniteException(e.getValue()))); + } + + /** */ + private Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>> mapPartsHashes( + @Nullable Map<UUID, SnapshotCheckResponse> results, + Collection<UUID> requiredNodes + ) { + if (F.isEmpty(results)) + return Collections.emptyMap(); + + // A not required node can leave the cluster and its result can be null. + return results.entrySet().stream() + .filter(e -> requiredNodes.contains(e.getKey()) && e.getValue() != null) Review Comment: Looks like we already check requiredNodes in assert in the `reduceValidatePartsAndFinish`? ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.util.Collection; +import java.util.UUID; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * Snapshot full check (validation) distributed process request. + * + * @see SnapshotCheckProcess + */ +public class SnapshotCheckProcessRequest extends AbstractSnapshotOperationRequest { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** + * If {@code true}, all the registered {@link IgniteSnapshotManager#handlers()} of type {@link SnapshotHandlerType#RESTORE} + * are invoked. Otherwise, only snapshot metadatas and partition hashes are validated. + */ + @GridToStringInclude final boolean allRestoreHandlers; + + /** + * Creates snapshot check process request. + * + * @param reqId Request ID. + * @param snpName Snapshot name. + * @param nodes Baseline node IDs that must be alive to complete the operation.. + * @param snpPath Snapshot directory path. + * @param grps List of cache group names. + * @param incIdx Incremental snapshot index. + * @param allRestoreHandlers If {@code true}, all the registered {@link IgniteSnapshotManager#handlers()} of type + * {@link SnapshotHandlerType#RESTORE} are invoked. Otherwise, only snapshot metadatas and + * partition hashes are validated. 
+ */ + SnapshotCheckProcessRequest( + UUID reqId, + Collection<UUID> nodes, + String snpName, + String snpPath, + @Nullable Collection<String> grps, + int incIdx, Review Comment: `incIdx` is always 0. Let's add support for checking incremental snapshots in the next patch. ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java: ########## @@ -536,19 +628,138 @@ private boolean hasExpiringEntries( return rootIO.getCount(pageAddr) != 0; } + /** Launches local partitions checking and waits for the result, handles execution exceptions. */ + public Map<PartitionKeyV2, PartitionHashRecordV2> checkPartitionsResult( + SnapshotMetadata meta, + File snpDir, + @Nullable Collection<String> groups, + boolean forCreation, + boolean checkParts, + boolean skipPartsHashes + ) { + try { + return checkPartitions(meta, snpDir, groups, forCreation, checkParts, skipPartsHashes).get(); + } + catch (Exception e) { + throw new IgniteException("Failed to get result of partitions validation of snapshot '" + meta.snapshotName() + "'.", e); + } + } + + /** Launches local partitions checking. */ + public CompletableFuture<Map<PartitionKeyV2, PartitionHashRecordV2>> checkPartitions( + SnapshotMetadata meta, + File snpDir, + @Nullable Collection<String> groups, + boolean forCreation, + boolean checkParts, + boolean skipPartsHashes + ) { + // Await in the default executor to avoid blocking the snapshot executor if it has just one thread. + return CompletableFuture.supplyAsync(() -> { + if (!snpDir.exists()) + throw new IllegalStateException("Snapshot directory doesn't exists: " + snpDir); + + ClusterNode locNode = kctx.cluster().get().localNode(); + + Set<Integer> grps = F.isEmpty(groups) + ?
new HashSet<>(meta.partitions().keySet()) + : groups.stream().map(CU::cacheId).collect(Collectors.toSet()); + + if (forCreation) { + grps = grps.stream().filter(grp -> grp == MetaStorage.METASTORAGE_CACHE_ID || + CU.affinityNode( + locNode, + kctx.cache().cacheGroupDescriptor(grp).config().getNodeFilter() + ) + ).collect(Collectors.toSet()); + } + + if (meta.dump()) + return checkDumpFiles(snpDir, meta, grps, locNode.consistentId(), checkParts, skipPartsHashes); + + try { + return checkSnapshotFiles(snpDir, grps, meta, forCreation, checkParts, skipPartsHashes); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to check partitions of snapshot '" + meta.snapshotName() + "'.", e); + } + }); Review Comment: snapshot executor? ########## modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java: ########## @@ -488,6 +488,16 @@ public enum DistributedProcessType { /** * Incremental snapshot restore start phase. */ - RESTORE_INCREMENTAL_SNAPSHOT_START + RESTORE_INCREMENTAL_SNAPSHOT_START, + + /** + * Snapshot metadatas check. + */ + CHECK_SNAPSHOT_METAS, + + /** + * Snapshot partitions validation. + */ + VALIDATE_SNAPSHOT_PARTS_PARTS Review Comment: double PARTS ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotOperationRequest.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.Serializable; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * Snapshot operation start request for {@link DistributedProcess} initiate message. + */ +abstract class AbstractSnapshotOperationRequest implements Serializable { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Request ID. */ + @GridToStringInclude + protected final UUID reqId; + + /** Snapshot name. */ + @GridToStringInclude + protected final String snpName; + + /** Snapshot directory path. */ + @GridToStringInclude + protected final String snpPath; + + /** List of cache group names. */ + @GridToStringInclude + protected final Collection<String> grps; + + /** Start time. */ + @GridToStringInclude + private final long startTime; + + /** IDs of the nodes that must be alive to complete the operation. */ + @GridToStringInclude + protected final Set<UUID> nodes; + + /** + * @param reqId Request ID. + * @param snpName Snapshot name. + * @param snpPath Snapshot directory path. + * @param grps List of cache group names. + * @param incIdx Incremental snapshot index. + * @param nodes IDs of the nodes that must be alive to complete the operation. 
+ */ + protected AbstractSnapshotOperationRequest( + UUID reqId, + @Nullable UUID opNodeId, Review Comment: Useless param ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java: ########## @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.management.cache.IdleVerifyResultV2; +import org.apache.ignite.internal.management.cache.PartitionKeyV2; +import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.SNAPSHOT_CHECK_METAS; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.SNAPSHOT_VALIDATE_PARTS; + +/** Distributed process of snapshot checking (with the partition hashes). 
*/ +public class SnapshotCheckProcess { + /** */ + private final IgniteLogger log; + + /** */ + private final GridKernalContext kctx; + + /** Operation contexts by name. */ + private final Map<String, SnapshotCheckContext> contexts = new ConcurrentHashMap<>(); + + /** Cluster-wide operation futures per snapshot called from current node. */ + private final Map<UUID, GridFutureAdapter<SnapshotPartitionsVerifyTaskResult>> clusterOpFuts = new ConcurrentHashMap<>(); + + /** Check metas first phase subprocess. */ + private final DistributedProcess<SnapshotCheckProcessRequest, SnapshotCheckResponse> phase1CheckMetas; + + /** Partition hashes second phase subprocess. */ + private final DistributedProcess<SnapshotCheckProcessRequest, SnapshotCheckResponse> phase2PartsHashes; + + /** */ + public SnapshotCheckProcess(GridKernalContext kctx) { + this.kctx = kctx; + + log = kctx.log(getClass()); + + phase1CheckMetas = new DistributedProcess<>(kctx, SNAPSHOT_CHECK_METAS, this::prepareAndCheckMetas, + this::reducePreparationAndMetasCheck); + + phase2PartsHashes = new DistributedProcess<>(kctx, SNAPSHOT_VALIDATE_PARTS, this::validateParts, + this::reduceValidatePartsAndFinish); + + kctx.event().addLocalEventListener(evt -> onNodeLeft(((DiscoveryEvent)evt).eventNode().id()), EVT_NODE_FAILED, EVT_NODE_LEFT); + } + + /** Expected to run in a discovery-managed thread. */ + private void onNodeLeft(UUID nodeId) { + Throwable err = new ClusterTopologyCheckedException("Snapshot validation stopped. A required node left the cluster " + + "[nodeId=" + nodeId + ']'); + + contexts.values().forEach(ctx -> { + if (ctx.req.nodes().contains(nodeId)) + ctx.locProcFut.onDone(err); + }); + } + + /** */ + Map<String, SnapshotCheckContext> contexts() { + return Collections.unmodifiableMap(contexts); + } + + /** + * Stops all the processes with the passed exception. + * + * @param err The interrupt reason. 
+ */ + void interrupt(Throwable err) { Review Comment: This method is invoked after `IgniteSnapshotManager#busyLock` is acquired, but the snapshot check process doesn't check this lock. It looks like these collections aren't synchronized with the node stopping. ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java: ########## @@ -218,73 +221,109 @@ public <T> T readFromFile(File smf) } } - /** */ - public List<SnapshotMetadata> checkLocalMetas(File snpFullPath, @Nullable Collection<Integer> cacheGrpIds, - @Nullable Object locNodeConsistId) { - List<SnapshotMetadata> snpMetas = readSnapshotMetadatas(snpFullPath, locNodeConsistId); + /** Launches local metas checking and waits for the result, handles execution exceptions. */ + public List<SnapshotMetadata> checkLocalMetasResult(File snpPath, @Nullable Collection<Integer> grpIds, @Nullable Object locNodeCstId) { Review Comment: Do we actually need this method? I suppose the invoker can call `.get()` itself. ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java: ########## @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.management.cache.IdleVerifyResultV2; +import org.apache.ignite.internal.management.cache.PartitionKeyV2; +import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.SNAPSHOT_CHECK_METAS; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.SNAPSHOT_VALIDATE_PARTS; + +/** Distributed process of snapshot checking (with the partition hashes). 
*/ +public class SnapshotCheckProcess { + /** */ + private final IgniteLogger log; + + /** */ + private final GridKernalContext kctx; + + /** Operation contexts by name. */ + private final Map<String, SnapshotCheckContext> contexts = new ConcurrentHashMap<>(); + + /** Cluster-wide operation futures per snapshot called from current node. */ + private final Map<UUID, GridFutureAdapter<SnapshotPartitionsVerifyTaskResult>> clusterOpFuts = new ConcurrentHashMap<>(); + + /** Check metas first phase subprocess. */ + private final DistributedProcess<SnapshotCheckProcessRequest, SnapshotCheckResponse> phase1CheckMetas; + + /** Partition hashes second phase subprocess. */ + private final DistributedProcess<SnapshotCheckProcessRequest, SnapshotCheckResponse> phase2PartsHashes; + + /** */ + public SnapshotCheckProcess(GridKernalContext kctx) { + this.kctx = kctx; + + log = kctx.log(getClass()); + + phase1CheckMetas = new DistributedProcess<>(kctx, SNAPSHOT_CHECK_METAS, this::prepareAndCheckMetas, + this::reducePreparationAndMetasCheck); + + phase2PartsHashes = new DistributedProcess<>(kctx, SNAPSHOT_VALIDATE_PARTS, this::validateParts, + this::reduceValidatePartsAndFinish); + + kctx.event().addLocalEventListener(evt -> onNodeLeft(((DiscoveryEvent)evt).eventNode().id()), EVT_NODE_FAILED, EVT_NODE_LEFT); + } + + /** Expected to run in a discovery-managed thread. */ + private void onNodeLeft(UUID nodeId) { + Throwable err = new ClusterTopologyCheckedException("Snapshot validation stopped. A required node left the cluster " + + "[nodeId=" + nodeId + ']'); + + contexts.values().forEach(ctx -> { + if (ctx.req.nodes().contains(nodeId)) + ctx.locProcFut.onDone(err); + }); + } + + /** */ + Map<String, SnapshotCheckContext> contexts() { + return Collections.unmodifiableMap(contexts); + } + + /** + * Stops all the processes with the passed exception. + * + * @param err The interrupt reason. 
+ */ + void interrupt(Throwable err) { + contexts.forEach((snpName, ctx) -> ctx.locProcFut.onDone(err)); + + contexts.clear(); + + clusterOpFuts.forEach((reqId, fut) -> fut.onDone(err)); + } + + /** Phase 2 and process finish. */ + private IgniteInternalFuture<?> reduceValidatePartsAndFinish( + UUID reqId, + Map<UUID, SnapshotCheckResponse> results, + Map<UUID, Throwable> errors + ) { + SnapshotCheckContext ctx = context(null, reqId); + + if (ctx == null) + return new GridFinishedFuture<>(); + + contexts.remove(ctx.req.snapshotName()); + + if (log.isInfoEnabled()) + log.info("Finished snapshot validation [req=" + ctx.req + ']'); + + GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> clusterOpFut = clusterOpFuts.get(reqId); + + if (clusterOpFut == null) + return new GridFinishedFuture<>(); + + assert results.values().stream().noneMatch(res -> res != null && res.metas != null); + + if (ctx.req.allRestoreHandlers) { + try { + if (!F.isEmpty(errors)) + throw F.firstValue(errors); + + Map<ClusterNode, Map<String, SnapshotHandlerResult<?>>> cstRes = mapCustomHandlersResults(results, ctx.req.nodes()); + + kctx.cache().context().snapshotMgr().checker().checkCustomHandlersResults(ctx.req.snapshotName(), cstRes); + + clusterOpFut.onDone(new SnapshotPartitionsVerifyTaskResult(ctx.clusterMetas, null)); + } + catch (Throwable err) { + clusterOpFut.onDone(err); + } + } + else { + Map<ClusterNode, Exception> errors0 = mapErrors(errors); + + if (!F.isEmpty(results)) { + Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>> results0 = mapPartsHashes(results, ctx.req.nodes()); + + IdleVerifyResultV2 chkRes = SnapshotChecker.reduceHashesResults(results0, errors0); + + clusterOpFut.onDone(new SnapshotPartitionsVerifyTaskResult(ctx.clusterMetas, chkRes)); + } + else { + assert !errors0.isEmpty(); + + clusterOpFut.onDone(new IgniteSnapshotVerifyException(errors0)); + } + } + + return new GridFinishedFuture<>(); + } + + /** Phase 2 beginning. 
*/ + private IgniteInternalFuture<SnapshotCheckResponse> validateParts(SnapshotCheckProcessRequest req) { + if (!req.nodes().contains(kctx.localNodeId())) + return new GridFinishedFuture<>(); + + SnapshotCheckContext ctx = context(req.snapshotName(), req.requestId()); + + assert ctx != null; + + if (ctx.locMeta == null) + return new GridFinishedFuture<>(); + + IgniteSnapshotManager snpMgr = kctx.cache().context().snapshotMgr(); + + GridFutureAdapter<SnapshotCheckResponse> phaseFut = ctx.phaseFut(); + + // Might be already finished by #onNodeLeft(). + if (!phaseFut.isDone()) { + CompletableFuture<? extends Map<?, ?>> workingFut = req.allRestoreHandlers + ? snpMgr.checker().invokeCustomHandlers(ctx.locMeta, req.snapshotPath(), req.groups(), true) + : snpMgr.checker().checkPartitions(ctx.locMeta, snpMgr.snapshotLocalDir(req.snapshotName(), req.snapshotPath()), + req.groups(), false, true, false); + + workingFut.whenComplete((res, err) -> { + if (err != null) + phaseFut.onDone(err); + else + phaseFut.onDone(new SnapshotCheckResponse(res)); + }); + } + + return phaseFut; + } + + /** */ + private Map<ClusterNode, Exception> mapErrors(@Nullable Map<UUID, Throwable> errors) { + if (F.isEmpty(errors)) + return Collections.emptyMap(); + + return errors.entrySet().stream() + .collect(Collectors.toMap(e -> kctx.cluster().get().node(e.getKey()), + e -> e.getValue() instanceof Exception ? 
(Exception)e.getValue() : new IgniteException(e.getValue()))); + } + + /** */ + private Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>> mapPartsHashes( + @Nullable Map<UUID, SnapshotCheckResponse> results, Review Comment: It can't be null either. ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java: ########## @@ -218,73 +221,109 @@ public <T> T readFromFile(File smf) } } - /** */ - public List<SnapshotMetadata> checkLocalMetas(File snpFullPath, @Nullable Collection<Integer> cacheGrpIds, - @Nullable Object locNodeConsistId) { - List<SnapshotMetadata> snpMetas = readSnapshotMetadatas(snpFullPath, locNodeConsistId); + /** Launches local metas checking and waits for the result, handles execution exceptions. */ + public List<SnapshotMetadata> checkLocalMetasResult(File snpPath, @Nullable Collection<Integer> grpIds, @Nullable Object locNodeCstId) { Review Comment: Also use a timeout for the `get()`. ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java: ########## @@ -536,19 +628,138 @@ private boolean hasExpiringEntries( return rootIO.getCount(pageAddr) != 0; } + /** Launches local partitions checking and waits for the result, handles execution exceptions. */ + public Map<PartitionKeyV2, PartitionHashRecordV2> checkPartitionsResult( + SnapshotMetadata meta, + File snpDir, + @Nullable Collection<String> groups, + boolean forCreation, + boolean checkParts, + boolean skipPartsHashes + ) { + try { + return checkPartitions(meta, snpDir, groups, forCreation, checkParts, skipPartsHashes).get(); Review Comment: Same here: let the invoker call `get()` with a timeout. ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java: ########## @@ -379,6 +379,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter /** Take snapshot operation procedure.
*/ private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> startSnpProc; + /** Snapshot full validation distributed process. */ + final SnapshotCheckProcess checkSnpProc; Review Comment: private? ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java: ########## @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.management.cache.IdleVerifyResultV2; +import org.apache.ignite.internal.management.cache.PartitionKeyV2; +import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.SNAPSHOT_CHECK_METAS; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.SNAPSHOT_VALIDATE_PARTS; + +/** Distributed process of snapshot checking (with the partition hashes). 
*/ +public class SnapshotCheckProcess { + /** */ + private final IgniteLogger log; + + /** */ + private final GridKernalContext kctx; + + /** Operation contexts by name. */ + private final Map<String, SnapshotCheckContext> contexts = new ConcurrentHashMap<>(); + + /** Cluster-wide operation futures per snapshot called from current node. */ + private final Map<UUID, GridFutureAdapter<SnapshotPartitionsVerifyTaskResult>> clusterOpFuts = new ConcurrentHashMap<>(); + + /** Check metas first phase subprocess. */ + private final DistributedProcess<SnapshotCheckProcessRequest, SnapshotCheckResponse> phase1CheckMetas; + + /** Partition hashes second phase subprocess. */ + private final DistributedProcess<SnapshotCheckProcessRequest, SnapshotCheckResponse> phase2PartsHashes; + + /** */ + public SnapshotCheckProcess(GridKernalContext kctx) { + this.kctx = kctx; + + log = kctx.log(getClass()); + + phase1CheckMetas = new DistributedProcess<>(kctx, SNAPSHOT_CHECK_METAS, this::prepareAndCheckMetas, + this::reducePreparationAndMetasCheck); + + phase2PartsHashes = new DistributedProcess<>(kctx, SNAPSHOT_VALIDATE_PARTS, this::validateParts, + this::reduceValidatePartsAndFinish); + + kctx.event().addLocalEventListener(evt -> onNodeLeft(((DiscoveryEvent)evt).eventNode().id()), EVT_NODE_FAILED, EVT_NODE_LEFT); + } + + /** Expected to run in a discovery-managed thread. */ + private void onNodeLeft(UUID nodeId) { + Throwable err = new ClusterTopologyCheckedException("Snapshot validation stopped. A required node left the cluster " + + "[nodeId=" + nodeId + ']'); + + contexts.values().forEach(ctx -> { + if (ctx.req.nodes().contains(nodeId)) + ctx.locProcFut.onDone(err); + }); + } + + /** */ + Map<String, SnapshotCheckContext> contexts() { + return Collections.unmodifiableMap(contexts); + } + + /** + * Stops all the processes with the passed exception. + * + * @param err The interrupt reason. 
+ */ + void interrupt(Throwable err) { + contexts.forEach((snpName, ctx) -> ctx.locProcFut.onDone(err)); + + contexts.clear(); + + clusterOpFuts.forEach((reqId, fut) -> fut.onDone(err)); + } + + /** Phase 2 and process finish. */ + private IgniteInternalFuture<?> reduceValidatePartsAndFinish( + UUID reqId, + Map<UUID, SnapshotCheckResponse> results, + Map<UUID, Throwable> errors + ) { + SnapshotCheckContext ctx = context(null, reqId); + + if (ctx == null) + return new GridFinishedFuture<>(); + + contexts.remove(ctx.req.snapshotName()); + + if (log.isInfoEnabled()) + log.info("Finished snapshot validation [req=" + ctx.req + ']'); + + GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> clusterOpFut = clusterOpFuts.get(reqId); + + if (clusterOpFut == null) + return new GridFinishedFuture<>(); + + assert results.values().stream().noneMatch(res -> res != null && res.metas != null); + + if (ctx.req.allRestoreHandlers) { + try { + if (!F.isEmpty(errors)) + throw F.firstValue(errors); + + Map<ClusterNode, Map<String, SnapshotHandlerResult<?>>> cstRes = mapCustomHandlersResults(results, ctx.req.nodes()); + + kctx.cache().context().snapshotMgr().checker().checkCustomHandlersResults(ctx.req.snapshotName(), cstRes); + + clusterOpFut.onDone(new SnapshotPartitionsVerifyTaskResult(ctx.clusterMetas, null)); + } + catch (Throwable err) { + clusterOpFut.onDone(err); + } + } + else { + Map<ClusterNode, Exception> errors0 = mapErrors(errors); + + if (!F.isEmpty(results)) { + Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>> results0 = mapPartsHashes(results, ctx.req.nodes()); + + IdleVerifyResultV2 chkRes = SnapshotChecker.reduceHashesResults(results0, errors0); + + clusterOpFut.onDone(new SnapshotPartitionsVerifyTaskResult(ctx.clusterMetas, chkRes)); + } + else { + assert !errors0.isEmpty(); Review Comment: Useless assert, `IgniteSnapshotVerifyException` doesn't require non empty collection. 
########## modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java: ########## @@ -488,6 +488,16 @@ public enum DistributedProcessType { /** * Incremental snapshot restore start phase. */ - RESTORE_INCREMENTAL_SNAPSHOT_START + RESTORE_INCREMENTAL_SNAPSHOT_START, + + /** + * Snapshot metadatas check. + */ + CHECK_SNAPSHOT_METAS, + + /** + * Snapshot partitions validation. + */ + VALIDATE_SNAPSHOT_PARTS_PARTS Review Comment: Let's unify the names of the related processes: use either "check" or "validate" consistently. ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java: ########## @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.management.cache.IdleVerifyResultV2; +import org.apache.ignite.internal.management.cache.PartitionKeyV2; +import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.SNAPSHOT_CHECK_METAS; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.SNAPSHOT_VALIDATE_PARTS; + +/** Distributed process of snapshot checking (with the partition hashes). 
*/ +public class SnapshotCheckProcess { + /** */ + private final IgniteLogger log; + + /** */ + private final GridKernalContext kctx; + + /** Operation contexts by name. */ + private final Map<String, SnapshotCheckContext> contexts = new ConcurrentHashMap<>(); + + /** Cluster-wide operation futures per snapshot called from current node. */ + private final Map<UUID, GridFutureAdapter<SnapshotPartitionsVerifyTaskResult>> clusterOpFuts = new ConcurrentHashMap<>(); + + /** Check metas first phase subprocess. */ + private final DistributedProcess<SnapshotCheckProcessRequest, SnapshotCheckResponse> phase1CheckMetas; + + /** Partition hashes second phase subprocess. */ + private final DistributedProcess<SnapshotCheckProcessRequest, SnapshotCheckResponse> phase2PartsHashes; + + /** */ + public SnapshotCheckProcess(GridKernalContext kctx) { + this.kctx = kctx; + + log = kctx.log(getClass()); + + phase1CheckMetas = new DistributedProcess<>(kctx, SNAPSHOT_CHECK_METAS, this::prepareAndCheckMetas, + this::reducePreparationAndMetasCheck); + + phase2PartsHashes = new DistributedProcess<>(kctx, SNAPSHOT_VALIDATE_PARTS, this::validateParts, + this::reduceValidatePartsAndFinish); + + kctx.event().addLocalEventListener(evt -> onNodeLeft(((DiscoveryEvent)evt).eventNode().id()), EVT_NODE_FAILED, EVT_NODE_LEFT); + } + + /** Expected to run in a discovery-managed thread. */ + private void onNodeLeft(UUID nodeId) { + Throwable err = new ClusterTopologyCheckedException("Snapshot validation stopped. A required node left the cluster " + + "[nodeId=" + nodeId + ']'); + + contexts.values().forEach(ctx -> { + if (ctx.req.nodes().contains(nodeId)) + ctx.locProcFut.onDone(err); + }); + } + + /** */ + Map<String, SnapshotCheckContext> contexts() { + return Collections.unmodifiableMap(contexts); + } + + /** + * Stops all the processes with the passed exception. + * + * @param err The interrupt reason. 
+ */ + void interrupt(Throwable err) { + contexts.forEach((snpName, ctx) -> ctx.locProcFut.onDone(err)); + + contexts.clear(); + + clusterOpFuts.forEach((reqId, fut) -> fut.onDone(err)); + } + + /** Phase 2 and process finish. */ + private IgniteInternalFuture<?> reduceValidatePartsAndFinish( + UUID reqId, + Map<UUID, SnapshotCheckResponse> results, + Map<UUID, Throwable> errors + ) { + SnapshotCheckContext ctx = context(null, reqId); + + if (ctx == null) + return new GridFinishedFuture<>(); + + contexts.remove(ctx.req.snapshotName()); + + if (log.isInfoEnabled()) + log.info("Finished snapshot validation [req=" + ctx.req + ']'); + + GridFutureAdapter<SnapshotPartitionsVerifyTaskResult> clusterOpFut = clusterOpFuts.get(reqId); + + if (clusterOpFut == null) + return new GridFinishedFuture<>(); + + assert results.values().stream().noneMatch(res -> res != null && res.metas != null); + + if (ctx.req.allRestoreHandlers) { + try { + if (!F.isEmpty(errors)) + throw F.firstValue(errors); + + Map<ClusterNode, Map<String, SnapshotHandlerResult<?>>> cstRes = mapCustomHandlersResults(results, ctx.req.nodes()); + + kctx.cache().context().snapshotMgr().checker().checkCustomHandlersResults(ctx.req.snapshotName(), cstRes); + + clusterOpFut.onDone(new SnapshotPartitionsVerifyTaskResult(ctx.clusterMetas, null)); + } + catch (Throwable err) { + clusterOpFut.onDone(err); + } + } + else { + Map<ClusterNode, Exception> errors0 = mapErrors(errors); + + if (!F.isEmpty(results)) { + Map<ClusterNode, Map<PartitionKeyV2, PartitionHashRecordV2>> results0 = mapPartsHashes(results, ctx.req.nodes()); + + IdleVerifyResultV2 chkRes = SnapshotChecker.reduceHashesResults(results0, errors0); + + clusterOpFut.onDone(new SnapshotPartitionsVerifyTaskResult(ctx.clusterMetas, chkRes)); + } + else { + assert !errors0.isEmpty(); + + clusterOpFut.onDone(new IgniteSnapshotVerifyException(errors0)); + } + } + + return new GridFinishedFuture<>(); + } + + /** Phase 2 beginning. 
*/ + private IgniteInternalFuture<SnapshotCheckResponse> validateParts(SnapshotCheckProcessRequest req) { + if (!req.nodes().contains(kctx.localNodeId())) + return new GridFinishedFuture<>(); + + SnapshotCheckContext ctx = context(req.snapshotName(), req.requestId()); + + assert ctx != null; + + if (ctx.locMeta == null) + return new GridFinishedFuture<>(); + + IgniteSnapshotManager snpMgr = kctx.cache().context().snapshotMgr(); + + GridFutureAdapter<SnapshotCheckResponse> phaseFut = ctx.phaseFut(); + + // Might be already finished by #onNodeLeft(). + if (!phaseFut.isDone()) { + CompletableFuture<? extends Map<?, ?>> workingFut = req.allRestoreHandlers + ? snpMgr.checker().invokeCustomHandlers(ctx.locMeta, req.snapshotPath(), req.groups(), true) + : snpMgr.checker().checkPartitions(ctx.locMeta, snpMgr.snapshotLocalDir(req.snapshotName(), req.snapshotPath()), + req.groups(), false, true, false); + + workingFut.whenComplete((res, err) -> { + if (err != null) + phaseFut.onDone(err); + else + phaseFut.onDone(new SnapshotCheckResponse(res)); + }); + } + + return phaseFut; + } + + /** */ + private Map<ClusterNode, Exception> mapErrors(@Nullable Map<UUID, Throwable> errors) { Review Comment: DistributedProcess never provides null collections. See `DistributedProcess#finishProcess`. ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java: ########## @@ -403,7 +495,7 @@ public Map<PartitionKeyV2, PartitionHashRecordV2> checkSnapshotFiles( ) { pageStore.init(); - if (punchHoleEnabled && meta.isGroupWithCompression(grpId) && forCreation) { + if (pouchHoleEnabled && meta.isGroupWithCompression(grpId) && forCreation) { Review Comment: Misprint: `pouchHoleEnabled` should be `punchHoleEnabled`. ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java: ########## @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.management.cache.IdleVerifyResultV2; +import org.apache.ignite.internal.management.cache.PartitionKeyV2; +import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; 
+import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.SNAPSHOT_CHECK_METAS; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.SNAPSHOT_VALIDATE_PARTS; + +/** Distributed process of snapshot checking (with the partition hashes). */ +public class SnapshotCheckProcess { + /** */ + private final IgniteLogger log; + + /** */ + private final GridKernalContext kctx; + + /** Operation contexts by name. */ + private final Map<String, SnapshotCheckContext> contexts = new ConcurrentHashMap<>(); + + /** Cluster-wide operation futures per snapshot called from current node. */ + private final Map<UUID, GridFutureAdapter<SnapshotPartitionsVerifyTaskResult>> clusterOpFuts = new ConcurrentHashMap<>(); + + /** Check metas first phase subprocess. */ + private final DistributedProcess<SnapshotCheckProcessRequest, SnapshotCheckResponse> phase1CheckMetas; + + /** Partition hashes second phase subprocess. */ + private final DistributedProcess<SnapshotCheckProcessRequest, SnapshotCheckResponse> phase2PartsHashes; + + /** */ + public SnapshotCheckProcess(GridKernalContext kctx) { + this.kctx = kctx; + + log = kctx.log(getClass()); + + phase1CheckMetas = new DistributedProcess<>(kctx, SNAPSHOT_CHECK_METAS, this::prepareAndCheckMetas, + this::reducePreparationAndMetasCheck); + + phase2PartsHashes = new DistributedProcess<>(kctx, SNAPSHOT_VALIDATE_PARTS, this::validateParts, + this::reduceValidatePartsAndFinish); + + kctx.event().addLocalEventListener(evt -> onNodeLeft(((DiscoveryEvent)evt).eventNode().id()), EVT_NODE_FAILED, EVT_NODE_LEFT); + } + + /** Expected to run in a discovery-managed thread. 
*/ + private void onNodeLeft(UUID nodeId) { Review Comment: Let's inline this method. ########## modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java: ########## @@ -599,6 +617,576 @@ public void testClusterSnapshotCheckMultipleTimes() throws Exception { assertTrue("Threads created: " + createdThreads, createdThreads < iterations); } + /** Tests that concurrent snapshot full checks are declined for the same snapshot. */ + @Test + public void testConcurrentTheSameSnpFullChecksDeclined() throws Exception { + // 0 - coordinator; 0,1,2 - baselines; 3 - non-baseline; 4,5 - clients. + prepareGridsAndSnapshot(4, 3, 2, false); + + for (int i = 0; i < G.allGrids().size(); ++i) { + for (int j = 1; j < G.allGrids().size() - 1; ++j) { + int i0 = i; + int j0 = j; + + doTestConcurrentSnpCheckOperations( + () -> new IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null)), + () -> new IgniteFutureImpl<>(snp(grid(j0)).checkSnapshot(SNAPSHOT_NAME, null)), + CHECK_SNAPSHOT_METAS, + VALIDATE_SNAPSHOT_PARTS_PARTS, + true, + false, + null, + null + ); + } + } + } + + /** Tests that concurrent snapshot full checks are allowed for different snapshots. */ + @Test + public void testConcurrentDifferentSnpFullChecksAllowed() throws Exception { + // 0 - coordinator; 0,1 - baselines; 2 - non-baseline; 3,4 - clients. 
+ prepareGridsAndSnapshot(3, 2, 2, false); + + snp(grid(3)).createSnapshot(SNAPSHOT_NAME + '2').get(); + + for (int i = 0; i < G.allGrids().size(); ++i) { + for (int j = 1; j < G.allGrids().size() - 1; ++j) { + int i0 = i; + int j0 = j; + + doTestConcurrentSnpCheckOperations( + () -> new IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null)), + () -> new IgniteFutureImpl<>(snp(grid(j0)).checkSnapshot(SNAPSHOT_NAME + '2', null)), + CHECK_SNAPSHOT_METAS, + VALIDATE_SNAPSHOT_PARTS_PARTS, + false, + true, + null, + null + ); + } + } + } + + /** Tests that concurrent snapshot full check and restoration (without checking) are allowed for different snapshots. */ + @Test + public void testConcurrentDifferentSnpFullCheckAndRestorationAllowed() throws Exception { + prepareGridsAndSnapshot(3, 2, 2, false); + + snp(grid(3)).createSnapshot(SNAPSHOT_NAME + '2').get(); + + grid(0).destroyCache(DEFAULT_CACHE_NAME); + + awaitPartitionMapExchange(); + + for (int i = 0; i < G.allGrids().size(); ++i) { + // Snapshot restoration is disallowed from client nodes. + for (int j = 1; j < 3; ++j) { + int i0 = i; + int j0 = j; + + doTestConcurrentSnpCheckOperations( + () -> new IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null)), + () -> snp(grid(j0)).restoreSnapshot(SNAPSHOT_NAME + '2', null), + VALIDATE_SNAPSHOT_PARTS_PARTS, + RESTORE_CACHE_GROUP_SNAPSHOT_START, + false, + false, + null, + () -> grid(0).destroyCache(DEFAULT_CACHE_NAME) + ); + } + } + } + + /** Tests concurrent snapshot full check and full restoration (with checking) are allowed for different snapshots. */ + @Test + public void testConcurrentDifferentSnpCheckAndFullRestorationAllowed() throws Exception { + prepareGridsAndSnapshot(3, 2, 2, false); + + snp(grid(0)).createSnapshot(SNAPSHOT_NAME + '2').get(); + + grid(0).destroyCache(DEFAULT_CACHE_NAME); + + awaitPartitionMapExchange(); + + for (int i = 0; i < G.allGrids().size(); ++i) { + // Snapshot restoration is disallowed from client nodes. 
+ for (int j = 1; j < 3; ++j) { + int i0 = i; + int j0 = j; + + doTestConcurrentSnpCheckOperations( + () -> new IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null)), + () -> snp(grid(j0)).restoreSnapshot(SNAPSHOT_NAME + '2', null, null, 0, true), + CHECK_SNAPSHOT_METAS, + VALIDATE_SNAPSHOT_PARTS_PARTS, + false, + true, + null, + () -> grid(0).destroyCache(DEFAULT_CACHE_NAME) + ); + } + } + } + + /** Tests that concurrent snapshot full restoration (with checking) is declined when the same snapshot is being fully checked. */ + @Test + public void testConcurrentFullCheckAndFullRestoreDeclined() throws Exception { + prepareGridsAndSnapshot(3, 2, 2, false); + + for (int i = 0; i < G.allGrids().size(); ++i) { + // Snapshot restoration is disallowed from client nodes. + for (int j = 1; j < 3; ++j) { + int i0 = i; + int j0 = j; + + doTestConcurrentSnpCheckOperations( + () -> new IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null)), + () -> snp(grid(j0)).restoreSnapshot(SNAPSHOT_NAME, null, null, 0, true), + CHECK_SNAPSHOT_METAS, + VALIDATE_SNAPSHOT_PARTS_PARTS, + true, + false, + null, + () -> grid(0).destroyCache(DEFAULT_CACHE_NAME) + ); + } + } + } + + /** Tests that concurrent snapshot full check is declined when the same snapshot is being fully restored (checked). */ + @Test + public void testConcurrentTheSameSnpFullCheckWhenFullyRestoringDeclined() throws Exception { + prepareGridsAndSnapshot(3, 2, 2, true); + + // Snapshot restoration is disallowed from client nodes. 
+ for (int i = 0; i < 3; ++i) { + for (int j = 1; j < G.allGrids().size(); ++j) { + int i0 = i; + int j0 = j; + + doTestConcurrentSnpCheckOperations( + () -> snp(grid(i0)).restoreSnapshot(SNAPSHOT_NAME, null, null, 0, true), + () -> new IgniteFutureImpl<>(snp(grid(j0)).checkSnapshot(SNAPSHOT_NAME, null)), + CHECK_SNAPSHOT_METAS, + VALIDATE_SNAPSHOT_PARTS_PARTS, + true, + false, + null, + () -> grid(0).destroyCache(DEFAULT_CACHE_NAME) + ); + } + } + } + + /** Tests that concurrent full check and restoration (without checking) of the same snapshot are allowed. */ + @Test + public void testConcurrentTheSameSnpFullCheckAndRestoreAllowed() throws Exception { + prepareGridsAndSnapshot(3, 2, 2, true); + + for (int i = 0; i < G.allGrids().size(); ++i) { + // Snapshot restoration is disallowed from client nodes. + for (int j = 1; j < 3; ++j) { + int i0 = i; + int j0 = j; + + doTestConcurrentSnpCheckOperations( + () -> new IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null)), + () -> snp(grid(j0)).restoreSnapshot(SNAPSHOT_NAME, null), + CHECK_SNAPSHOT_METAS, + RESTORE_CACHE_GROUP_SNAPSHOT_START, + false, + false, + null, + () -> grid(0).destroyCache(DEFAULT_CACHE_NAME) + ); + } + } + } + + /** Tests that snapshot full check doesn't affect a snapshot creation. 
*/ + @Test + public void testConcurrentSnpCheckAndCreateAllowed() throws Exception { + prepareGridsAndSnapshot(3, 2, 2, false); + + for (int i = 0; i < G.allGrids().size(); ++i) { + for (int j = 1; j < G.allGrids().size() - 1; ++j) { + int i0 = i; + int j0 = j; + + doTestConcurrentSnpCheckOperations( + () -> new IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null)), + () -> snp(grid(j0)).createSnapshot(SNAPSHOT_NAME + "_2", null, false, false), + CHECK_SNAPSHOT_METAS, + null, + false, + false, + () -> U.delete(snp(grid(0)).snapshotLocalDir(SNAPSHOT_NAME + "_2")), + () -> U.delete(snp(grid(0)).snapshotLocalDir(SNAPSHOT_NAME + "_2")) + ); + } + } + } + + /** Tests snapshot checking processes a baseline node leave. */ + @Test + public void testBaselineLeavesDuringSnapshotChecking() throws Exception { + prepareGridsAndSnapshot(6, 5, 1, false); + + Set<Integer> stopped = new HashSet<>(); + + // Snapshot checking started from the coordinator. + doTestNodeStopsDuringSnapshotChecking(0, 4, stopped); + + // Snapshot checking started from non-baseline. + doTestNodeStopsDuringSnapshotChecking(5, 3, stopped); + + // Snapshot checking started from a client. + doTestNodeStopsDuringSnapshotChecking(5, 2, stopped); + + // The same baseline leaves. + doTestNodeStopsDuringSnapshotChecking(1, 1, stopped); + } + + /** Tests snapshot checking processes a client node leave. */ + @Test + public void testClientLeavesDuringSnapshotChecking() throws Exception { + prepareGridsAndSnapshot(3, 2, 6, false); + + Set<Integer> stopped = new HashSet<>(); + + // Snapshot checking started from a baseline. + doTestNodeStopsDuringSnapshotChecking(1, 8, stopped); + + // Snapshot checking started from a non-baseline. + doTestNodeStopsDuringSnapshotChecking(2, 7, stopped); + + // Snapshot checking started from the coordinator. + doTestNodeStopsDuringSnapshotChecking(0, 6, stopped); + + // Snapshot checking started from other client. 
+ doTestNodeStopsDuringSnapshotChecking(4, 5, stopped); + + // Snapshot checking started from the same client. + doTestNodeStopsDuringSnapshotChecking(4, 4, stopped); + } + + /** Tests snapshot checking processes a non-baseline node leave. */ + @Test + public void testNonBaselineServerLeavesDuringSnapshotChecking() throws Exception { + prepareGridsAndSnapshot(7, 2, 1, false); + + Set<Integer> stopped = new HashSet<>(); + + // Snapshot checking started from a sever node. + doTestNodeStopsDuringSnapshotChecking(1, 6, stopped); + + // Snapshot checking started from a client node. + doTestNodeStopsDuringSnapshotChecking(7, 5, stopped); + + // Snapshot checking started from another non-baseline. + doTestNodeStopsDuringSnapshotChecking(3, 4, stopped); + + // Snapshot checking started from coordinator. + doTestNodeStopsDuringSnapshotChecking(0, 3, stopped); + + // Snapshot checking started from the same non-baseline. + doTestNodeStopsDuringSnapshotChecking(2, 2, stopped); + } + + /** Tests snapshot checking process continues when a new baseline node leaves. 
*/ + @Test + public void testNewBaselineServerLeavesDuringSnapshotChecking() throws Exception { + prepareGridsAndSnapshot(3, 2, 1, false); + + int grids = G.allGrids().size(); + + discoSpi(grid(0)).block(msg -> msg instanceof FullMessage && ((FullMessage<?>)msg).type() == CHECK_SNAPSHOT_METAS.ordinal()); + + IgniteInternalFuture<?> fut = snp(grid(3)).checkSnapshot(SNAPSHOT_NAME, null, null, false, 0, true); + + discoSpi(grid(0)).waitBlocked(getTestTimeout()); + + grid(0).cluster().setBaselineTopology(Stream.of(grid(0).localNode(), grid(1).localNode(), grid(2).localNode()) + .collect(Collectors.toList())); + + stopGrid(2); + + assertTrue(waitForCondition(() -> { + for (int i = 0; i < grids; ++i) { + if (i != 2 && grid(i).cluster().nodes().size() != grids - 1) + return false; + } + + return true; + }, getTestTimeout())); + + discoSpi(grid(0)).unblock(); + + fut.get(getTestTimeout()); + } + + /** Tests snapshot checking process stops when the coorditator leaves. */ + @Test + public void testCoordinatorLeavesDuringSnapshotChecking() throws Exception { + prepareGridsAndSnapshot(5, 4, 1, false); + + Set<Integer> stopped = new HashSet<>(); + + // Coordinator leaves when snapshot started from a server node. + doTestNodeStopsDuringSnapshotChecking(4, 0, stopped); + + // Coordinator leaves when snapshot started from a client node. + assertTrue(U.isLocalNodeCoordinator(grid(1).context().discovery())); + + doTestNodeStopsDuringSnapshotChecking(5, 1, stopped); + + // Coordinator leaves when snapshot started from it. 
+ assertTrue(U.isLocalNodeCoordinator(grid(2).context().discovery())); + + doTestNodeStopsDuringSnapshotChecking(2, 2, stopped); + } + + /** */ + private void prepareGridsAndSnapshot(int servers, int baseLineCnt, int clients, boolean removeTheCache) throws Exception { + assert baseLineCnt > 0 && baseLineCnt <= servers; + + IgniteEx ignite = null; + + for (int i = 0; i < servers + clients; ++i) { + IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(i)); + + cfg.setDiscoverySpi(new BlockingCustomMessageDiscoverySpi()); + + if (i >= servers) + cfg.setClientMode(true); + + ignite = startGrid(cfg); + + if (i == baseLineCnt - 1) { + ignite.cluster().state(ACTIVE); + + ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion()); + } + } + + try (IgniteDataStreamer<Integer, Integer> ds = grid(0).dataStreamer(DEFAULT_CACHE_NAME)) { + for (int i = 0; i < 100; ++i) + ds.addData(i, i); + } + + ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(); + + if (removeTheCache) + ignite.destroyCache(DEFAULT_CACHE_NAME); + } + + /** + * Tests concurrent snapshot operations related to the snapshot checking. + * + * @param originatorOp First snapshot operation on an originator node. + * @param trierOp Second concurrent snapshot operation on a trier node. + * @param firstDelay First distributed process full message of {@code originatorOp} to delay on the coordinator + * to launch {@code trierOp}. + * @param secondDelay Second distributed process full message of {@code originatorOp} to delay on the coordinator + * to launch {@code trierOp} again. + * @param expectFailure If {@code true}, the 'snapshot-check-is-in-progress' error is excepted during excution of + * {@code trierOp}. Otherwise, {@code trierOp} must successfully finish. + * @param waitForBothFirstDelays If {@code true}, {@code firstDelay} are awaited for both concurrend operations before proceed. 
+ * @param step2preparation If not {@code null}, is executed before delaying and waiting for {@code secondDelay}. + * @param cleaner If not {@code null}, is executed at the end. + */ + private void doTestConcurrentSnpCheckOperations( + Supplier<IgniteFuture<?>> originatorOp, + Supplier<IgniteFuture<?>> trierOp, + DistributedProcess.DistributedProcessType firstDelay, + @Nullable DistributedProcess.DistributedProcessType secondDelay, + boolean expectFailure, + boolean waitForBothFirstDelays, + @Nullable Runnable step2preparation, + @Nullable Runnable cleaner + ) throws Exception { + try { + AtomicBoolean firstDelayed = new AtomicBoolean(); + + // Block any matching if the operation is the same. Otherwise, block only firts. + discoSpi(grid(0)).block(msg -> msg instanceof FullMessage && ((FullMessage<?>)msg).type() == firstDelay.ordinal() + && (waitForBothFirstDelays || firstDelayed.compareAndSet(false, true))); + + IgniteFuture<?> fut = originatorOp.get(); + + discoSpi(grid(0)).waitBlocked(getTestTimeout()); + + IgniteFuture<?> fut2 = trierOp.get(); + + if (expectFailure) { + assertThrowsAnyCause( + log, + fut2::get, + IllegalStateException.class, + "Validation of snapshot '" + SNAPSHOT_NAME + "' has already started" + ); + + if (secondDelay == null) { + discoSpi(grid(0)).unblock(); + + fut.get(getTestTimeout()); + + return; + } + + discoSpi(grid(0)).blockNextAndRelease(msg -> msg instanceof FullMessage + && ((FullMessage<?>)msg).type() == secondDelay.ordinal()); + + discoSpi(grid(0)).waitBlocked(getTestTimeout()); + + if (step2preparation != null) + step2preparation.run(); + + assertThrowsAnyCause( + log, + fut2::get, + IllegalStateException.class, + "Validation of snapshot '" + SNAPSHOT_NAME + "' has already started" + ); + + discoSpi(grid(0)).unblock(); + + fut.get(getTestTimeout()); + } + else { + if (waitForBothFirstDelays) { + discoSpi(grid(0)).waitBlockedSize(2, getTestTimeout()); + + if (secondDelay != null) { + discoSpi(grid(0)).blockNextAndRelease(msg -> msg 
instanceof FullMessage + && ((FullMessage<?>)msg).type() == secondDelay.ordinal()); + + discoSpi(grid(0)).waitBlockedSize(2, getTestTimeout()); + } + } + else { + if (secondDelay != null) { + discoSpi(grid(0)).blockNextAndRelease(msg -> msg instanceof FullMessage + && ((FullMessage<?>)msg).type() == secondDelay.ordinal()); + + discoSpi(grid(0)).waitBlocked(getTestTimeout()); + } + else + fut2.get(); + } + + discoSpi(grid(0)).unblock(); + + fut2.get(); + + fut.get(); + } + } + finally { + discoSpi(grid(0)).unblock(); + + if (cleaner != null) + cleaner.run(); + + awaitPartitionMapExchange(); + } + } + + /** */ + private void doTestNodeStopsDuringSnapshotChecking(int originatorIdx, int nodeToStopIdx, Set<Integer> stopped) throws Exception { + int grids = G.allGrids().size(); + + ClusterNode leaving = grid(nodeToStopIdx).cluster().localNode(); + + boolean requredLeft = originatorIdx == nodeToStopIdx || grid(nodeToStopIdx).cluster().currentBaselineTopology().stream() + .anyMatch(bl -> bl.consistentId().equals(leaving.consistentId())); + + int coordIdx = -1; + + for (int i = 0; i < grids; ++i) { + if (stopped.contains(i) || !U.isLocalNodeCoordinator(grid(i).context().discovery())) + continue; + + coordIdx = i; + + break; + } + + AtomicInteger chkAgainIdx = new AtomicInteger(-1); + + try { + discoSpi(grid(coordIdx)).block(msg -> msg instanceof FullMessage + && ((FullMessage<?>)msg).type() == CHECK_SNAPSHOT_METAS.ordinal()); + + IgniteInternalFuture<?> fut = snp(grid(originatorIdx)).checkSnapshot(SNAPSHOT_NAME, null, null, false, 0, true); + + discoSpi(grid(coordIdx)).waitBlocked(getTestTimeout()); + + stopGrid(nodeToStopIdx); + + stopped.add(nodeToStopIdx); + + assertTrue(waitForCondition(() -> { + for (int i = 0; i < grids; ++i) { + if (!stopped.contains(i) && grid(i).cluster().nodes().size() != grids - 1) + return false; + } + + return true; + }, getTestTimeout())); + + if (nodeToStopIdx != coordIdx) + discoSpi(grid(coordIdx)).unblock(); + + if (originatorIdx == 
nodeToStopIdx) { + assertThrowsAnyCause( + null, + () -> fut.get(getTestTimeout()), + NodeStoppingException.class, + "Node is stopping" + ); + + return; + } + + if (requredLeft) { + assertThrowsAnyCause( + null, + () -> fut.get(getTestTimeout()), + ClusterTopologyCheckedException.class, + "Snapshot validation stopped. A required node left the cluster" + ); + } + else + fut.get(getTestTimeout()); + } + finally { + if (nodeToStopIdx != coordIdx) + discoSpi(grid(coordIdx)).unblock(); + + awaitPartitionMapExchange(); + + assertTrue(waitForCondition(() -> { + for (int i = 0; i < grids; ++i) { + if (stopped.contains(i)) + continue; + + chkAgainIdx.compareAndSet(-1, i); + + if (!snp(grid(i)).checkSnpProc.contexts().isEmpty()) Review Comment: Let's skip this check and replace it later with a public API (metrics, for example). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
