xtern commented on code in PR #4240:
URL: https://github.com/apache/ignite-3/pull/4240#discussion_r1727526073


##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -401,74 +433,35 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
         return 
requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
-    CompletableFuture<Void> propagateTimeToReplicas(long timestamp, 
Collection<? extends ClusterNode> topologyNodes) {
-        HybridTimestamp nowTs = clockService.now();
-
-        return schemaSyncService.waitForMetadataCompleteness(nowTs)
-                .thenComposeAsync(ignore -> {
-                    Map<Integer, Integer> tablesWithPartitions =
-                            
catalogManagerFacade.collectTablesWithPartitionsBetween(timestamp, 
nowTs.longValue());
-
-                    Set<String> topologyNodeNames = topologyNodes.stream()
-                            .map(ClusterNode::name)
-                            .collect(Collectors.toSet());
-
-                    // TODO https://issues.apache.org/jira/browse/IGNITE-22951 Minimize the number of network requests
-                    return 
CompletableFutures.allOf(tablesWithPartitions.entrySet().stream()
-                            .map(e -> invokeOnReplicas(e.getKey(), 
e.getValue(), timestamp, nowTs, topologyNodeNames))
-                            .collect(Collectors.toList())
-                    );
-                }, executor);
-    }
-
-    private CompletableFuture<Void> invokeOnReplicas(
-            int tableId,
-            int partitions,
-            long txBeginTime,
-            HybridTimestamp nowTs,
-            Set<String> logicalTopologyNodes
-    ) {
-        List<TablePartitionId> replicationGroupIds = new 
ArrayList<>(partitions);
-
-        for (int p = 0; p < partitions; p++) {
-            replicationGroupIds.add(new TablePartitionId(tableId, p));
+    private CompletableFuture<Void> invokeOnLocalReplicas(long txBeginTime, 
ObjectIterator<Entry> tabTtr) {
+        if (!tabTtr.hasNext()) {
+            return CompletableFutures.nullCompletedFuture();
         }
 
-        return placementDriver.getAssignments(replicationGroupIds, nowTs)
-                .thenComposeAsync(tokenizedAssignments -> {
-                    assert tokenizedAssignments.size() == 
replicationGroupIds.size();
-
-                    List<CompletableFuture<?>> replicaInvokeFutures = new 
ArrayList<>(partitions);
-
-                    for (int p = 0; p < partitions; p++) {
-                        TablePartitionId replicationGroupId = 
replicationGroupIds.get(p);
-                        TokenizedAssignments tokenizedAssignment = 
tokenizedAssignments.get(p);
+        Entry tableWithPartitions = tabTtr.next();
+        int tableId = tableWithPartitions.getIntKey();
+        int partitions = tableWithPartitions.getIntValue();
+        List<CompletableFuture<?>> partFutures = new ArrayList<>(partitions);
+        HybridTimestamp nowTs = clockService.now();
 
-                        if (tokenizedAssignment == null) {
-                            
throwAssignmentsNotReadyException(replicationGroupId);
+        for (int p = 0; p < partitions; p++) {
+            TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
p);
+
+            CompletableFuture<?> fut = placementDriver
+                    .getPrimaryReplica(tablePartitionId, nowTs)
+                    .thenCompose(meta -> {
+                        // If primary is not elected yet - we'll update replication groups on next iteration.
+                        if (meta == null || meta.getLeaseholder() == null) {
+                            return CompletableFutures.nullCompletedFuture();
                         }
 
-                        Set<String> assignments = 
tokenizedAssignment.nodes().stream()
-                                
.map(Assignment::consistentId).collect(Collectors.toSet());
-
-                        String targetNodeName;
-
-                        if (assignments.contains(localNodeName)) {
-                            targetNodeName = localNodeName;
-                        } else {
-                            targetNodeName = assignments.stream()
-                                    .filter(logicalTopologyNodes::contains)
-                                    .findAny()
-                                    .orElseThrow(() -> new 
IllegalStateException("Current topology doesn't include assignment nodes "
-                                            + "(assignments=" + 
tokenizedAssignment.nodes()
-                                            + ", topology=" + 
logicalTopologyNodes
-                                            + ", replication group=" + 
replicationGroupId + ").")
-                                    );
+                        if (!localNodeName.equals(meta.getLeaseholder())) {

Review Comment:
   Fixed, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to