This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9623c7cd2c IGNITE-24063 Add peers count as parameter into
DistributionAlgorithm (#4985)
9623c7cd2c is described below
commit 9623c7cd2c60b80e62bb792d583225f8fc6c9c17
Author: Denis Chudov <[email protected]>
AuthorDate: Tue Jan 7 15:18:01 2025 +0300
IGNITE-24063 Add peers count as parameter into DistributionAlgorithm (#4985)
---
.../DistributionAlgorithm.java | 7 ++--
.../PartitionDistributionUtils.java | 17 ++++------
.../RendezvousDistributionFunction.java | 39 ++++++++++++----------
.../RendezvousDistributionFunctionTest.java | 11 +++---
.../internal/tx/test/ItTransactionTestUtils.java | 7 ++--
5 files changed, 43 insertions(+), 38 deletions(-)
diff --git
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java
index 6bf518dff1..d3242aa9f6 100644
---
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java
+++
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.partitiondistribution;
import java.util.Collection;
import java.util.List;
+import java.util.Set;
/**
* Partition distribution algorithm.
@@ -32,12 +33,14 @@ public interface DistributionAlgorithm {
* @param currentDistribution Previous assignments or empty list.
* @param partitions Number of table partitions.
* @param replicaFactor Number partition replicas.
+ * @param consensusGroupSize Number of nodes in a consensus group (peers).
* @return List of nodes by partition.
*/
- List<List<String>> assignPartitions(
+ List<Set<Assignment>> assignPartitions(
Collection<String> nodes,
List<List<String>> currentDistribution,
int partitions,
- int replicaFactor
+ int replicaFactor,
+ int consensusGroupSize
);
}
diff --git
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java
index fe7f2c78c8..30bd741467 100644
---
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java
+++
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java
@@ -18,8 +18,6 @@
package org.apache.ignite.internal.partitiondistribution;
import static java.util.Collections.emptyList;
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
import java.util.Collection;
import java.util.List;
@@ -40,19 +38,19 @@ public class PartitionDistributionUtils {
* @param replicas Replicas count.
* @return List assignments by partition.
*/
+ // TODO https://issues.apache.org/jira/browse/IGNITE-24071 pass the
consensus group size as the parameter here
public static List<Set<Assignment>> calculateAssignments(
Collection<String> dataNodes,
int partitions,
int replicas
) {
- List<List<String>> nodes = DISTRIBUTION_ALGORITHM.assignPartitions(
+ return DISTRIBUTION_ALGORITHM.assignPartitions(
dataNodes,
emptyList(),
partitions,
+ replicas,
replicas
);
-
- return
nodes.stream().map(PartitionDistributionUtils::dataNodesToAssignments).collect(toList());
}
/**
@@ -64,19 +62,16 @@ public class PartitionDistributionUtils {
* @param replicas Replicas count.
* @return Set of assignments.
*/
+ // TODO https://issues.apache.org/jira/browse/IGNITE-24071 pass the
consensus group size as the parameter here
public static Set<Assignment> calculateAssignmentForPartition(
Collection<String> dataNodes,
int partitionId,
int partitions,
int replicas
) {
- List<List<String>> nodes =
DISTRIBUTION_ALGORITHM.assignPartitions(dataNodes, emptyList(), partitions,
replicas);
- List<String> affinityNodes = nodes.get(partitionId);
+ List<Set<Assignment>> assignments =
DISTRIBUTION_ALGORITHM.assignPartitions(dataNodes, emptyList(), partitions,
replicas, replicas);
- return dataNodesToAssignments(affinityNodes);
+ return assignments.get(partitionId);
}
- private static Set<Assignment> dataNodesToAssignments(Collection<String>
nodes) {
- return nodes.stream().map(Assignment::forPeer).collect(toSet());
- }
}
diff --git
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java
index 1f3ca4e37f..6b77e325a0 100644
---
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java
+++
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java
@@ -30,6 +30,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Set;
import java.util.function.BiPredicate;
import java.util.function.IntFunction;
import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -82,7 +83,7 @@ public class RendezvousDistributionFunction implements
DistributionAlgorithm {
* @param aggregator Function that creates a collection for the
partition assignments.
* @return Assignment.
*/
- public static <T extends Collection<String>> T assignPartition(
+ public static <T extends Collection<Assignment>> T assignPartition(
int part,
Collection<String> nodes,
int replicas,
@@ -94,7 +95,7 @@ public class RendezvousDistributionFunction implements
DistributionAlgorithm {
if (nodes.size() <= 1) {
T res = aggregator.apply(1);
- res.addAll(nodes);
+ nodes.stream().map(Assignment::forPeer).forEach(res::add);
return res;
}
@@ -125,7 +126,7 @@ public class RendezvousDistributionFunction implements
DistributionAlgorithm {
String first = it.next();
- res.add(first);
+ res.add(Assignment.forPeer(first));
if (exclNeighbors) {
allNeighbors.addAll(neighborhoodCache.get(first));
@@ -138,12 +139,12 @@ public class RendezvousDistributionFunction implements
DistributionAlgorithm {
if (exclNeighbors) {
if (!allNeighbors.contains(node)) {
- res.add(node);
+ res.add(Assignment.forPeer(node));
allNeighbors.addAll(neighborhoodCache.get(node));
}
} else if (nodeFilter == null || nodeFilter.test(node, res)) {
- res.add(node);
+ res.add(Assignment.forPeer(node));
}
}
}
@@ -158,7 +159,7 @@ public class RendezvousDistributionFunction implements
DistributionAlgorithm {
String node = it.next();
if (!res.contains(node)) {
- res.add(node);
+ res.add(Assignment.forPeer(node));
}
}
@@ -183,17 +184,20 @@ public class RendezvousDistributionFunction implements
DistributionAlgorithm {
* @param aggregator Function that creates a collection for the partition
assignments.
* @return Assignment.
*/
- private static <T extends Collection<String>> T
replicatedAssign(Collection<String> nodes,
- Iterable<String> sortedNodes, IntFunction<T> aggregator) {
+ private static <T extends Collection<Assignment>> T replicatedAssign(
+ Collection<String> nodes,
+ Iterable<String> sortedNodes,
+ IntFunction<T> aggregator
+ ) {
String first = sortedNodes.iterator().next();
T res = aggregator.apply(nodes.size());
- res.add(first);
+ res.add(Assignment.forPeer(first));
for (String n : nodes) {
if (!n.equals(first)) {
- res.add(n);
+ res.add(Assignment.forPeer(first));
}
}
@@ -233,16 +237,16 @@ public class RendezvousDistributionFunction implements
DistributionAlgorithm {
* @param replicas Number partition replicas.
* @param exclNeighbors If true neighbors are excluded from the
one partition assignment, false otherwise.
* @param nodeFilter Filter for nodes.
- * @return List nodes by partition.
+ * @return List of assignments by partition.
*/
- public static List<List<String>> assignPartitions(
+ public static List<Set<Assignment>> assignPartitions(
Collection<String> currentTopologySnapshot,
int partitions,
int replicas,
boolean exclNeighbors,
- @Nullable BiPredicate<String, List<String>> nodeFilter
+ @Nullable BiPredicate<String, Set<Assignment>> nodeFilter
) {
- return assignPartitions(currentTopologySnapshot, partitions, replicas,
exclNeighbors, nodeFilter, ArrayList::new);
+ return assignPartitions(currentTopologySnapshot, partitions, replicas,
exclNeighbors, nodeFilter, HashSet::new);
}
/**
@@ -256,7 +260,7 @@ public class RendezvousDistributionFunction implements
DistributionAlgorithm {
* @param aggregator Function that creates a collection for
the partition assignments.
* @return List nodes by partition.
*/
- public static <T extends Collection<String>> List<T> assignPartitions(
+ public static <T extends Collection<Assignment>> List<T> assignPartitions(
Collection<String> currentTopologySnapshot,
int partitions,
int replicas,
@@ -284,11 +288,12 @@ public class RendezvousDistributionFunction implements
DistributionAlgorithm {
}
@Override
- public List<List<String>> assignPartitions(
+ public List<Set<Assignment>> assignPartitions(
Collection<String> nodes,
List<List<String>> currentDistribution,
int partitions,
- int replicaFactor
+ int replicaFactor,
+ int consensusGroupSize
) {
return assignPartitions(nodes, partitions, replicaFactor, false, null);
}
diff --git
a/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunctionTest.java
b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunctionTest.java
index 447c0dd0f7..fef7c57bbe 100644
---
a/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunctionTest.java
+++
b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunctionTest.java
@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -52,7 +53,7 @@ public class RendezvousDistributionFunctionTest {
int ideal = (parts * replicas) / nodeCount;
- List<List<String>> assignment =
RendezvousDistributionFunction.assignPartitions(
+ List<Set<Assignment>> assignment =
RendezvousDistributionFunction.assignPartitions(
nodes,
parts,
replicas,
@@ -64,12 +65,12 @@ public class RendezvousDistributionFunctionTest {
int part = 0;
- for (List<String> partNodes : assignment) {
- for (String node : partNodes) {
- ArrayList<Integer> nodeParts = assignmentByNode.get(node);
+ for (Set<Assignment> partNodes : assignment) {
+ for (Assignment node : partNodes) {
+ ArrayList<Integer> nodeParts =
assignmentByNode.get(node.consistentId());
if (nodeParts == null) {
- assignmentByNode.put(node, nodeParts = new ArrayList<>());
+ assignmentByNode.put(node.consistentId(), nodeParts = new
ArrayList<>());
}
nodeParts.add(part);
diff --git
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
index a57e9ce9b0..94db5e60c0 100644
---
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
+++
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
@@ -128,9 +128,10 @@ public class ItTransactionTestUtils {
Set<Integer> partitionIds = new HashSet<>();
Set<String> nodes = new HashSet<>();
- int maxAttempts = 1000;
+ final int maxAttempts = 1000;
+ int attempts = maxAttempts;
- while (maxAttempts >= 0) {
+ while (attempts >= 0) {
int partId = partitionIdForTuple(node, tableName, t, tx);
partitionIds.add(partId);
@@ -156,7 +157,7 @@ public class ItTransactionTestUtils {
t = nextTuple.apply(t);
- maxAttempts--;
+ attempts--;
}
throw new AssertionError("Failed to find a suitable tuple, tried " +
maxAttempts + " times with [partitionIds="