This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-22722
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit abc8c453f449f9695351b819c1259367bab4f277
Author: amashenkov <[email protected]>
AuthorDate: Thu Aug 8 12:56:23 2024 +0300

    Add bulk method for getting assignments.
---
 .../compaction/CatalogCompactionRunner.java        | 36 +++++++++-------------
 .../ignite/client/handler/FakePlacementDriver.java |  4 +--
 .../ignite/internal/index/TestPlacementDriver.java |  5 +--
 .../replicator/utils/TestPlacementDriver.java      |  5 +--
 .../AssignmentsPlacementDriver.java                | 22 ++++++++++++-
 .../placementdriver/TestPlacementDriver.java       |  5 +--
 .../placementdriver/AssignmentsTracker.java        | 13 ++++++--
 .../placementdriver/PlacementDriverManager.java    |  7 +++--
 .../wrappers/DelegatingPlacementDriver.java        |  7 +++--
 9 files changed, 64 insertions(+), 40 deletions(-)

diff --git 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
index 75e4bbb975..255ee1f301 100644
--- 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
+++ 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
-import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.affinity.TokenizedAssignments;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogManagerImpl;
@@ -337,33 +336,26 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
 
         assert zone != null : table.zoneId();
 
-        List<CompletableFuture<?>> partitionFutures = new 
ArrayList<>(zone.partitions());
+        int partitions = zone.partitions();
 
-        for (int p = 0; p < zone.partitions(); p++) {
-            ReplicationGroupId replicationGroupId = new 
TablePartitionId(table.id(), p);
+        List<ReplicationGroupId> replicationGroupIds = new 
ArrayList<>(partitions);
 
-            CompletableFuture<TokenizedAssignments> assignmentsFut = 
placementDriver.getAssignments(replicationGroupId, nowTs)
-                    .whenComplete((tokenizedAssignments, ex) -> {
-                        if (ex != null) {
-                            return;
-                        }
+        for (int p = 0; p < partitions; p++) {
+            replicationGroupIds.add(new TablePartitionId(table.id(), p));
+        }
 
-                        if (tokenizedAssignments == null) {
+        return placementDriver.getAssignments(replicationGroupIds, nowTs)
+                .thenAccept(tokenizedAssignments -> {
+                    for (int p = 0; p < partitions; p++) {
+                        TokenizedAssignments assignment = 
tokenizedAssignments.get(p);
+                        if (assignment == null) {
                             throw new IllegalStateException("Cannot get 
assignments for table " + table.name()
-                                    + " (replication group=" + 
replicationGroupId + ").");
+                                    + " (replication group=" + 
replicationGroupIds.get(p) + ").");
                         }
 
-                        List<String> assignments = 
tokenizedAssignments.nodes().stream()
-                                .map(Assignment::consistentId)
-                                .collect(Collectors.toList());
-
-                        required.addAll(assignments);
-                    });
-
-            partitionFutures.add(assignmentsFut);
-        }
-
-        return CompletableFutures.allOf(partitionFutures)
+                        assignment.nodes().forEach(a -> 
required.add(a.consistentId()));
+                    }
+                })
                 .thenCompose(ignore -> collectRequiredNodes(catalog, tabItr, 
required, nowTs));
     }
 
diff --git 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
index 54da3e4648..8f1bdc5835 100644
--- 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
+++ 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
@@ -106,8 +106,8 @@ public class FakePlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
     }
 
     @Override
