This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch IGNITE-19950-snapshot-merge
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit dcb9e8142bf6c1fd02cfafee6942aea0b5dba556
Author: nizhikov <nizhi...@apache.org>
AuthorDate: Fri Jul 28 17:16:14 2023 +0300

    IGNITE-19950 Dump creation process implemented
---
 .../snapshot/AbstractSnapshotFutureTask.java       | 26 ++++++++
 .../snapshot/IgniteSnapshotManager.java            | 20 ++++--
 .../persistence/snapshot/SnapshotFutureTask.java   | 26 --------
 .../snapshot/dump/DumpCacheFutureTask.java         | 78 +++++++++++++++++-----
 4 files changed, 100 insertions(+), 50 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
index 5115077c026..894c2c8b70b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
@@ -27,6 +27,7 @@ import 
org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
@@ -128,6 +129,31 @@ public abstract class AbstractSnapshotFutureTask<T> 
extends GridFutureAdapter<T>
         return parts.keySet();
     }
 
+    /**
+     * @param exec Task to run; it is skipped when {@link #stopping()} already returns {@code true} at run time.
+     * @return Runnable that runs {@code exec} and hands any {@code Throwable} it throws to {@code acceptException}.
+     */
+    protected Runnable wrapExceptionIfStarted(IgniteThrowableRunner exec) {
+        return () -> {
+            if (stopping())
+                return;
+
+            try {
+                exec.run();
+            }
+            catch (Throwable t) {
+                acceptException(t);
+            }
+        };
+    }
+
+    /**
+     * @return {@code true} if an error is already recorded in {@code err}, i.e. the task was requested to stop.
+     */
+    protected boolean stopping() {
+        return err.get() != null;
+    }
+
     /**
      * Initiates snapshot task.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 074833bdd63..a8b80df7da2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -701,6 +701,13 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
                 return views;
             })),
             Function.identity());
+
+        Arrays.stream(locDumpDir.listFiles())
+            .filter(File::isDirectory)
+            .filter(dumpDir -> new File(dumpDir, DUMP_LOCK).exists())
+            .forEach(lockedDumpDir -> log.warning("Found locked dump dir. " +
+                "This means, dump creation not finished prior to node fail. " +
+                "Please, remove it manually: " + lockedDumpDir));
     }
 
     /** {@inheritDoc} */
@@ -1262,7 +1269,7 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
                 .map(n -> cctx.discovery().node(n).consistentId().toString())
                 .collect(Collectors.toSet());
 
-            fut.result();
+            // Ignoring Void result fut.result().
 
             DumpMetadata meta = new DumpMetadata(
                 req.requestId(),
@@ -1272,13 +1279,12 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
                 nodes
             );
 
-            File smf = new File(dumpDir, 
snapshotMetaFileName(cctx.localNode().consistentId().toString()));
+            File dmf = new File(dumpDir, 
dumpMetaFileName(cctx.localNode().consistentId().toString()));
 
-            storeSnapshotMeta(meta, smf);
+            storeSnapshotMeta(meta, dmf);
 
-            log.info("Dump metafile has been created: " + 
smf.getAbsolutePath());
+            log.info("Dump metafile has been created: " + 
dmf.getAbsolutePath());
 
-            // TODO: Do we need to invoke handlers here?
             return new SnapshotOperationResponse(null);
         }, snapshotExecutorService());
     }
@@ -1458,6 +1464,8 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
 
                     if (req.incremental())
                         
U.delete(incrementalSnapshotLocalDir(req.snapshotName(), req.snapshotPath(), 
req.incrementIndex()));
+                    else if (req.dump())
+                        U.delete(snapshotLocalDir(req.snapshotName(), null, 
locDumpDir));
                     else {
                         deleteSnapshot(
                             snapshotLocalDir(req.snapshotName(), 
req.snapshotPath(), req.dump() ? locDumpDir : locSnpDir),
@@ -1581,7 +1589,7 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
                         clusterSnpFut.onDone();
 
                         if (log.isInfoEnabled())
-                            log.info(SNAPSHOT_FINISHED_MSG + snpReq);
+                            log.info(snpMsg(SNAPSHOT_FINISHED_MSG + snpReq, 
snpReq.dump()));
                     }
                 }
                 else if (snpReq.error() == null) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index 7fc31dbdfb7..f1ea19e521d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -74,7 +74,6 @@ import 
org.apache.ignite.internal.processors.marshaller.MappedName;
 import 
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.C3;
 import org.apache.ignite.internal.util.typedef.F;
@@ -248,13 +247,6 @@ class SnapshotFutureTask extends 
AbstractSnapshotFutureTask<SnapshotFutureTaskRe
         return startedFut;
     }
 
