This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-22722 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit abc8c453f449f9695351b819c1259367bab4f277 Author: amashenkov <[email protected]> AuthorDate: Thu Aug 8 12:56:23 2024 +0300 Add bulk method for getting assignments. --- .../compaction/CatalogCompactionRunner.java | 36 +++++++++------------- .../ignite/client/handler/FakePlacementDriver.java | 4 +-- .../ignite/internal/index/TestPlacementDriver.java | 5 +-- .../replicator/utils/TestPlacementDriver.java | 5 +-- .../AssignmentsPlacementDriver.java | 22 ++++++++++++- .../placementdriver/TestPlacementDriver.java | 5 +-- .../placementdriver/AssignmentsTracker.java | 13 ++++++-- .../placementdriver/PlacementDriverManager.java | 7 +++-- .../wrappers/DelegatingPlacementDriver.java | 7 +++-- 9 files changed, 64 insertions(+), 40 deletions(-) diff --git a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java index 75e4bbb975..255ee1f301 100644 --- a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java +++ b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java @@ -31,7 +31,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import org.apache.ignite.internal.affinity.Assignment; import org.apache.ignite.internal.affinity.TokenizedAssignments; import org.apache.ignite.internal.catalog.Catalog; import org.apache.ignite.internal.catalog.CatalogManagerImpl; @@ -337,33 +336,26 @@ public class CatalogCompactionRunner implements IgniteComponent { assert zone != null : table.zoneId(); - List<CompletableFuture<?>> partitionFutures = new ArrayList<>(zone.partitions()); + int partitions = zone.partitions(); - for (int p = 0; p < zone.partitions(); p++) { - ReplicationGroupId replicationGroupId = new TablePartitionId(table.id(), p); + List<ReplicationGroupId> replicationGroupIds = new ArrayList<>(partitions); - CompletableFuture<TokenizedAssignments> assignmentsFut = placementDriver.getAssignments(replicationGroupId, nowTs) - .whenComplete((tokenizedAssignments, ex) -> { - if (ex != null) { - return; - } + for (int p = 0; p < partitions; p++) { + replicationGroupIds.add(new TablePartitionId(table.id(), p)); + } - if (tokenizedAssignments == null) { + return placementDriver.getAssignments(replicationGroupIds, nowTs) + .thenAccept(tokenizedAssignments -> { + for (int p = 0; p < partitions; p++) { + TokenizedAssignments assignment = tokenizedAssignments.get(p); + if (assignment == null) { throw new IllegalStateException("Cannot get assignments for table " + table.name() - + " (replication group=" + replicationGroupId + ")."); + + " (replication group=" + replicationGroupIds.get(p) + ")."); } - List<String> assignments = tokenizedAssignments.nodes().stream() - .map(Assignment::consistentId) - .collect(Collectors.toList()); - - required.addAll(assignments); - }); - - partitionFutures.add(assignmentsFut); - } - - return CompletableFutures.allOf(partitionFutures) + assignment.nodes().forEach(a -> required.add(a.consistentId())); + } + }) .thenCompose(ignore -> collectRequiredNodes(catalog, tabItr, required, nowTs)); } diff --git a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java index 54da3e4648..8f1bdc5835 100644 --- a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java +++ b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java @@ -106,8 +106,8 @@ public class FakePlacementDriver extends AbstractEventProducer<PrimaryReplicaEve } @Override - public CompletableFuture<TokenizedAssignments> getAssignments( - ReplicationGroupId replicationGroupId, + public CompletableFuture<List<TokenizedAssignments>> getAssignments( + List<? extends ReplicationGroupId> replicationGroupIds, HybridTimestamp clusterTimeToAwait ) { return failedFuture(new UnsupportedOperationException("getAssignments() is not supported in FakePlacementDriver yet.")); diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java index fb81f9cee2..f0650073ca 100644 --- a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java +++ b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.index; import static java.util.concurrent.CompletableFuture.failedFuture; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -58,8 +59,8 @@ class TestPlacementDriver extends AbstractEventProducer<PrimaryReplicaEvent, Pri } @Override - public CompletableFuture<TokenizedAssignments> getAssignments( - ReplicationGroupId replicationGroupId, + public CompletableFuture<List<TokenizedAssignments>> getAssignments( + List<? extends ReplicationGroupId> replicationGroupIds, HybridTimestamp clusterTimeToAwait ) { return failedFuture(new UnsupportedOperationException("getAssignments() is not supported in FakePlacementDriver yet.")); diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java index 51b207f66b..aa2c0bcc15 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.partition.replicator.utils; import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.affinity.TokenizedAssignments; @@ -67,8 +68,8 @@ public class TestPlacementDriver extends AbstractEventProducer<PrimaryReplicaEve } @Override - public CompletableFuture<TokenizedAssignments> getAssignments( - ReplicationGroupId replicationGroupId, + public CompletableFuture<List<TokenizedAssignments>> getAssignments( + List<? extends ReplicationGroupId> replicationGroupIds, HybridTimestamp clusterTimeToAwait ) { return failedFuture(new UnsupportedOperationException("getAssignments() is not supported in FakePlacementDriver yet.")); diff --git a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java index 641ab117ec..164ee37d59 100644 --- a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java +++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.placementdriver; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.affinity.TokenizedAssignments; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -39,5 +40,24 @@ public interface AssignmentsPlacementDriver { * @param clusterTimeToAwait Cluster time to await. * @return Tokenized assignments. */ - CompletableFuture<TokenizedAssignments> getAssignments(ReplicationGroupId replicationGroupId, HybridTimestamp clusterTimeToAwait); + default CompletableFuture<TokenizedAssignments> getAssignments( + ReplicationGroupId replicationGroupId, + HybridTimestamp clusterTimeToAwait + ) { + return getAssignments(List.of(replicationGroupId), clusterTimeToAwait).thenApply(assignments -> assignments.get(0)); + } + + /** + * Returns the future with list of tokenized assignments, which contains newest available tokenized assignment for the specified + * replication group id or {@code null} if assignment for the replication group id was not found. The future will be completed after + * clusterTime (meta storage safe time) will become greater or equal to the clusterTimeToAwait parameter. + * + * @param replicationGroupIds List of replication group Ids. + * @param clusterTimeToAwait Cluster time to await. + * @return List of tokenized assignments. + */ + CompletableFuture<List<TokenizedAssignments>> getAssignments( + List<? extends ReplicationGroupId> replicationGroupIds, + HybridTimestamp clusterTimeToAwait + ); } diff --git a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java index 3392cef934..d91e9499ff 100644 --- a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java +++ b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java @@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -85,8 +86,8 @@ public class TestPlacementDriver extends AbstractEventProducer<PrimaryReplicaEve } @Override - public CompletableFuture<TokenizedAssignments> getAssignments( - ReplicationGroupId replicationGroupId, + public CompletableFuture<List<TokenizedAssignments>> getAssignments( + List<? extends ReplicationGroupId> replicationGroupIds, HybridTimestamp clusterTimeToAwait ) { return failedFuture(new UnsupportedOperationException("getAssignments() is not supported in FakePlacementDriver yet.")); diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java index b71a9f7f4c..3f49ca826c 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java @@ -23,6 +23,7 @@ import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.internal.util.StringUtils.incrementLastChar; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -129,14 +130,20 @@ public class AssignmentsTracker implements AssignmentsPlacementDriver { } @Override - public CompletableFuture<TokenizedAssignments> getAssignments( - ReplicationGroupId replicationGroupId, + public CompletableFuture<List<TokenizedAssignments>> getAssignments( + List<? extends ReplicationGroupId> replicationGroupIds, HybridTimestamp clusterTimeToAwait ) { return msManager .clusterTime() .waitFor(clusterTimeToAwait) - .thenApply(ignored -> inBusyLock(busyLock, () -> assignments().get(replicationGroupId))); + .thenApply(ignored -> inBusyLock(busyLock, () -> { + Map<ReplicationGroupId, TokenizedAssignments> assignments = assignments(); + + return replicationGroupIds.stream() + .map(assignments::get) + .collect(Collectors.toList()); + })); } /** diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java index a798c7a72a..e7bae39bd6 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java @@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; +import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -272,11 +273,11 @@ public class PlacementDriverManager implements IgniteComponent { private PlacementDriver createPlacementDriver() { return new PlacementDriver() { @Override - public CompletableFuture<TokenizedAssignments> getAssignments( - ReplicationGroupId replicationGroupId, + public CompletableFuture<List<TokenizedAssignments>> getAssignments( + List<? extends ReplicationGroupId> replicationGroupIds, HybridTimestamp timestamp ) { - return assignmentsTracker.getAssignments(replicationGroupId, timestamp); + return assignmentsTracker.getAssignments(replicationGroupIds, timestamp); } @Override diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java index eca347e990..c5989ec45a 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.table.distributed.wrappers; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.affinity.TokenizedAssignments; @@ -65,10 +66,10 @@ abstract class DelegatingPlacementDriver implements PlacementDriver { } @Override - public CompletableFuture<TokenizedAssignments> getAssignments( - ReplicationGroupId replicationGroupId, + public CompletableFuture<List<TokenizedAssignments>> getAssignments( + List<? extends ReplicationGroupId> replicationGroupIds, HybridTimestamp clusterTimeToAwait ) { - return delegate.getAssignments(replicationGroupId, clusterTimeToAwait); + return delegate.getAssignments(replicationGroupIds, clusterTimeToAwait); } }
