Cyrill commented on code in PR #6288:
URL: https://github.com/apache/ignite-3/pull/6288#discussion_r2222481214
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -845,16 +856,110 @@ public CompletableFuture<Map<TablePartitionId,
GlobalTablePartitionState>> globa
}
}
+ /**
+ * Converts {@link LocalPartitionStateMessageByNode} to a mapping of zone
names to the set of zone partitions.
+ *
+ * @param partitionStateMap Partition state map.
+ * @return Mapping of zone names to the set of zone partitions.
+ */
+ private static Map<String, Set<ZonePartitionId>> toZonesOnNodes(
+ Map<ZonePartitionId, LocalPartitionStateMessageByNode>
partitionStateMap
+ ) {
+ Map<String, Set<ZonePartitionId>> res = new HashMap<>();
+
+ for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode>
entry : partitionStateMap.entrySet()) {
+ ZonePartitionId zonePartitionId = entry.getKey();
+
+ LocalPartitionStateMessageByNode
zoneLocalPartitionStateMessageByNode = entry.getValue();
+
+ for (String nodeName :
zoneLocalPartitionStateMessageByNode.nodes()) {
+ res.computeIfAbsent(nodeName, k -> new
HashSet<>()).add(zonePartitionId);
+ }
+ }
+
+ return res;
+ }
+
+ /**
+ * Returns estimated number of rows for each table having a partition in
the specified zones.
+ *
+ * <p>The result is returned from the nodes specified in the {@code
zonesOnNodes.keySet()} -
+ * these are the nodes we previously received partition states from.
+ *
+ * @param zonesOnNodes Mapping of node names to the set of zone partitions.
+ * @param catalogVersion Catalog version.
+ * @return Future with the mapping.
+ */
+ private CompletableFuture<Map<String, Map<ZonePartitionId,
Map<TablePartitionIdMessage, Long>>>> tableStateForZone(
+ Map<String, Set<ZonePartitionId>> zonesOnNodes,
+ int catalogVersion
+ ) {
+ Map<String, Map<ZonePartitionId, Map<TablePartitionIdMessage, Long>>>
result = new ConcurrentHashMap<>();
+
+ CompletableFuture<?>[] futures = zonesOnNodes.entrySet().stream()
+ .map(entry ->
+ tableStateForZoneOnNode(catalogVersion,
entry.getKey(), entry.getValue())
+ .thenAccept(response ->
+ response.states().forEach(state -> {
+ ZonePartitionId zonePartitionId =
state.zonePartitionId().asZonePartitionId();
+
+
result.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
Review Comment:
tableStateForZoneOnNode returns a result of messagingService.invoke, which
can be executed in a different thread, so what we have here is accessing a
collection from multiple threads
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]