-    /**
-     * @return {@code true} if current task requested to be stopped.
-     */
-    private boolean stopping() {
-        return err.get() != null;
-    }
-
     /**
      * Initiates snapshot task.
      *
@@ -600,24 +592,6 @@ class SnapshotFutureTask extends 
AbstractSnapshotFutureTask<SnapshotFutureTaskRe
         }
     }
 
-    /**
-     * @param exec Runnable task to execute.
-     * @return Wrapped task.
-     */
-    private Runnable wrapExceptionIfStarted(IgniteThrowableRunner exec) {
-        return () -> {
-            if (stopping())
-                return;
-
-            try {
-                exec.run();
-            }
-            catch (Throwable t) {
-                acceptException(t);
-            }
-        };
-    }
-
     /**
      * @return Future which will be completed when operations truly stopped.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpCacheFutureTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpCacheFutureTask.java
index 30e56086f57..3b4b205c9e6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpCacheFutureTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpCacheFutureTask.java
@@ -19,11 +19,17 @@ package 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
-import java.util.function.BiConsumer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
 import org.apache.ignite.internal.MarshallerContextImpl;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -34,7 +40,9 @@ import 
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPa
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.AbstractSnapshotFutureTask;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotSender;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.internal.processors.cache.GridLocalConfigManager.cachDataFilename;
@@ -43,7 +51,10 @@ import static 
org.apache.ignite.internal.processors.cache.persistence.file.FileP
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DUMP_LOCK;
 
 /** */
-public class DumpCacheFutureTask extends AbstractSnapshotFutureTask<Void> 
implements BiConsumer<String, File> {
+public class DumpCacheFutureTask extends AbstractSnapshotFutureTask<Void> {
+    /** Empty array passed to {@code toArray} to turn the list of submitted task futures into an array. */
+    public static final CompletableFuture<?>[] COMPLETABLE_FUTURES = new 
CompletableFuture[0];
+
     /** */
     private final File dumpDir;
 
@@ -97,8 +108,6 @@ public class DumpCacheFutureTask extends 
AbstractSnapshotFutureTask<Void> implem
 
         this.dumpDir = dumpDir;
         this.grps = grps;
-
-        cctx.cache().configManager().addConfigurationChangeListener(this);
     }
 
     /** {@inheritDoc} */
@@ -110,7 +119,24 @@ public class DumpCacheFutureTask extends 
AbstractSnapshotFutureTask<Void> implem
 
             createDumpLock(dumpNodeDir);
 
-            for (Integer grp : grps) {
+            // Submit all tasks for groups processing.
+            List<CompletableFuture<Void>> futs = new ArrayList<>();
+
+            Collection<BinaryType> types = 
cctx.kernalContext().cacheObjects().binary().types();
+
+            futs.add(CompletableFuture.runAsync(
+                wrapExceptionIfStarted(() -> 
cctx.kernalContext().cacheObjects().saveMetadata(types, dumpDir)),
+                snpSndr.executor()
+            ));
+
+            ArrayList<Map<Integer, MappedName>> mappings = 
cctx.kernalContext().marshallerContext().getCachedMappings();
+
+            futs.add(CompletableFuture.runAsync(
+                wrapExceptionIfStarted(() -> 
MarshallerContextImpl.saveMappings(cctx.kernalContext(), mappings, dumpDir)),
+                snpSndr.executor()
+            ));
+
+            for (int grp : grps) {
                 CacheGroupContext grpCtx = cctx.cache().cacheGroup(grp);
 
                 File grpDir = new File(
@@ -128,20 +154,35 @@ public class DumpCacheFutureTask extends 
AbstractSnapshotFutureTask<Void> implem
                         new File(grpDir, cachDataFilename(ccfg))
                     );
                 }
-            }
 
-            cctx.kernalContext().cacheObjects().saveMetadata(
-                cctx.kernalContext().cacheObjects().binary().types(),
-                dumpDir
-            );
+                futs.add(CompletableFuture.runAsync(wrapExceptionIfStarted(() 
-> {
+                    long start = System.currentTimeMillis();
 
-            MarshallerContextImpl.saveMappings(cctx.kernalContext(), 
cctx.kernalContext()
-                .marshallerContext()
-                .getCachedMappings(), dumpDir);
+                    log.info("Start group dump [name=" + 
grpCtx.cacheOrGroupName() + ", id=" + grp + ']');
 
-            onDone();
+                    try {
+                        
Thread.sleep(ThreadLocalRandom.current().nextInt(5_000));
+                    }
+                    catch (InterruptedException e) {
+                        acceptException(e);
+                    }
+
+                    long time = System.currentTimeMillis() - start;
+
+                    log.info("Finish group dump [name=" + 
grpCtx.cacheOrGroupName() + ", id=" + grp + ", time=" + time + ']');
+                }), snpSndr.executor()));
+            }
+
+            
CompletableFuture.allOf(futs.toArray(COMPLETABLE_FUTURES)).whenComplete((res, 
t) -> {
+                assert t == null : "Exception must never be thrown since a 
wrapper is used " +
+                    "for each dum task: " + t;
+
+                onDone(err.get()); // Will complete OK if err.get() == null.
+            });
         }
         catch (IgniteCheckedException | IOException e) {
+            acceptException(e);
+
             onDone(e);
         }
 
@@ -158,11 +199,12 @@ public class DumpCacheFutureTask extends 
AbstractSnapshotFutureTask<Void> implem
 
     /** {@inheritDoc} */
     @Override public void acceptException(Throwable th) {
+        if (th == null)
+            return;

-    }
-
-    /** {@inheritDoc} */
-    @Override public void accept(String s, File file) {
+        if (!(th instanceof IgniteFutureCancelledCheckedException)) // Cancellation is recorded below but not logged.
+            U.error(log, "Snapshot task has accepted exception to stop", th);

+        err.compareAndSet(null, th); // Only the first accepted exception is kept.
     }
 }

Reply via email to