Cyrill commented on code in PR #6288:
URL: https://github.com/apache/ignite-3/pull/6288#discussion_r2222481214
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -845,16 +856,110 @@ public CompletableFuture<Map<TablePartitionId, GlobalTablePartitionState>> globa
         }
     }

+    /**
+     * Converts {@link LocalPartitionStateMessageByNode} to a mapping of zone names to the set of zone partitions.
+     *
+     * @param partitionStateMap Partition state map.
+     * @return Mapping of zone names to the set of zone partitions.
+     */
+    private static Map<String, Set<ZonePartitionId>> toZonesOnNodes(
+            Map<ZonePartitionId, LocalPartitionStateMessageByNode> partitionStateMap
+    ) {
+        Map<String, Set<ZonePartitionId>> res = new HashMap<>();
+
+        for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode> entry : partitionStateMap.entrySet()) {
+            ZonePartitionId zonePartitionId = entry.getKey();
+
+            LocalPartitionStateMessageByNode zoneLocalPartitionStateMessageByNode = entry.getValue();
+
+            for (String nodeName : zoneLocalPartitionStateMessageByNode.nodes()) {
+                res.computeIfAbsent(nodeName, k -> new HashSet<>()).add(zonePartitionId);
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Returns estimated number of rows for each table having a partition in the specified zones.
+     *
+     * <p>The result is returned from the nodes specified in the {@code zonesOnNodes.keySet()} -
+     * these are the nodes we previously received partition states from.
+     *
+     * @param zonesOnNodes Mapping of node names to the set of zone partitions.
+     * @param catalogVersion Catalog version.
+     * @return Future with the mapping.
+     */
+    private CompletableFuture<Map<String, Map<ZonePartitionId, Map<TablePartitionIdMessage, Long>>>> tableStateForZone(
+            Map<String, Set<ZonePartitionId>> zonesOnNodes,
+            int catalogVersion
+    ) {
+        Map<String, Map<ZonePartitionId, Map<TablePartitionIdMessage, Long>>> result = new ConcurrentHashMap<>();
+
+        CompletableFuture<?>[] futures = zonesOnNodes.entrySet().stream()
+                .map(entry ->
+                        tableStateForZoneOnNode(catalogVersion, entry.getKey(), entry.getValue())
+                                .thenAccept(response ->
+                                        response.states().forEach(state -> {
+                                            ZonePartitionId zonePartitionId = state.zonePartitionId().asZonePartitionId();
+
+                                            result.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())

Review Comment:
   tableStateForZoneOnNode returns the result of messagingService.invoke, which can complete on a different thread, so the thenAccept callback here accesses the collection from multiple threads.
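
For illustration only, here is a minimal standalone sketch of the pattern being discussed: several futures complete on arbitrary threads and each callback writes into a shared nested map. It is not the PR's code. `fetchFromNode` is a hypothetical stand-in for `tableStateForZoneOnNode` (it uses `CompletableFuture.supplyAsync` rather than the messaging service), partition IDs are plain integers, and the row counts are made up. The point is that both the outer and the inner maps are `ConcurrentHashMap` instances and the inner map is created through `computeIfAbsent`, so concurrent callbacks do not race on either level.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class ConcurrentAggregationSketch {
    /** Hypothetical stand-in for a remote call; completes on a pool thread, not the caller's. */
    static CompletableFuture<Map<Integer, Long>> fetchFromNode(String nodeName, Set<Integer> partitions) {
        return CompletableFuture.supplyAsync(() -> {
            Map<Integer, Long> rows = new ConcurrentHashMap<>();
            for (Integer partition : partitions) {
                rows.put(partition, partition * 10L); // made-up row estimate
            }
            return rows;
        });
    }

    /** Collects per-node results into a nested map that is safe to fill from several threads. */
    static CompletableFuture<Map<String, Map<Integer, Long>>> collect(Map<String, Set<Integer>> partitionsByNode) {
        // Outer map is concurrent because callbacks for different nodes may run in parallel.
        Map<String, Map<Integer, Long>> result = new ConcurrentHashMap<>();

        CompletableFuture<?>[] futures = partitionsByNode.entrySet().stream()
                .map(entry -> fetchFromNode(entry.getKey(), entry.getValue())
                        .thenAccept(response -> response.forEach((partition, rows) ->
                                // computeIfAbsent creates the inner map atomically; it is concurrent too.
                                result.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
                                        .put(partition, rows))))
                .toArray(CompletableFuture[]::new);

        // The returned future completes only after every callback above has finished.
        return CompletableFuture.allOf(futures).thenApply(unused -> result);
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> input = Map.of("node-1", Set.of(0, 1), "node-2", Set.of(2));
        System.out.println(collect(input).join());
    }
}
```

If either map were a plain `HashMap`, the same callbacks could corrupt it when two responses arrive at once, which is the risk the comment above points at.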