This is an automated email from the ASF dual-hosted git repository. nizhikov pushed a commit to branch IGNITE-19950-snapshot-merge in repository https://gitbox.apache.org/repos/asf/ignite.git
commit dcb9e8142bf6c1fd02cfafee6942aea0b5dba556 Author: nizhikov <nizhi...@apache.org> AuthorDate: Fri Jul 28 17:16:14 2023 +0300 IGNITE-19950 Dump creation process implemented --- .../snapshot/AbstractSnapshotFutureTask.java | 26 ++++++++ .../snapshot/IgniteSnapshotManager.java | 20 ++++-- .../persistence/snapshot/SnapshotFutureTask.java | 26 -------- .../snapshot/dump/DumpCacheFutureTask.java | 78 +++++++++++++++++----- 4 files changed, 100 insertions(+), 50 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java index 5115077c026..894c2c8b70b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.IgniteThrowableRunner; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -128,6 +129,31 @@ public abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> return parts.keySet(); } + /** + * @param exec Runnable task to execute. + * @return Wrapped task. + */ + protected Runnable wrapExceptionIfStarted(IgniteThrowableRunner exec) { + return () -> { + if (stopping()) + return; + + try { + exec.run(); + } + catch (Throwable t) { + acceptException(t); + } + }; + } + + /** + * @return {@code true} if current task requested to be stopped. + */ + protected boolean stopping() { + return err.get() != null; + } + /** * Initiates snapshot task. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 074833bdd63..a8b80df7da2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -701,6 +701,13 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter return views; })), Function.identity()); + + Arrays.stream(locDumpDir.listFiles()) + .filter(File::isDirectory) + .filter(dumpDir -> new File(dumpDir, DUMP_LOCK).exists()) + .forEach(lockedDumpDir -> log.warning("Found locked dump dir. " + + "This means, dump creation not finished prior to node fail. " + + "Please, remove it manually: " + lockedDumpDir)); } /** {@inheritDoc} */ @@ -1262,7 +1269,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter .map(n -> cctx.discovery().node(n).consistentId().toString()) .collect(Collectors.toSet()); - fut.result(); + // Ignoring Void result fut.result(). DumpMetadata meta = new DumpMetadata( req.requestId(), @@ -1272,13 +1279,12 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter nodes ); - File smf = new File(dumpDir, snapshotMetaFileName(cctx.localNode().consistentId().toString())); + File dmf = new File(dumpDir, dumpMetaFileName(cctx.localNode().consistentId().toString())); - storeSnapshotMeta(meta, smf); + storeSnapshotMeta(meta, dmf); - log.info("Dump metafile has been created: " + smf.getAbsolutePath()); + log.info("Dump metafile has been created: " + dmf.getAbsolutePath()); - // TODO: Do we need to invoke handlers here? return new SnapshotOperationResponse(null); }, snapshotExecutorService()); } @@ -1458,6 +1464,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter if (req.incremental()) U.delete(incrementalSnapshotLocalDir(req.snapshotName(), req.snapshotPath(), req.incrementIndex())); + else if (req.dump()) + U.delete(snapshotLocalDir(req.snapshotName(), null, locDumpDir)); else { deleteSnapshot( snapshotLocalDir(req.snapshotName(), req.snapshotPath(), req.dump() ? locDumpDir : locSnpDir), @@ -1581,7 +1589,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter clusterSnpFut.onDone(); if (log.isInfoEnabled()) - log.info(SNAPSHOT_FINISHED_MSG + snpReq); + log.info(snpMsg(SNAPSHOT_FINISHED_MSG + snpReq, snpReq.dump())); } } else if (snpReq.error() == null) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java index 7fc31dbdfb7..f1ea19e521d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java @@ -74,7 +74,6 @@ import org.apache.ignite.internal.processors.marshaller.MappedName; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.IgniteThrowableRunner; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C3; import org.apache.ignite.internal.util.typedef.F; @@ -248,13 +247,6 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskRe return startedFut; } - /** - * @return {@code true} if current task requested to be stopped. - */ - private boolean stopping() { - return err.get() != null; - } - /** * Initiates snapshot task. * @@ -600,24 +592,6 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskRe } } - /** - * @param exec Runnable task to execute. - * @return Wrapped task. - */ - private Runnable wrapExceptionIfStarted(IgniteThrowableRunner exec) { - return () -> { - if (stopping()) - return; - - try { - exec.run(); - } - catch (Throwable t) { - acceptException(t); - } - }; - } - /** * @return Future which will be completed when operations truly stopped. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpCacheFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpCacheFutureTask.java index 30e56086f57..3b4b205c9e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpCacheFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpCacheFutureTask.java @@ -19,11 +19,17 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.UUID; -import java.util.function.BiConsumer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.binary.BinaryType; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.MarshallerContextImpl; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -34,7 +40,9 @@ import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPa import org.apache.ignite.internal.processors.cache.persistence.snapshot.AbstractSnapshotFutureTask; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotSender; +import org.apache.ignite.internal.processors.marshaller.MappedName; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.GridLocalConfigManager.cachDataFilename; @@ -43,7 +51,10 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DUMP_LOCK; /** */ -public class DumpCacheFutureTask extends AbstractSnapshotFutureTask<Void> implements BiConsumer<String, File> { +public class DumpCacheFutureTask extends AbstractSnapshotFutureTask<Void> { + /** */ + public static final CompletableFuture<?>[] COMPLETABLE_FUTURES = new CompletableFuture[0]; + /** */ private final File dumpDir; @@ -97,8 +108,6 @@ public class DumpCacheFutureTask extends AbstractSnapshotFutureTask<Void> implem this.dumpDir = dumpDir; this.grps = grps; - - cctx.cache().configManager().addConfigurationChangeListener(this); } /** {@inheritDoc} */ @@ -110,7 +119,24 @@ public class DumpCacheFutureTask extends AbstractSnapshotFutureTask<Void> implem createDumpLock(dumpNodeDir); - for (Integer grp : grps) { + // Submit all tasks for groups processing. + List<CompletableFuture<Void>> futs = new ArrayList<>(); + + Collection<BinaryType> types = cctx.kernalContext().cacheObjects().binary().types(); + + futs.add(CompletableFuture.runAsync( + wrapExceptionIfStarted(() -> cctx.kernalContext().cacheObjects().saveMetadata(types, dumpDir)), + snpSndr.executor() + )); + + ArrayList<Map<Integer, MappedName>> mappings = cctx.kernalContext().marshallerContext().getCachedMappings(); + + futs.add(CompletableFuture.runAsync( + wrapExceptionIfStarted(() -> MarshallerContextImpl.saveMappings(cctx.kernalContext(), mappings, dumpDir)), + snpSndr.executor() + )); + + for (int grp : grps) { CacheGroupContext grpCtx = cctx.cache().cacheGroup(grp); File grpDir = new File( @@ -128,20 +154,35 @@ public class DumpCacheFutureTask extends AbstractSnapshotFutureTask<Void> implem new File(grpDir, cachDataFilename(ccfg)) ); } - } - cctx.kernalContext().cacheObjects().saveMetadata( - cctx.kernalContext().cacheObjects().binary().types(), - dumpDir - ); + futs.add(CompletableFuture.runAsync(wrapExceptionIfStarted(() -> { + long start = System.currentTimeMillis(); - MarshallerContextImpl.saveMappings(cctx.kernalContext(), cctx.kernalContext() - .marshallerContext() - .getCachedMappings(), dumpDir); + log.info("Start group dump [name=" + grpCtx.cacheOrGroupName() + ", id=" + grp + ']'); - onDone(); + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(5_000)); + } + catch (InterruptedException e) { + acceptException(e); + } + + long time = System.currentTimeMillis() - start; + + log.info("Finish group dump [name=" + grpCtx.cacheOrGroupName() + ", id=" + grp + ", time=" + time + ']'); + }), snpSndr.executor())); + } + + CompletableFuture.allOf(futs.toArray(COMPLETABLE_FUTURES)).whenComplete((res, t) -> { + assert t == null : "Exception must never be thrown since a wrapper is used " + + "for each dum task: " + t; + + onDone(err.get()); // Will complete OK if err.get() == null. + }); } catch (IgniteCheckedException | IOException e) { + acceptException(e); + onDone(e); } @@ -158,11 +199,12 @@ public class DumpCacheFutureTask extends AbstractSnapshotFutureTask<Void> implem /** {@inheritDoc} */ @Override public void acceptException(Throwable th) { + if (th == null) + return; - } - - /** {@inheritDoc} */ - @Override public void accept(String s, File file) { + if (!(th instanceof IgniteFutureCancelledCheckedException)) + U.error(log, "Snapshot task has accepted exception to stop", th); + err.compareAndSet(null, th); } }