-    public CompletableFuture<TokenizedAssignments> getAssignments(
-            ReplicationGroupId replicationGroupId,
+    public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
             HybridTimestamp clusterTimeToAwait
     ) {
         return failedFuture(new 
UnsupportedOperationException("getAssignments() is not supported in 
FakePlacementDriver yet."));
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
index fb81f9cee2..f0650073ca 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.index;
 
 import static java.util.concurrent.CompletableFuture.failedFuture;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -58,8 +59,8 @@ class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
     }
 
     @Override
-    public CompletableFuture<TokenizedAssignments> getAssignments(
-            ReplicationGroupId replicationGroupId,
+    public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
             HybridTimestamp clusterTimeToAwait
     ) {
         return failedFuture(new 
UnsupportedOperationException("getAssignments() is not supported in 
FakePlacementDriver yet."));
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
index 51b207f66b..aa2c0bcc15 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.partition.replicator.utils;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.affinity.TokenizedAssignments;
@@ -67,8 +68,8 @@ public class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
     }
 
     @Override
-    public CompletableFuture<TokenizedAssignments> getAssignments(
-            ReplicationGroupId replicationGroupId,
+    public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
             HybridTimestamp clusterTimeToAwait
     ) {
         return failedFuture(new 
UnsupportedOperationException("getAssignments() is not supported in 
FakePlacementDriver yet."));
diff --git 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
index 641ab117ec..164ee37d59 100644
--- 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
+++ 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.placementdriver;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.affinity.TokenizedAssignments;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -39,5 +40,24 @@ public interface AssignmentsPlacementDriver {
      * @param clusterTimeToAwait Cluster time to await.
      * @return Tokenized assignments.
      */
-    CompletableFuture<TokenizedAssignments> getAssignments(ReplicationGroupId 
replicationGroupId, HybridTimestamp clusterTimeToAwait);
+    default CompletableFuture<TokenizedAssignments> getAssignments(
+            ReplicationGroupId replicationGroupId,
+            HybridTimestamp clusterTimeToAwait
+    ) {
+        return getAssignments(List.of(replicationGroupId), 
clusterTimeToAwait).thenApply(assignments -> assignments.get(0));
+    }
+
+    /**
+     * Returns the future with list of tokenized assignments, which contains 
newest available tokenized assignment for the specified
+     * replication group id or {@code null} if assignment for the replication 
group id was not found. The future will be completed after
+     * clusterTime (meta storage safe time) will become greater or equal to 
the clusterTimeToAwait parameter.
+     *
+     * @param replicationGroupIds List of replication group Ids.
+     * @param clusterTimeToAwait Cluster time to await.
+     * @return List of tokenized assignments.
+     */
+    CompletableFuture<List<TokenizedAssignments>> getAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
+            HybridTimestamp clusterTimeToAwait
+    );
 }
diff --git 
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
 
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index 3392cef934..d91e9499ff 100644
--- 
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++ 
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -21,6 +21,7 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -85,8 +86,8 @@ public class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
     }
 
     @Override
-    public CompletableFuture<TokenizedAssignments> getAssignments(
-            ReplicationGroupId replicationGroupId,
+    public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
             HybridTimestamp clusterTimeToAwait
     ) {
         return failedFuture(new 
UnsupportedOperationException("getAssignments() is not supported in 
FakePlacementDriver yet."));
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index b71a9f7f4c..3f49ca826c 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.StringUtils.incrementLastChar;
 
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -129,14 +130,20 @@ public class AssignmentsTracker implements 
AssignmentsPlacementDriver {
     }
 
     @Override
-    public CompletableFuture<TokenizedAssignments> getAssignments(
-            ReplicationGroupId replicationGroupId,
+    public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
             HybridTimestamp clusterTimeToAwait
     ) {
         return msManager
                 .clusterTime()
                 .waitFor(clusterTimeToAwait)
-                .thenApply(ignored -> inBusyLock(busyLock, () -> 
assignments().get(replicationGroupId)));
+                .thenApply(ignored -> inBusyLock(busyLock, () -> {
+                    Map<ReplicationGroupId, TokenizedAssignments> assignments 
= assignments();
+
+                    return replicationGroupIds.stream()
+                            .map(assignments::get)
+                            .collect(Collectors.toList());
+                }));
     }
 
     /**
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index a798c7a72a..e7bae39bd6 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -21,6 +21,7 @@ import static 
java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -272,11 +273,11 @@ public class PlacementDriverManager implements 
IgniteComponent {
     private PlacementDriver createPlacementDriver() {
         return new PlacementDriver() {
             @Override
-            public CompletableFuture<TokenizedAssignments> getAssignments(
-                    ReplicationGroupId replicationGroupId,
+            public CompletableFuture<List<TokenizedAssignments>> 
getAssignments(
+                    List<? extends ReplicationGroupId> replicationGroupIds,
                     HybridTimestamp timestamp
             ) {
-                return assignmentsTracker.getAssignments(replicationGroupId, 
timestamp);
+                return assignmentsTracker.getAssignments(replicationGroupIds, 
timestamp);
             }
 
             @Override
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
index eca347e990..c5989ec45a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.wrappers;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.affinity.TokenizedAssignments;
@@ -65,10 +66,10 @@ abstract class DelegatingPlacementDriver implements 
PlacementDriver {
     }
 
     @Override
-    public CompletableFuture<TokenizedAssignments> getAssignments(
-            ReplicationGroupId replicationGroupId,
+    public CompletableFuture<List<TokenizedAssignments>> getAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
             HybridTimestamp clusterTimeToAwait
     ) {
-        return delegate.getAssignments(replicationGroupId, clusterTimeToAwait);
+        return delegate.getAssignments(replicationGroupIds, 
clusterTimeToAwait);
     }
 }

Reply via email to