xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r621296629



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
##########
@@ -525,76 +537,105 @@ else if (affNode && missed.isEmpty() && 
cctx.kernalContext().query().moduleEnabl
         for (CacheConfigurationSender ccfgSndr : ccfgSndrs)
             
futs.add(CompletableFuture.runAsync(wrapExceptionIfStarted(ccfgSndr::sendCacheConfig),
 snpSndr.executor()));
 
-        for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
-            int grpId = e.getKey();
+        try {
+            for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+                int grpId = e.getKey();
+                String cacheDirName = cacheDirName(grpId);
 
-            CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
+                // Process partitions for a particular cache group.
+                for (int partId : e.getValue()) {
+                    GroupPartitionId pair = new GroupPartitionId(grpId, 
partId);
 
-            if (gctx == null) {
-                acceptException(new IgniteCheckedException("Cache group 
context has not found " +
-                    "due to the cache group is stopped: " + grpId));
+                    Long partLen = partFileLengths.get(pair);
 
-                break;
+                    CompletableFuture<Void> fut0 = CompletableFuture.runAsync(
+                        wrapExceptionIfStarted(() -> {
+                            snpSndr.sendPart(
+                                getPartitionFile(pageStore.workDir(), 
cacheDirName, partId),
+                                cacheDirName,
+                                pair,
+                                partLen);
+
+                            // Stop partition writer.
+                            
partDeltaWriters.get(pair).markPartitionProcessed();
+                        }),
+                        snpSndr.executor())
+                        // Wait for the completion of both futures - 
checkpoint end, copy partition.
+                        .runAfterBothAsync(cpEndFut,
+                            wrapExceptionIfStarted(() -> {
+                                File delta = 
partDeltaWriters.get(pair).deltaFile;
+
+                                try {
+                                    // Atomically creates a new, empty delta 
file if and only if
+                                    // a file with this name does not yet 
exist.
+                                    delta.createNewFile();
+                                }
+                                catch (IOException ex) {
+                                    throw new IgniteCheckedException(ex);
+                                }
+
+                                snpSndr.sendDelta(delta, cacheDirName, pair);
+
+                                boolean deleted = delta.delete();
+
+                                assert deleted;
+                            }),
+                            snpSndr.executor());
+
+                    futs.add(fut0);
+                }
             }
 
-            // Process partitions for a particular cache group.
-            for (int partId : e.getValue()) {
-                GroupPartitionId pair = new GroupPartitionId(grpId, partId);
-
-                CacheConfiguration<?, ?> ccfg = gctx.config();
+            int futsSize = futs.size();
 
-                assert ccfg != null : "Cache configuration cannot be empty on 
snapshot creation: " + pair;
+            CompletableFuture.allOf(futs.toArray(new 
CompletableFuture[futsSize]))
+                .whenComplete((res, t) -> {
+                    assert t == null : "Exception must never be thrown since a 
wrapper is used " +
+                        "for each snapshot task: " + t;
 
-                String cacheDirName = cacheDirName(ccfg);
-                Long partLen = partFileLengths.get(pair);
-
-                CompletableFuture<Void> fut0 = CompletableFuture.runAsync(
-                    wrapExceptionIfStarted(() -> {
-                        snpSndr.sendPart(
-                            getPartitionFile(pageStore.workDir(), 
cacheDirName, partId),
-                            cacheDirName,
-                            pair,
-                            partLen);
-
-                        // Stop partition writer.
-                        partDeltaWriters.get(pair).markPartitionProcessed();
-                    }),
-                    snpSndr.executor())
-                    // Wait for the completion of both futures - checkpoint 
end, copy partition.
-                    .runAfterBothAsync(cpEndFut,
-                        wrapExceptionIfStarted(() -> {
-                            File delta = partDeltaWriters.get(pair).deltaFile;
-
-                            try {
-                                // Atomically creates a new, empty delta file 
if and only if
-                                // a file with this name does not yet exist.
-                                delta.createNewFile();
-                            }
-                            catch (IOException ex) {
-                                throw new IgniteCheckedException(ex);
-                            }
+                    closeAsync();
+                });
+        }
+        catch (IgniteCheckedException e) {
+            acceptException(e);
+        }
+    }
 
-                            snpSndr.sendDelta(delta, cacheDirName, pair);
+    /**
+     * @param grpId Cache group id.
+     * @param parts Set of partitions to be processed.
+     * @param dirSupp Directory to init.
+     * @throws IgniteCheckedException If fails.
+     */
+    private void addPartitionWriters(int grpId, Set<Integer> parts, 
IgniteThrowableSupplier<String> dirSupp) throws IgniteCheckedException {

Review comment:
       Why do you need the `dirSupp` supplier? Why not pass `dirName` directly?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to