Cyrill commented on code in PR #6288:
URL: https://github.com/apache/ignite-3/pull/6288#discussion_r2222481214


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -845,16 +856,110 @@ public CompletableFuture<Map<TablePartitionId, 
GlobalTablePartitionState>> globa
         }
     }
 
+    /**
+     * Converts {@link LocalPartitionStateMessageByNode} to a mapping of zone 
names to the set of zone partitions.
+     *
+     * @param partitionStateMap Partition state map.
+     * @return Mapping of zone names to the set of zone partitions.
+     */
+    private static Map<String, Set<ZonePartitionId>> toZonesOnNodes(
+            Map<ZonePartitionId, LocalPartitionStateMessageByNode> 
partitionStateMap
+    ) {
+        Map<String, Set<ZonePartitionId>> res = new HashMap<>();
+
+        for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode> 
entry : partitionStateMap.entrySet()) {
+            ZonePartitionId zonePartitionId = entry.getKey();
+
+            LocalPartitionStateMessageByNode 
zoneLocalPartitionStateMessageByNode = entry.getValue();
+
+            for (String nodeName : 
zoneLocalPartitionStateMessageByNode.nodes()) {
+                res.computeIfAbsent(nodeName, k -> new 
HashSet<>()).add(zonePartitionId);
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Returns estimated number of rows for each table having a partition in 
the specified zones.
+     *
+     * <p>The result is returned from the nodes specified in the {@code 
zonesOnNodes.keySet()} -
+     * these are the nodes we previously received partition states from.
+     *
+     * @param zonesOnNodes Mapping of node names to the set of zone partitions.
+     * @param catalogVersion Catalog version.
+     * @return Future with the mapping.
+     */
+    private CompletableFuture<Map<String, Map<ZonePartitionId, 
Map<TablePartitionIdMessage, Long>>>> tableStateForZone(
+            Map<String, Set<ZonePartitionId>> zonesOnNodes,
+            int catalogVersion
+    ) {
+        Map<String, Map<ZonePartitionId, Map<TablePartitionIdMessage, Long>>> 
result = new ConcurrentHashMap<>();
+
+        CompletableFuture<?>[] futures = zonesOnNodes.entrySet().stream()
+                .map(entry ->
+                        tableStateForZoneOnNode(catalogVersion, 
entry.getKey(), entry.getValue())
+                                .thenAccept(response ->
+                                        response.states().forEach(state -> {
+                                            ZonePartitionId zonePartitionId = 
state.zonePartitionId().asZonePartitionId();
+
+                                            
result.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())

Review Comment:
   tableStateForZoneOnNode returns a result of messagingService.invoke, which 
can be executed in a different thread, so what we have here is accessing a 
collection from multiple threads



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to