xtern commented on code in PR #10189:
URL: https://github.com/apache/ignite/pull/10189#discussion_r944280285


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java:
##########
@@ -72,70 +74,65 @@ public SnapshotResponseRemoteFutureTask(
             return false;
 
         try {
-            List<GroupPartitionId> handled = new ArrayList<>();
+            List<SnapshotMetadata> metas = 
cctx.snapshotMgr().readSnapshotMetadatas(snpName, snpPath);
 
-            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
-                ofNullable(e.getValue()).orElse(Collections.emptySet())
-                    .forEach(p -> handled.add(new GroupPartitionId(e.getKey(), 
p)));
-            }
+            Function<GroupPartitionId, SnapshotMetadata> findMeta = pair -> {
+                for (SnapshotMetadata meta : metas) {
+                    Map<Integer, Set<Integer>> parts0 = meta.partitions();
 
-            snpSndr.init(handled.size());
+                    if (F.isEmpty(parts0))
+                        continue;
 
-            File snpDir = cctx.snapshotMgr().snapshotLocalDir(snpName, 
snpPath);
+                    Set<Integer> locParts = parts0.get(pair.getGroupId());
 
-            List<CompletableFuture<Void>> futs = new ArrayList<>();
-            List<SnapshotMetadata> metas = 
cctx.snapshotMgr().readSnapshotMetadatas(snpName, snpPath);
+                    if (locParts != null && 
locParts.contains(pair.getPartitionId()))
+                        return meta;
+                }
 
-            for (SnapshotMetadata meta : metas) {
-                Map<Integer, Set<Integer>> parts0 = meta.partitions();
+                return null;
+            };
 
-                if (F.isEmpty(parts0))
-                    continue;
+            Map<GroupPartitionId, SnapshotMetadata> partsToSend = new 
HashMap<>();
 
-                handled.removeIf(gp -> {
-                    if (ofNullable(parts0.get(gp.getGroupId()))
-                        .orElse(Collections.emptySet())
-                        .contains(gp.getPartitionId())
-                    ) {
-                        futs.add(CompletableFuture.runAsync(() -> {
-                            if (err.get() != null)
-                                return;
+            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet())
+                e.getValue().forEach(p -> partsToSend.computeIfAbsent(new 
GroupPartitionId(e.getKey(), p), findMeta));
 
-                            File cacheDir = cacheDirectory(new File(snpDir, 
databaseRelativePath(meta.folderName())),
-                                gp.getGroupId());
+            if (partsToSend.containsValue(null)) {
+                Collection<GroupPartitionId> missed = 
F.viewReadOnly(partsToSend.entrySet(), Map.Entry::getKey,
+                    e -> e.getValue() == null);
 
-                            if (cacheDir == null) {
-                                throw new IgniteException("Cache directory not 
found [snpName=" + snpName + ", meta=" + meta +
-                                    ", pair=" + gp + ']');
-                            }
+                err.compareAndSet(null, new IgniteException("Snapshot 
partitions missed on local node " +
+                    "[snpName=" + snpName + ", missed=" + missed + ']'));

Review Comment:
   I assume that we should also stop executing this method at this point.(



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java:
##########
@@ -72,70 +74,65 @@ public SnapshotResponseRemoteFutureTask(
             return false;
 
         try {
-            List<GroupPartitionId> handled = new ArrayList<>();
+            List<SnapshotMetadata> metas = 
cctx.snapshotMgr().readSnapshotMetadatas(snpName, snpPath);
 
-            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
-                ofNullable(e.getValue()).orElse(Collections.emptySet())
-                    .forEach(p -> handled.add(new GroupPartitionId(e.getKey(), 
p)));
-            }
+            Function<GroupPartitionId, SnapshotMetadata> findMeta = pair -> {
+                for (SnapshotMetadata meta : metas) {
+                    Map<Integer, Set<Integer>> parts0 = meta.partitions();
 
-            snpSndr.init(handled.size());
+                    if (F.isEmpty(parts0))
+                        continue;
 
-            File snpDir = cctx.snapshotMgr().snapshotLocalDir(snpName, 
snpPath);
+                    Set<Integer> locParts = parts0.get(pair.getGroupId());
 
-            List<CompletableFuture<Void>> futs = new ArrayList<>();
-            List<SnapshotMetadata> metas = 
cctx.snapshotMgr().readSnapshotMetadatas(snpName, snpPath);
+                    if (locParts != null && 
locParts.contains(pair.getPartitionId()))
+                        return meta;
+                }
 
-            for (SnapshotMetadata meta : metas) {
-                Map<Integer, Set<Integer>> parts0 = meta.partitions();
+                return null;
+            };
 
-                if (F.isEmpty(parts0))
-                    continue;
+            Map<GroupPartitionId, SnapshotMetadata> partsToSend = new 
HashMap<>();
 
-                handled.removeIf(gp -> {
-                    if (ofNullable(parts0.get(gp.getGroupId()))
-                        .orElse(Collections.emptySet())
-                        .contains(gp.getPartitionId())
-                    ) {
-                        futs.add(CompletableFuture.runAsync(() -> {
-                            if (err.get() != null)
-                                return;
+            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet())
+                e.getValue().forEach(p -> partsToSend.computeIfAbsent(new 
GroupPartitionId(e.getKey(), p), findMeta));
 
-                            File cacheDir = cacheDirectory(new File(snpDir, 
databaseRelativePath(meta.folderName())),
-                                gp.getGroupId());
+            if (partsToSend.containsValue(null)) {
+                Collection<GroupPartitionId> missed = 
F.viewReadOnly(partsToSend.entrySet(), Map.Entry::getKey,
+                    e -> e.getValue() == null);
 
-                            if (cacheDir == null) {
-                                throw new IgniteException("Cache directory not 
found [snpName=" + snpName + ", meta=" + meta +
-                                    ", pair=" + gp + ']');
-                            }
+                err.compareAndSet(null, new IgniteException("Snapshot 
partitions missed on local node " +
+                    "[snpName=" + snpName + ", missed=" + missed + ']'));

Review Comment:
   I assume that we should also stop executing this method at this point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to