This is an automated email from the ASF dual-hosted git repository. nizhikov pushed a commit to branch IGNITE-19950-snapshot-merge in repository https://gitbox.apache.org/repos/asf/ignite.git
commit b275a9803b9f23f8315c1c18655ad861dd3b7846 Author: nizhikov <nizhi...@apache.org> AuthorDate: Sat Jul 29 02:26:29 2023 +0300 IGNITE-19950 Stub for dump creation process implemented --- .../snapshot/AbstractCreateBackupFutureTask.java | 231 +++++++++++++++++ .../snapshot/AbstractSnapshotFutureTask.java | 42 +--- .../persistence/snapshot/DumpCacheFutureTask.java | 210 ++++++++++++++++ .../snapshot/IgniteSnapshotManager.java | 115 ++------- .../snapshot/IncrementalSnapshotFutureTask.java | 5 - .../snapshot/SnapshotFinishedFutureTask.java | 2 +- .../persistence/snapshot/SnapshotFutureTask.java | 279 +++++++-------------- .../persistence/snapshot/SnapshotMetadata.java | 13 +- .../snapshot/SnapshotResponseRemoteFutureTask.java | 7 +- .../cache/persistence/snapshot/SnapshotSender.java | 2 +- .../snapshot/dump/DumpCacheFutureTask.java | 210 ---------------- .../snapshot/AbstractSnapshotSelfTest.java | 2 +- .../snapshot/EncryptedSnapshotTest.java | 2 +- .../snapshot/IgniteSnapshotManagerSelfTest.java | 1 + 14 files changed, 569 insertions(+), 552 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateBackupFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateBackupFutureTask.java new file mode 100644 index 00000000000..0246841e7ed --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateBackupFutureTask.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener; +import org.apache.ignite.internal.util.lang.IgniteThrowableRunner; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; + +import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; + +/** + * + */ +public abstract class AbstractCreateBackupFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskResult> { + /** + * Cache group and corresponding partitions collected under the PME lock. + * For full snapshot additional checkpoint write lock required. 
+ * @see SnapshotFutureTask#onMarkCheckpointBegin(CheckpointListener.Context) + */ + final Map<Integer, Set<Integer>> processed = new HashMap<>(); + + /** Future which will be completed when task requested to be closed. Will be executed on system pool. */ + protected volatile CompletableFuture<Void> closeFut; + + /** + * @param cctx Shared context. + * @param srcNodeId Node id which cause snapshot task creation. + * @param reqId Snapshot operation request ID. + * @param snpName Unique identifier of snapshot process. + * @param snpSndr Factory which produces snapshot receiver instance. + * @param parts Partition to be processed. + */ + protected AbstractCreateBackupFutureTask( + GridCacheSharedContext<?, ?> cctx, + UUID srcNodeId, + UUID reqId, + String snpName, + SnapshotSender snpSndr, + Map<Integer, Set<Integer>> parts + ) { + super(cctx, srcNodeId, reqId, snpName, snpSndr, parts); + } + + /** */ + protected abstract List<CompletableFuture<Void>> saveMetaCopy(); + + /** */ + protected abstract List<CompletableFuture<Void>> saveCacheConfigsCopy(); + + /** */ + protected abstract List<CompletableFuture<Void>> saveGroup(int grpId, Set<Integer> grpParts) throws IgniteCheckedException; + + /** {@inheritDoc} */ + @Override public boolean cancel() { + super.cancel(); + + try { + closeAsync().get(); + } + catch (InterruptedException | ExecutionException e) { + U.error(log, "SnapshotFutureTask cancellation failed", e); + + return false; + } + + return true; + } + + /** @return Future which will be completed when operations truly stopped. */ + protected abstract CompletableFuture<Void> closeAsync(); + + /** + * @return {@code true} if current task requested to be stopped. 
+ */ + boolean stopping() { + return err.get() != null; + } + + /** */ + void processPartitions() throws IgniteCheckedException { + for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) { + int grpId = e.getKey(); + Set<Integer> grpParts = e.getValue(); + + CacheGroupContext gctx = cctx.cache().cacheGroup(grpId); + + Iterator<GridDhtLocalPartition> iter; + + if (grpParts == null) + iter = gctx.topology().currentLocalPartitions().iterator(); + else { + if (grpParts.contains(INDEX_PARTITION)) { + throw new IgniteCheckedException("Index partition cannot be included into snapshot if " + + " set of cache group partitions has been explicitly provided [grpId=" + grpId + ']'); + } + + iter = F.iterator(grpParts, gctx.topology()::localPartition, false); + } + + Set<Integer> owning = new HashSet<>(); + Set<Integer> missed = new HashSet<>(); + + // Iterate over partitions in particular cache group. + while (iter.hasNext()) { + GridDhtLocalPartition part = iter.next(); + + // Partition can be in MOVING\RENTING states. + // Index partition will be excluded if not all partition OWNING. + // There is no data assigned to partition, thus it haven't been created yet. + if (part.state() == GridDhtPartitionState.OWNING) + owning.add(part.id()); + else + missed.add(part.id()); + } + + boolean affNode = gctx.nodeFilter() == null || gctx.nodeFilter().apply(cctx.localNode()); + + if (grpParts != null) { + // Partition has been provided for cache group, but some of them are not in OWNING state. + // Exit with an error. + if (!missed.isEmpty()) { + throw new IgniteCheckedException("Snapshot operation cancelled due to " + + "not all of requested partitions has OWNING state on local node [grpId=" + grpId + + ", missed=" + S.toStringSortedDistinct(missed) + ']'); + } + } + else { + // Partitions have not been provided for snapshot task and all partitions have + // OWNING state, so index partition must be included into snapshot. 
+ if (!missed.isEmpty()) { + log.warning("All local cache group partitions in OWNING state have been included into a snapshot. " + + "Partitions which have different states skipped. Index partitions has also been skipped " + + "[snpName=" + snpName + ", grpId=" + grpId + ", missed=" + S.toStringSortedDistinct(missed) + ']'); + } + else if (affNode && missed.isEmpty() && cctx.kernalContext().query().moduleEnabled()) + owning.add(INDEX_PARTITION); + } + + processed.put(grpId, owning); + } + } + + /** */ + protected void backupAllAsync() { + try { + // Submit all tasks for partitions and deltas processing. + List<CompletableFuture<Void>> futs = new ArrayList<>(); + + futs.addAll(saveMetaCopy()); + futs.addAll(saveCacheConfigsCopy()); + + for (Map.Entry<Integer, Set<Integer>> grpParts : processed.entrySet()) + futs.addAll(saveGroup(grpParts.getKey(), grpParts.getValue())); + + int futsSize = futs.size(); + + CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize])).whenComplete((res, t) -> { + assert t == null : "Exception must never be thrown since a wrapper is used " + + "for each snapshot task: " + t; + + closeAsync(); + }); + } + catch (IgniteCheckedException e) { + acceptException(e); + } + } + + /** {@inheritDoc} */ + @Override public void acceptException(Throwable th) { + if (th == null) + return; + + if (!(th instanceof IgniteFutureCancelledCheckedException)) + U.error(log, "Snapshot task has accepted exception to stop", th); + + if (err.compareAndSet(null, th)) + closeAsync(); + } + + /** + * @param exec Runnable task to execute. + * @return Wrapped task. 
+ */ + Runnable wrapExceptionIfStarted(IgniteThrowableRunner exec) { + return () -> { + if (stopping()) + return; + + try { + exec.run(); + } + catch (Throwable t) { + acceptException(t); + } + }; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java index 894c2c8b70b..7ad8be039ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; -import java.io.File; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -25,16 +24,14 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.IgniteThrowableRunner; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; /** * @param <T> Type of snapshot processing result. */ -public abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> { +abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> { /** Shared context. */ protected final GridCacheSharedContext<?, ?> cctx; @@ -50,12 +47,6 @@ public abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> /** Unique identifier of snapshot process. 
*/ protected final String snpName; - /** Snapshot working directory on file system. */ - protected final File tmpSnpWorkDir; - - /** IO factory which will be used for creating snapshot delta-writers. */ - protected final FileIOFactory ioFactory; - /** Snapshot data sender. */ @GridToStringExclude protected final SnapshotSender snpSndr; @@ -71,8 +62,6 @@ public abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> * @param srcNodeId Node id which cause snapshot task creation. * @param reqId Snapshot operation request ID. * @param snpName Unique identifier of snapshot process. - * @param tmpWorkDir Working directory for intermediate snapshot results. - * @param ioFactory Factory to working with snapshot files. * @param snpSndr Factory which produces snapshot receiver instance. * @param parts Partition to be processed. */ @@ -81,8 +70,6 @@ public abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> UUID srcNodeId, UUID reqId, String snpName, - File tmpWorkDir, - FileIOFactory ioFactory, SnapshotSender snpSndr, Map<Integer, Set<Integer>> parts ) { @@ -95,8 +82,6 @@ public abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> this.srcNodeId = srcNodeId; this.reqId = reqId; this.snpName = snpName; - this.tmpSnpWorkDir = new File(tmpWorkDir, snpName); - this.ioFactory = ioFactory; this.snpSndr = snpSndr; this.parts = parts; } @@ -129,31 +114,6 @@ public abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> return parts.keySet(); } - /** - * @param exec Runnable task to execute. - * @return Wrapped task. - */ - protected Runnable wrapExceptionIfStarted(IgniteThrowableRunner exec) { - return () -> { - if (stopping()) - return; - - try { - exec.run(); - } - catch (Throwable t) { - acceptException(t); - } - }; - } - - /** - * @return {@code true} if current task requested to be stopped. - */ - protected boolean stopping() { - return err.get() != null; - } - /** * Initiates snapshot task. 
* diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/DumpCacheFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/DumpCacheFutureTask.java new file mode 100644 index 00000000000..4b63306c457 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/DumpCacheFutureTask.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.binary.BinaryType; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.MarshallerContextImpl; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.StoredCacheData; +import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; +import org.apache.ignite.internal.processors.marshaller.MappedName; +import org.apache.ignite.internal.util.IgniteUtils; + +import static org.apache.ignite.internal.processors.cache.GridLocalConfigManager.cachDataFilename; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX; +import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DUMP_LOCK; + +/** */ +public class DumpCacheFutureTask extends AbstractCreateBackupFutureTask { + /** */ + private final File dumpDir; + + /** + * @param cctx Cache context. + * @param dumpName Dump name. + * @param srcNodeId Node id which cause snapshot task creation. + * @param reqId Snapshot operation request ID. 
+ */ + public DumpCacheFutureTask( + GridCacheSharedContext<?, ?> cctx, + UUID srcNodeId, + UUID reqId, + String dumpName, + File dumpDir, + SnapshotSender snpSndr, + Map<Integer, Set<Integer>> parts + ) { + super( + cctx, + srcNodeId, + reqId, + dumpName, + snpSndr, + parts + ); + + this.dumpDir = dumpDir; + } + + /** {@inheritDoc} */ + @Override public boolean start() { + try { + log.info("Start cache dump [name=" + snpName + ", grps=" + parts.keySet() + ']'); + + File dumpNodeDir = IgniteSnapshotManager.nodeDumpDirectory(dumpDir, cctx); + + createDumpLock(dumpNodeDir); + + processPartitions(); + + backupAllAsync(); + } + catch (IgniteCheckedException | IOException e) { + acceptException(e); + + onDone(e); + } + + return false; // Don't wait for checkpoint. + } + + /** */ + private void createDumpLock(File dumpNodeDir) throws IgniteCheckedException, IOException { + File lock = new File(dumpNodeDir, DUMP_LOCK); + + if (!lock.createNewFile()) + throw new IgniteCheckedException("Lock file can't be created or already exists: " + lock.getAbsolutePath()); + } + + /** {@inheritDoc} */ + @Override protected List<CompletableFuture<Void>> saveMetaCopy() { + Collection<BinaryType> types = cctx.kernalContext().cacheObjects().binary().types(); + + ArrayList<Map<Integer, MappedName>> mappings = cctx.kernalContext().marshallerContext().getCachedMappings(); + + return Arrays.asList( + CompletableFuture.runAsync( + wrapExceptionIfStarted( + () -> cctx.kernalContext().cacheObjects().saveMetadata(types, dumpDir) + ), + snpSndr.executor() + ), + + CompletableFuture.runAsync( + wrapExceptionIfStarted(() -> MarshallerContextImpl.saveMappings(cctx.kernalContext(), mappings, dumpDir)), + snpSndr.executor() + ) + ); + } + + /** {@inheritDoc} */ + @Override protected List<CompletableFuture<Void>> saveCacheConfigsCopy() { + try { + File dumpNodeDir = IgniteSnapshotManager.nodeDumpDirectory(dumpDir, cctx); + + return parts.keySet().stream().map(grp -> 
CompletableFuture.runAsync(wrapExceptionIfStarted(() -> { + CacheGroupContext grpCtx = cctx.cache().cacheGroup(grp); + + File grpDir = new File( + dumpNodeDir, + (grpCtx.caches().size() > 1 ? CACHE_GRP_DIR_PREFIX : CACHE_DIR_PREFIX) + grpCtx.cacheOrGroupName() + ); + + IgniteUtils.ensureDirectory(grpDir, "dump group directory", null); + + for (GridCacheContext<?, ?> cacheCtx : grpCtx.caches()) { + CacheConfiguration<?, ?> ccfg = cacheCtx.config(); + + cctx.cache().configManager().writeCacheData( + new StoredCacheData(ccfg), + new File(grpDir, cachDataFilename(ccfg)) + ); + } + }), snpSndr.executor())).collect(Collectors.toList()); + } + catch (IgniteCheckedException e) { + acceptException(e); + + return Collections.emptyList(); + } + } + + /** {@inheritDoc} */ + @Override protected List<CompletableFuture<Void>> saveGroup(int grp, Set<Integer> grpParts) throws IgniteCheckedException { + return Collections.singletonList(CompletableFuture.runAsync(wrapExceptionIfStarted(() -> { + long start = System.currentTimeMillis(); + + CacheGroupContext grpCtx = cctx.cache().cacheGroup(grp); + + log.info("Start group dump [name=" + grpCtx.cacheOrGroupName() + ", id=" + grp + ']'); + + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(5_000)); + } + catch (InterruptedException e) { + acceptException(e); + } + + long time = System.currentTimeMillis() - start; + + log.info("Finish group dump [name=" + grpCtx.cacheOrGroupName() + ", id=" + grp + ", time=" + time + ']'); + }), snpSndr.executor())); + } + + /** {@inheritDoc} */ + @Override protected CompletableFuture<Void> closeAsync() { + if (closeFut == null) { + Throwable err0 = err.get(); + + Set<GroupPartitionId> taken = new HashSet<>(); + + for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) { + int grp = e.getKey(); + + for (Integer part : e.getValue()) + taken.add(new GroupPartitionId(grp, part)); + } + + closeFut = CompletableFuture.runAsync( + () -> onDone(new SnapshotFutureTaskResult(taken, null), err0), + 
cctx.kernalContext().pools().getSystemExecutorService() + ); + } + + return closeFut; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 9144f5d86ab..404968c7089 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -141,7 +141,6 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.Metas import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage; import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage; import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; -import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.DumpCacheFutureTask; import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; @@ -838,7 +837,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter } /** */ - private File snapshotLocalDir(String snpName, @Nullable String snpPath, File locSnpDir) { + public static File snapshotLocalDir(String snpName, @Nullable String snpPath, File locSnpDir) { assert locSnpDir != null; assert U.alphanumericUnderscore(snpName) : snpName; @@ -975,8 +974,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter return initLocalIncrementalSnapshot(req, meta); } - else if (req.dump()) - return initLocalDump(req, grpIds); else return initLocalFullSnapshot(req, grpIds, 
comprGrpIds, withMetaStorage); } @@ -1051,8 +1048,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter meta, req.snapshotPath(), req.incrementIndex(), - tmpWorkDir, - ioFactory, lowPtr, markWalFut )).chain(fut -> { @@ -1174,6 +1169,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter req.requestId(), parts, withMetaStorage, + req.dump(), locSndrFactory.apply(req.snapshotName(), req.snapshotPath())); if (withMetaStorage && task0 instanceof SnapshotFutureTask) { @@ -1191,7 +1187,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter .map(n -> cctx.discovery().node(n).consistentId().toString()) .collect(Collectors.toSet()); - File snpDir = snapshotLocalDir(req.snapshotName(), req.snapshotPath()); + File snpDir = snapshotLocalDir(req.snapshotName(), req.snapshotPath(), req.dump() ? locDumpDir : locSnpDir); snpDir.mkdirs(); @@ -1208,7 +1204,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter res.parts(), res.snapshotPointer(), cctx.gridConfig().getEncryptionSpi().masterKeyDigest(), - req.onlyPrimary() + req.onlyPrimary(), + req.dump() ); SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta, req.groups(), cctx.localNode(), snpDir, @@ -1216,7 +1213,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter req.meta(meta); - File smf = new File(snpDir, snapshotMetaFileName(cctx.localNode().consistentId().toString())); + String consId = cctx.localNode().consistentId().toString(); + + File smf = new File(snpDir, req.dump() ? dumpMetaFileName(consId) : snapshotMetaFileName(consId)); storeSnapshotMeta(req.meta(), smf); @@ -1230,80 +1229,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter }, snapshotExecutorService()); } - /** - * @param req Request. - * @param grps Cache groups to dump. - * @return Create dump future. 
- */ - private IgniteInternalFuture<SnapshotOperationResponse> initLocalDump(SnapshotOperationRequest req, List<Integer> grps) { - IgniteInternalFuture<?> task0; - - List<Integer> grpIds = grps.stream().filter(grpId -> cctx.cache().cacheGroup(grpId) != null).collect(Collectors.toList()); - - File dumpDir = snapshotLocalDir(req.snapshotName(), null, locDumpDir); - - if (grpIds.isEmpty()) - task0 = new GridFinishedFuture<>(Collections.emptySet()); - else { - dumpDir.mkdirs(); - - task0 = registerTask(req.snapshotName(), new DumpCacheFutureTask( - cctx, - req.snapshotName(), - req.operationalNodeId(), - req.requestId(), - req.snapshotPath(), - dumpDir, - tmpWorkDir, - ioFactory, - grpIds - )); - } - - return task0.chain(fut -> { - if (fut.error() != null) - throw F.wrap(fut.error()); - - Set<String> nodes = req.nodes().stream() - .map(n -> cctx.discovery().node(n).consistentId().toString()) - .collect(Collectors.toSet()); - - SnapshotFutureTaskResult res = (SnapshotFutureTaskResult)fut.result(); - - SnapshotMetadata meta = new SnapshotMetadata(req.requestId(), - req.snapshotName(), - cctx.localNode().consistentId().toString(), - pdsSettings.folderName(), - cctx.gridConfig().getDataStorageConfiguration().getPageSize(), - grpIds, - Collections.emptyList(), - nodes, - res.parts(), - null, - null, - false - ); - - SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta, req.groups(), cctx.localNode(), dumpDir, - req.streamerWarning(), true); - - req.meta(meta); - - File dmf = new File(dumpDir, dumpMetaFileName(cctx.localNode().consistentId().toString())); - - storeSnapshotMeta(meta, dmf); - - log.info("Dump metafile has been created: " + dmf.getAbsolutePath()); - - try { - return new SnapshotOperationResponse(handlers.invokeAll(SnapshotHandlerType.CREATE, ctx)); - } - catch (IgniteCheckedException e) { - throw F.wrap(e); - } - }, snapshotExecutorService()); - } - /** * @param id Request id. * @param res Results. 
@@ -2770,6 +2695,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter * @param requestId Snapshot operation request ID. * @param parts Collection of pairs group and appropriate cache partition to be snapshot. * @param withMetaStorage {@code true} if all metastorage data must be also included into snapshot. + * @param dump {@code true} if cache group dump must be created. * @param snpSndr Factory which produces snapshot receiver instance. * @return Snapshot operation task which should be registered on checkpoint to run. */ @@ -2779,10 +2705,14 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter UUID requestId, Map<Integer, Set<Integer>> parts, boolean withMetaStorage, + boolean dump, SnapshotSender snpSndr ) { - AbstractSnapshotFutureTask<?> task = registerTask(snpName, new SnapshotFutureTask(cctx, srcNodeId, requestId, - snpName, tmpWorkDir, ioFactory, snpSndr, parts, withMetaStorage, locBuff)); + AbstractSnapshotFutureTask<?> createTask = dump + ? new DumpCacheFutureTask(cctx, srcNodeId, requestId, snpName, snapshotLocalDir(snpName, null, locDumpDir), snpSndr, parts) + : new SnapshotFutureTask(cctx, srcNodeId, requestId, snpName, tmpWorkDir, ioFactory, snpSndr, parts, withMetaStorage, locBuff); + + AbstractSnapshotFutureTask<?> task = registerTask(snpName, createTask); if (!withMetaStorage) { for (Integer grpId : parts.keySet()) { @@ -2800,17 +2730,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter return task; } - /** - * @param snpName Unique snapshot name. - * @param srcNodeId Node id which cause snapshot operation. - * @param requestId Snapshot operation request ID. - * @param grpIds Groups. - * @return Snapshot operation task which should be registered on checkpoint to run. 
- */ - private IgniteInternalFuture<?> registerDumpTask(String snpName, UUID srcNodeId, UUID requestId, List<Integer> grpIds) { - return new GridFinishedFuture<>("Done!"); - } - /** * Registers a local snapshot task. * @@ -3055,7 +2974,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter * @return Relative configured path of persistence data storage directory for the local node. * Example: {@code snapshotWorkDir/db/IgniteNodeName0} */ - public static String databaseRelativePath(String folderName) { + static String databaseRelativePath(String folderName) { return Paths.get(DB_DEFAULT_FOLDER, folderName).toString(); } @@ -3948,8 +3867,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter reqMsg0.requestId(), snpName, reqMsg0.snapshotPath(), - tmpWorkDir, - ioFactory, rmtSndrFactory.apply(rqId, nodeId), reqMsg0.parts())); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java index e9be26974f7..03476df4d16 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java @@ -35,7 +35,6 @@ import org.apache.ignite.internal.binary.BinaryUtils; import org.apache.ignite.internal.pagemem.wal.record.IncrementalSnapshotFinishRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; import 
org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -75,8 +74,6 @@ class IncrementalSnapshotFutureTask extends AbstractSnapshotFutureTask<Void> imp SnapshotMetadata meta, @Nullable String snpPath, int incIdx, - File tmpWorkDir, - FileIOFactory ioFactory, WALPointer lowPtr, IgniteInternalFuture<WALPointer> highPtrFut ) { @@ -85,8 +82,6 @@ class IncrementalSnapshotFutureTask extends AbstractSnapshotFutureTask<Void> imp srcNodeId, reqNodeId, meta.snapshotName(), - tmpWorkDir, - ioFactory, new SnapshotSender( cctx.logger(IncrementalSnapshotFutureTask.class), cctx.kernalContext().pools().getSnapshotExecutorService() diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFinishedFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFinishedFutureTask.java index e32ddc1d00b..1ba5100f66a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFinishedFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFinishedFutureTask.java @@ -25,7 +25,7 @@ public class SnapshotFinishedFutureTask extends AbstractSnapshotFutureTask<Void> * @param e Finished snapshot task future with particular exception. 
*/ public SnapshotFinishedFutureTask(IgniteCheckedException e) { - super(null, null, null, null, null, null, null, null); + super(null, null, null, null, null, null); onDone(e); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java index f1ea19e521d..76d3c63d37a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java @@ -23,11 +23,10 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -35,7 +34,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.atomic.AtomicLong; @@ -49,7 +47,6 @@ import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.binary.BinaryType; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.store.PageStore; @@ -57,8 +54,6 @@ import org.apache.ignite.internal.pagemem.store.PageWriteListener; import 
org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; @@ -76,13 +71,11 @@ import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C3; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheWorkDir; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.copy; @@ -98,13 +91,19 @@ import static org.apache.ignite.internal.processors.cache.persistence.snapshot.I * If partitions for particular cache group are not provided that they will be collected and added * on checkpoint under the write-lock. 
*/ -class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskResult> implements CheckpointListener { +class SnapshotFutureTask extends AbstractCreateBackupFutureTask implements CheckpointListener { /** File page store manager for accessing cache group associated files. */ private final FilePageStoreManager pageStore; /** Local buffer to perform copy-on-write operations for {@link PageStoreSerialWriter}. */ private final ThreadLocal<ByteBuffer> locBuff; + /** Snapshot working directory on file system. */ + protected final File tmpSnpWorkDir; + + /** IO factory which will be used for creating snapshot delta-writers. */ + protected final FileIOFactory ioFactory; + /** * The length of file size per each cache partition file. * Partition has value greater than zero only for partitions in OWNING state. @@ -129,9 +128,6 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskRe /** {@code true} if all metastorage data must be also included into snapshot. */ private final boolean withMetaStorage; - /** Cache group and corresponding partitions collected under the checkpoint write lock. */ - private final Map<Integer, Set<Integer>> processed = new HashMap<>(); - /** Checkpoint end future. */ private final CompletableFuture<Boolean> cpEndFut = new CompletableFuture<>(); @@ -141,9 +137,6 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskRe /** Absolute path to save intermediate results of cache partitions of this node. */ private volatile File tmpConsIdDir; - /** Future which will be completed when task requested to be closed. Will be executed on system pool. */ - private volatile CompletableFuture<Void> closeFut; - /** Pointer to {@link ClusterSnapshotRecord}. 
*/ private volatile @Nullable WALPointer snpPtr; @@ -183,7 +176,7 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskRe boolean withMetaStorage, ThreadLocal<ByteBuffer> locBuff ) { - super(cctx, srcNodeId, reqId, snpName, tmpWorkDir, ioFactory, snpSndr, parts); + super(cctx, srcNodeId, reqId, snpName, snpSndr, parts); assert snpName != null : "Snapshot name cannot be empty or null."; assert snpSndr != null : "Snapshot sender which handles execution tasks must be not null."; @@ -191,6 +184,8 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskRe assert cctx.pageStore() instanceof FilePageStoreManager : "Snapshot task can work only with physical files."; assert !parts.containsKey(MetaStorage.METASTORAGE_CACHE_ID) : "The withMetaStorage must be used instead."; + this.tmpSnpWorkDir = new File(tmpWorkDir, snpName); + this.ioFactory = ioFactory; this.withMetaStorage = withMetaStorage; this.pageStore = (FilePageStoreManager)cctx.pageStore(); this.locBuff = locBuff; @@ -203,13 +198,9 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskRe if (th == null) return; - if (err.compareAndSet(null, th)) - closeAsync(); + super.acceptException(th); startedFut.onDone(th); - - if (!(th instanceof IgniteFutureCancelledCheckedException)) - U.error(log, "Snapshot task has accepted exception to stop", th); } /** {@inheritDoc} */ @@ -351,66 +342,7 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskRe ctx.walFlush(true); } - for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) { - int grpId = e.getKey(); - Set<Integer> grpParts = e.getValue(); - - CacheGroupContext gctx = cctx.cache().cacheGroup(grpId); - - Iterator<GridDhtLocalPartition> iter; - - if (grpParts == null) - iter = gctx.topology().currentLocalPartitions().iterator(); - else { - if (grpParts.contains(INDEX_PARTITION)) { - throw new IgniteCheckedException("Index partition cannot be included into snapshot 
if " + - " set of cache group partitions has been explicitly provided [grpId=" + grpId + ']'); - } - - iter = F.iterator(grpParts, gctx.topology()::localPartition, false); - } - - Set<Integer> owning = new HashSet<>(); - Set<Integer> missed = new HashSet<>(); - - // Iterate over partitions in particular cache group. - while (iter.hasNext()) { - GridDhtLocalPartition part = iter.next(); - - // Partition can be in MOVING\RENTING states. - // Index partition will be excluded if not all partition OWNING. - // There is no data assigned to partition, thus it haven't been created yet. - if (part.state() == GridDhtPartitionState.OWNING) - owning.add(part.id()); - else - missed.add(part.id()); - } - - boolean affNode = gctx.nodeFilter() == null || gctx.nodeFilter().apply(cctx.localNode()); - - if (grpParts != null) { - // Partition has been provided for cache group, but some of them are not in OWNING state. - // Exit with an error. - if (!missed.isEmpty()) { - throw new IgniteCheckedException("Snapshot operation cancelled due to " + - "not all of requested partitions has OWNING state on local node [grpId=" + grpId + - ", missed=" + S.toStringSortedDistinct(missed) + ']'); - } - } - else { - // Partitions have not been provided for snapshot task and all partitions have - // OWNING state, so index partition must be included into snapshot. - if (!missed.isEmpty()) { - log.warning("All local cache group partitions in OWNING state have been included into a snapshot. " + - "Partitions which have different states skipped. 
Index partitions has also been skipped " + - "[snpName=" + snpName + ", grpId=" + grpId + ", missed=" + S.toStringSortedDistinct(missed) + ']'); - } - else if (affNode && missed.isEmpty() && cctx.kernalContext().query().moduleEnabled()) - owning.add(INDEX_PARTITION); - } - - processed.put(grpId, owning); - } + processPartitions(); List<CacheConfiguration<?, ?>> ccfgs = new ArrayList<>(); @@ -457,118 +389,109 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskRe if (!startedFut.onDone()) return; - // Submit all tasks for partitions and deltas processing. - List<CompletableFuture<Void>> futs = new ArrayList<>(); - - if (log.isInfoEnabled()) { - log.info("Submit partition processing tasks to the snapshot execution pool " + - "[map=" + groupByGroupId(partFileLengths.keySet()) + - ", totalSize=" + U.humanReadableByteCount(partFileLengths.values().stream().mapToLong(v -> v).sum()) + ']'); - } + backupAllAsync(); + } + /** {@inheritDoc} */ + @Override protected List<CompletableFuture<Void>> saveMetaCopy() { Collection<BinaryType> binTypesCopy = cctx.kernalContext() .cacheObjects() .metadata(Collections.emptyList()) .values(); - // Process binary meta. - futs.add(CompletableFuture.runAsync( - wrapExceptionIfStarted(() -> snpSndr.sendBinaryMeta(binTypesCopy)), - snpSndr.executor())); - List<Map<Integer, MappedName>> mappingsCopy = cctx.kernalContext() .marshallerContext() .getCachedMappings(); - // Process marshaller meta. - futs.add(CompletableFuture.runAsync( - wrapExceptionIfStarted(() -> snpSndr.sendMarshallerMeta(mappingsCopy)), - snpSndr.executor())); - - // Send configuration files of all cache groups. 
- for (CacheConfigurationSender ccfgSndr : ccfgSndrs) - futs.add(CompletableFuture.runAsync(wrapExceptionIfStarted(ccfgSndr::sendCacheConfig), snpSndr.executor())); - - try { - for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) { - int grpId = e.getKey(); - String cacheDirName = pageStore.cacheDirName(grpId); + return Arrays.asList( + // Process binary meta. + CompletableFuture.runAsync(wrapExceptionIfStarted(() -> snpSndr.sendBinaryMeta(binTypesCopy)), snpSndr.executor()), + // Process marshaller meta. + CompletableFuture.runAsync(wrapExceptionIfStarted(() -> snpSndr.sendMarshallerMeta(mappingsCopy)), snpSndr.executor()) + ); + } - // Process partitions for a particular cache group. - for (int partId : e.getValue()) { - GroupPartitionId pair = new GroupPartitionId(grpId, partId); + /** {@inheritDoc} */ + @Override protected List<CompletableFuture<Void>> saveGroup(int grpId, Set<Integer> grpParts) throws IgniteCheckedException { + String cacheDirName = pageStore.cacheDirName(grpId); - Long partLen = partFileLengths.get(pair); + // Process partitions for a particular cache group. + return grpParts.stream().map(partId -> { + GroupPartitionId pair = new GroupPartitionId(grpId, partId); - totalSize.addAndGet(partLen); + Long partLen = partFileLengths.get(pair); - CompletableFuture<Void> fut0 = CompletableFuture.runAsync( - wrapExceptionIfStarted(() -> { - snpSndr.sendPart( - getPartitionFile(pageStore.workDir(), cacheDirName, partId), - cacheDirName, - pair, - partLen); + totalSize.addAndGet(partLen); - // Stop partition writer. - partDeltaWriters.get(pair).markPartitionProcessed(); + return CompletableFuture.runAsync( + wrapExceptionIfStarted(() -> { + snpSndr.sendPart( + getPartitionFile(pageStore.workDir(), cacheDirName, partId), + cacheDirName, + pair, + partLen); - processedSize.addAndGet(partLen); - }), - snpSndr.executor()) - // Wait for the completion of both futures - checkpoint end, copy partition. 
- .runAfterBothAsync(cpEndFut, - wrapExceptionIfStarted(() -> { - PageStoreSerialWriter writer = partDeltaWriters.get(pair); + // Stop partition writer. + partDeltaWriters.get(pair).markPartitionProcessed(); - writer.close(); + processedSize.addAndGet(partLen); + }), + snpSndr.executor()) + // Wait for the completion of both futures - checkpoint end, copy partition. + .runAfterBothAsync(cpEndFut, + wrapExceptionIfStarted(() -> { + PageStoreSerialWriter writer = partDeltaWriters.get(pair); - File delta = writer.deltaFile; + writer.close(); - try { - // Atomically creates a new, empty delta file if and only if - // a file with this name does not yet exist. - delta.createNewFile(); - } - catch (IOException ex) { - throw new IgniteCheckedException(ex); - } + File delta = writer.deltaFile; - snpSndr.sendDelta(delta, cacheDirName, pair); + try { + // Atomically creates a new, empty delta file if and only if + // a file with this name does not yet exist. + delta.createNewFile(); + } + catch (IOException ex) { + throw new IgniteCheckedException(ex); + } - processedSize.addAndGet(delta.length()); + snpSndr.sendDelta(delta, cacheDirName, pair); - boolean deleted = delta.delete(); + processedSize.addAndGet(delta.length()); - assert deleted; + boolean deleted = delta.delete(); - File deltaIdx = partDeltaIndexFile(delta); + assert deleted; - if (deltaIdx.exists()) { - deleted = deltaIdx.delete(); + File deltaIdx = partDeltaIndexFile(delta); - assert deleted; - } - }), - snpSndr.executor()); + if (deltaIdx.exists()) { + deleted = deltaIdx.delete(); - futs.add(fut0); - } - } + assert deleted; + } + }), + snpSndr.executor()); + }).collect(Collectors.toList()); + } - int futsSize = futs.size(); + /** {@inheritDoc} */ + @Override protected void backupAllAsync() { + if (log.isInfoEnabled()) { + log.info("Submit partition processing tasks to the snapshot execution pool " + + "[map=" + groupByGroupId(partFileLengths.keySet()) + + ", totalSize=" + 
U.humanReadableByteCount(partFileLengths.values().stream().mapToLong(v -> v).sum()) + ']'); + } - CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize])) - .whenComplete((res, t) -> { - assert t == null : "Exception must never be thrown since a wrapper is used " + - "for each snapshot task: " + t; + super.backupAllAsync(); + } - closeAsync(); - }); - } - catch (IgniteCheckedException e) { - acceptException(e); - } + /** {@inheritDoc} */ + @Override protected List<CompletableFuture<Void>> saveCacheConfigsCopy() { + // Send configuration files of all cache groups. + return ccfgSndrs.stream() + .map(ccfgSndr -> CompletableFuture.runAsync(wrapExceptionIfStarted(ccfgSndr::sendCacheConfig), snpSndr.executor())) + .collect(Collectors.toList()); } /** @@ -577,7 +500,7 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskRe * @param dirName Directory name to init. * @throws IgniteCheckedException If fails. */ - private void addPartitionWriters(int grpId, Set<Integer> parts, String dirName) throws IgniteCheckedException { + void addPartitionWriters(int grpId, Set<Integer> parts, String dirName) throws IgniteCheckedException { Integer encGrpId = cctx.cache().isEncrypted(grpId) ? grpId : null; for (int partId : parts) { @@ -592,10 +515,8 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskRe } } - /** - * @return Future which will be completed when operations truly stopped. 
- */ - public synchronized CompletableFuture<Void> closeAsync() { + /** {@inheritDoc} */ + @Override public synchronized CompletableFuture<Void> closeAsync() { if (closeFut == null) { Throwable err0 = err.get(); @@ -605,8 +526,10 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskRe .map(Map.Entry::getKey) .collect(Collectors.toSet()); - closeFut = CompletableFuture.runAsync(() -> onDone(new SnapshotFutureTaskResult(taken, snpPtr), err0), - cctx.kernalContext().pools().getSystemExecutorService()); + closeFut = CompletableFuture.runAsync( + () -> onDone(new SnapshotFutureTaskResult(taken, snpPtr), err0), + cctx.kernalContext().pools().getSystemExecutorService() + ); } return closeFut; @@ -622,22 +545,6 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskRe return processedSize.get(); } - /** {@inheritDoc} */ - @Override public boolean cancel() { - super.cancel(); - - try { - closeAsync().get(); - } - catch (InterruptedException | ExecutionException e) { - U.error(log, "SnapshotFutureTask cancellation failed", e); - - return false; - } - - return true; - } - /** * @param grps List of processing pairs. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java index 2f2a3abb9dd..bc3265f0f08 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java @@ -101,6 +101,9 @@ public class SnapshotMetadata implements Serializable { /** If {@code true} snapshot only primary copies of partitions. */ private boolean onlyPrimary; + /** If {@code true}, a cache group dump is stored. */ + private boolean dump; + + /** + * @param rqId Unique request id.
* @param snpName Snapshot name. @@ -112,6 +115,7 @@ public class SnapshotMetadata implements Serializable { * @param snpRecPtr WAL pointer to {@link ClusterSnapshotRecord} if exists. * @param masterKeyDigest Master key digest for encrypted caches. * @param onlyPrimary If {@code true} snapshot only primary copies of partitions. + * @param dump If {@code true}, a cache group dump is stored. */ public SnapshotMetadata( UUID rqId, @@ -125,7 +129,8 @@ public class SnapshotMetadata implements Serializable { Set<GroupPartitionId> pairs, @Nullable WALPointer snpRecPtr, @Nullable byte[] masterKeyDigest, - boolean onlyPrimary + boolean onlyPrimary, + boolean dump ) { this.rqId = rqId; this.snpName = snpName; @@ -137,6 +142,7 @@ public class SnapshotMetadata implements Serializable { this.snpRecPtr = snpRecPtr; this.masterKeyDigest = masterKeyDigest; this.onlyPrimary = onlyPrimary; + this.dump = dump; if (!F.isEmpty(compGrpIds)) { hasComprGrps = true; @@ -228,6 +234,11 @@ public class SnapshotMetadata implements Serializable { return onlyPrimary; } + /** @return {@code true} if a cache group dump is stored. */ + public boolean dump() { + return dump; + } + /** Save the state of this <tt>HashMap</tt> partitions and cache groups to a stream.
*/ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java index 90e562f4265..ddb8332e2f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java @@ -28,7 +28,6 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Function; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; import org.apache.ignite.internal.util.typedef.F; import org.jetbrains.annotations.Nullable; @@ -48,8 +47,6 @@ public class SnapshotResponseRemoteFutureTask extends AbstractSnapshotFutureTask * @param reqId Snapshot operation request ID. * @param snpName Unique identifier of snapshot process. * @param snpPath Snapshot directory path. - * @param tmpWorkDir Working directory for intermediate snapshot results. - * @param ioFactory Factory to working with snapshot files. * @param snpSndr Factory which produces snapshot receiver instance. * @param parts Partition to be processed. 
*/ @@ -59,12 +56,10 @@ public class SnapshotResponseRemoteFutureTask extends AbstractSnapshotFutureTask UUID reqId, String snpName, String snpPath, - File tmpWorkDir, - FileIOFactory ioFactory, SnapshotSender snpSndr, Map<Integer, Set<Integer>> parts ) { - super(cctx, srcNodeId, reqId, snpName, tmpWorkDir, ioFactory, snpSndr, parts); + super(cctx, srcNodeId, reqId, snpName, snpSndr, parts); this.snpPath = snpPath; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotSender.java index 010ade1952f..c48f899fc7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotSender.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotSender.java @@ -33,7 +33,7 @@ import org.jetbrains.annotations.Nullable; /** * */ -public abstract class SnapshotSender { +abstract class SnapshotSender { /** Busy processing lock. */ private final ReadWriteLock lock = new ReentrantReadWriteLock(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpCacheFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpCacheFutureTask.java deleted file mode 100644 index 00c77c3ba38..00000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpCacheFutureTask.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ThreadLocalRandom; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.binary.BinaryType; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; -import org.apache.ignite.internal.MarshallerContextImpl; -import org.apache.ignite.internal.processors.cache.CacheGroupContext; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.StoredCacheData; -import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; -import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; -import org.apache.ignite.internal.processors.cache.persistence.snapshot.AbstractSnapshotFutureTask; -import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager; -import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotSender; -import 
org.apache.ignite.internal.processors.marshaller.MappedName; -import org.apache.ignite.internal.util.IgniteUtils; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.internal.processors.cache.GridLocalConfigManager.cachDataFilename; -import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX; -import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX; -import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DUMP_LOCK; - -/** */ -public class DumpCacheFutureTask extends AbstractSnapshotFutureTask<Void> { - /** */ - public static final CompletableFuture<?>[] COMPLETABLE_FUTURES = new CompletableFuture[0]; - - /** */ - private final File dumpDir; - - /** */ - private final List<Integer> grps; - - /** - * @param cctx Cache context. - * @param dumpName Dump name. - * @param srcNodeId Node id which cause snapshot task creation. - * @param reqId Snapshot operation request ID. - * @param tmpWorkDir Working directory for intermediate snapshot results. - * @param ioFactory Factory to working with snapshot files. - */ - public DumpCacheFutureTask( - GridCacheSharedContext<?, ?> cctx, - String dumpName, - UUID srcNodeId, - UUID reqId, - @Nullable String snpPath, - File dumpDir, - File tmpWorkDir, - FileIOFactory ioFactory, - List<Integer> grps - ) { - super( - cctx, - srcNodeId, - reqId, - dumpName, - tmpWorkDir, - ioFactory, - new SnapshotSender( - cctx.logger(DumpCacheFutureTask.class), - cctx.kernalContext().pools().getSnapshotExecutorService() - ) { - @Override protected void init(int partsCnt) { - // No-op. - } - - @Override protected void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) { - // No-op. 
- } - - @Override protected void sendDelta0(File delta, String cacheDirName, GroupPartitionId pair) { - // No-op. - } - }, - null - ); - - this.dumpDir = dumpDir; - this.grps = grps; - } - - /** {@inheritDoc} */ - @Override public boolean start() { - try { - log.info("Start cache dump [name=" + snpName + ", grps=" + grps + ']'); - - File dumpNodeDir = IgniteSnapshotManager.nodeDumpDirectory(dumpDir, cctx); - - createDumpLock(dumpNodeDir); - - // Submit all tasks for groups processing. - List<CompletableFuture<Void>> futs = new ArrayList<>(); - - Collection<BinaryType> types = cctx.kernalContext().cacheObjects().binary().types(); - - futs.add(CompletableFuture.runAsync( - wrapExceptionIfStarted(() -> cctx.kernalContext().cacheObjects().saveMetadata(types, dumpDir)), - snpSndr.executor() - )); - - ArrayList<Map<Integer, MappedName>> mappings = cctx.kernalContext().marshallerContext().getCachedMappings(); - - futs.add(CompletableFuture.runAsync( - wrapExceptionIfStarted(() -> MarshallerContextImpl.saveMappings(cctx.kernalContext(), mappings, dumpDir)), - snpSndr.executor() - )); - - for (int grp : grps) { - futs.add(CompletableFuture.runAsync(wrapExceptionIfStarted(() -> { - long start = System.currentTimeMillis(); - - CacheGroupContext grpCtx = cctx.cache().cacheGroup(grp); - - log.info("Start group dump [name=" + grpCtx.cacheOrGroupName() + ", id=" + grp + ']'); - - File grpDir = new File( - dumpNodeDir, - (grpCtx.caches().size() > 1 ? 
CACHE_GRP_DIR_PREFIX : CACHE_DIR_PREFIX) + grpCtx.cacheOrGroupName() - ); - - IgniteUtils.ensureDirectory(grpDir, "dump group directory", null); - - for (GridCacheContext<?, ?> cacheCtx : grpCtx.caches()) { - CacheConfiguration<?, ?> ccfg = cacheCtx.config(); - - cctx.cache().configManager().writeCacheData( - new StoredCacheData(ccfg), - new File(grpDir, cachDataFilename(ccfg)) - ); - } - - try { - Thread.sleep(ThreadLocalRandom.current().nextInt(5_000)); - } - catch (InterruptedException e) { - acceptException(e); - } - - long time = System.currentTimeMillis() - start; - - log.info("Finish group dump [name=" + grpCtx.cacheOrGroupName() + ", id=" + grp + ", time=" + time + ']'); - }), snpSndr.executor())); - } - - CompletableFuture.allOf(futs.toArray(COMPLETABLE_FUTURES)).whenComplete((res, t) -> { - assert t == null : "Exception must never be thrown since a wrapper is used " + - "for each dum task: " + t; - - onDone(err.get()); // Will complete OK if err.get() == null. - }); - } - catch (IgniteCheckedException | IOException e) { - acceptException(e); - - onDone(e); - } - - return false; // Don't wait for checkpoint. 
- } - - /** */ - private void createDumpLock(File dumpNodeDir) throws IgniteCheckedException, IOException { - File lock = new File(dumpNodeDir, DUMP_LOCK); - - if (!lock.createNewFile()) - throw new IgniteCheckedException("Lock file can't be created or already exists: " + lock.getAbsolutePath()); - } - - /** {@inheritDoc} */ - @Override public void acceptException(Throwable th) { - if (th == null) - return; - - if (!(th instanceof IgniteFutureCancelledCheckedException)) - U.error(log, "Snapshot task has accepted exception to stop", th); - - err.compareAndSet(null, th); - } -} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java index 802497a2d72..7890402f2ec 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java @@ -824,7 +824,7 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest { SnapshotSender snpSndr ) throws IgniteCheckedException { AbstractSnapshotFutureTask<?> task = cctx.snapshotMgr().registerSnapshotTask(snpName, cctx.localNodeId(), null, - parts, withMetaStorage, snpSndr); + parts, withMetaStorage, false, snpSndr); if (!(task instanceof SnapshotFutureTask)) throw new IgniteCheckedException("Snapshot task hasn't been registered: " + task); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/EncryptedSnapshotTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/EncryptedSnapshotTest.java index d16d6074f52..d1d51093c21 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/EncryptedSnapshotTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/EncryptedSnapshotTest.java @@ -291,7 +291,7 @@ public class EncryptedSnapshotTest extends AbstractSnapshotSelfTest { GridTestUtils.assertThrowsAnyCause(log, () -> snp(ig).registerSnapshotTask(SNAPSHOT_NAME, ig.localNode().id(), - null, F.asMap(CU.cacheId(dfltCacheCfg.getName()), null), false, + null, F.asMap(CU.cacheId(dfltCacheCfg.getName()), null), false, false, snp(ig).localSnapshotSenderFactory().apply(SNAPSHOT_NAME, null)).get(TIMEOUT), IgniteCheckedException.class, "Metastore is required because it holds encryption keys"); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java index 79b3699b6b0..478cbb750e2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java @@ -148,6 +148,7 @@ public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest { null, F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null), encryption, + false, new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME, null)) { @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) { try {