This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-22722
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-22722 by this push:
new 3c31ecf692 Use bulk method for getting assignments for backups.
3c31ecf692 is described below
commit 3c31ecf692def5bdae8202cd5470f8437b500554
Author: amashenkov <[email protected]>
AuthorDate: Thu Aug 8 13:43:20 2024 +0300
Use bulk method for getting assignments for backups.
---
.../sql/engine/ExecutionTargetProviderImpl.java | 55 +++++++++++++++++-----
1 file changed, 44 insertions(+), 11 deletions(-)
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
index 3992941bb3..a00f7bdc58 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
@@ -26,6 +26,8 @@ import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -104,15 +106,23 @@ public class ExecutionTargetProviderImpl implements
ExecutionTargetProvider {
) {
int partitions = table.partitions();
+ if (includeBackups) {
+ List<TablePartitionId> replicationGroupIds = new
ArrayList<>(partitions);
+
+ for (int p = 0; p < partitions; p++) {
+ replicationGroupIds.add(new TablePartitionId(table.id(), p));
+ }
+
+ return allReplicas(replicationGroupIds, operationTime);
+ }
+
List<CompletableFuture<TokenizedAssignments>> result = new
ArrayList<>(partitions);
// no need to wait all partitions after pruning was implemented.
for (int partId = 0; partId < partitions; ++partId) {
ReplicationGroupId partGroupId = new TablePartitionId(table.id(),
partId);
- CompletableFuture<TokenizedAssignments> partitionAssignment =
includeBackups
- ? allReplicas(partGroupId, operationTime)
- : primaryReplica(partGroupId, operationTime);
+ CompletableFuture<TokenizedAssignments> partitionAssignment =
primaryReplica(partGroupId, operationTime);
result.add(partitionAssignment);
}
@@ -152,22 +162,45 @@ public class ExecutionTargetProviderImpl implements
ExecutionTargetProvider {
});
}
- private CompletableFuture<TokenizedAssignments> allReplicas(
- ReplicationGroupId replicationGroupId,
+ private CompletableFuture<List<TokenizedAssignments>> allReplicas(
+ List<TablePartitionId> replicationGroupIds,
HybridTimestamp operationTime
) {
- CompletableFuture<TokenizedAssignments> f =
placementDriver.getAssignments(
- replicationGroupId,
+ CompletableFuture<List<TokenizedAssignments>> f =
placementDriver.getAssignments(
+ replicationGroupIds,
operationTime
);
return f.thenCompose(assignments -> {
- if (assignments == null) {
- // assignments are not ready yet, let's fall back to primary replicas
- return primaryReplica(replicationGroupId, operationTime);
+ // Collect missed assignments indexes if found.
+ IntList missedAssignments = new IntArrayList(0);
+
+ for (int i = 0; i < assignments.size(); i++) {
+ if (assignments.get(i) == null) {
+ missedAssignments.add(i);
+ }
+ }
+
+ if (missedAssignments.isEmpty()) {
+ return completedFuture(assignments);
}
- return completedFuture(assignments);
+ // assignments are not ready yet, let's fall back to primary replicas
+ List<CompletableFuture<TokenizedAssignments>>
primaryReplicaAssignment = new ArrayList<>(missedAssignments.size());
+
+ for (int i = 0; i < missedAssignments.size(); i++) {
+ primaryReplicaAssignment.add(primaryReplica(replicationGroupIds.get(missedAssignments.getInt(i)), operationTime));
+ }
+
+ CompletableFuture<Void> all =
CompletableFuture.allOf(primaryReplicaAssignment.toArray(new
CompletableFuture[0]));
+ return all.thenApply(ignore -> {
+ // Replace missed assignments with primary replicas.
+ for (int i = 0; i < missedAssignments.size(); i++) {
+ assignments.set(missedAssignments.getInt(i),
primaryReplicaAssignment.get(i).join());
+ }
+
+ return assignments;
+ });
});
}
}