This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9623c7cd2c IGNITE-24063 Add peers count as parameter into 
DistributionAlgorithm (#4985)
9623c7cd2c is described below

commit 9623c7cd2c60b80e62bb792d583225f8fc6c9c17
Author: Denis Chudov <[email protected]>
AuthorDate: Tue Jan 7 15:18:01 2025 +0300

    IGNITE-24063 Add peers count as parameter into DistributionAlgorithm (#4985)
---
 .../DistributionAlgorithm.java                     |  7 ++--
 .../PartitionDistributionUtils.java                | 17 ++++------
 .../RendezvousDistributionFunction.java            | 39 ++++++++++++----------
 .../RendezvousDistributionFunctionTest.java        | 11 +++---
 .../internal/tx/test/ItTransactionTestUtils.java   |  7 ++--
 5 files changed, 43 insertions(+), 38 deletions(-)

diff --git 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java
 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java
index 6bf518dff1..d3242aa9f6 100644
--- 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java
+++ 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.partitiondistribution;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Partition distribution algorithm.
@@ -32,12 +33,14 @@ public interface DistributionAlgorithm {
      * @param currentDistribution Previous assignments or empty list.
      * @param partitions Number of table partitions.
      * @param replicaFactor Number partition replicas.
+     * @param consensusGroupSize Number of nodes in a consensus group (peers).
      * @return List of nodes by partition.
      */
-    List<List<String>> assignPartitions(
+    List<Set<Assignment>> assignPartitions(
             Collection<String> nodes,
             List<List<String>> currentDistribution,
             int partitions,
-            int replicaFactor
+            int replicaFactor,
+            int consensusGroupSize
     );
 }
diff --git 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java
 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java
index fe7f2c78c8..30bd741467 100644
--- 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java
+++ 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java
@@ -18,8 +18,6 @@
 package org.apache.ignite.internal.partitiondistribution;
 
 import static java.util.Collections.emptyList;
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
 
 import java.util.Collection;
 import java.util.List;
@@ -40,19 +38,19 @@ public class PartitionDistributionUtils {
      * @param replicas Replicas count.
      * @return List assignments by partition.
      */
+    // TODO https://issues.apache.org/jira/browse/IGNITE-24071 pass the 
consensus group size as the parameter here
     public static List<Set<Assignment>> calculateAssignments(
             Collection<String> dataNodes,
             int partitions,
             int replicas
     ) {
-        List<List<String>> nodes = DISTRIBUTION_ALGORITHM.assignPartitions(
+        return DISTRIBUTION_ALGORITHM.assignPartitions(
                 dataNodes,
                 emptyList(),
                 partitions,
+                replicas,
                 replicas
         );
-
-        return 
nodes.stream().map(PartitionDistributionUtils::dataNodesToAssignments).collect(toList());
     }
 
     /**
@@ -64,19 +62,16 @@ public class PartitionDistributionUtils {
      * @param replicas Replicas count.
      * @return Set of assignments.
      */
+    // TODO https://issues.apache.org/jira/browse/IGNITE-24071 pass the 
consensus group size as the parameter here
     public static Set<Assignment> calculateAssignmentForPartition(
             Collection<String> dataNodes,
             int partitionId,
             int partitions,
             int replicas
     ) {
-        List<List<String>> nodes = 
DISTRIBUTION_ALGORITHM.assignPartitions(dataNodes, emptyList(), partitions, 
replicas);
-        List<String> affinityNodes = nodes.get(partitionId);
+        List<Set<Assignment>> assignments = 
DISTRIBUTION_ALGORITHM.assignPartitions(dataNodes, emptyList(), partitions, 
replicas, replicas);
 
-        return dataNodesToAssignments(affinityNodes);
+        return assignments.get(partitionId);
     }
 
-    private static Set<Assignment> dataNodesToAssignments(Collection<String> 
nodes) {
-        return nodes.stream().map(Assignment::forPeer).collect(toSet());
-    }
 }
diff --git 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java
 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java
index 1f3ca4e37f..6b77e325a0 100644
--- 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java
+++ 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java
@@ -30,6 +30,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.function.BiPredicate;
 import java.util.function.IntFunction;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -82,7 +83,7 @@ public class RendezvousDistributionFunction implements 
DistributionAlgorithm {
      * @param aggregator        Function that creates a collection for the 
partition assignments.
      * @return Assignment.
      */
-    public static <T extends Collection<String>> T assignPartition(
+    public static <T extends Collection<Assignment>> T assignPartition(
             int part,
             Collection<String> nodes,
             int replicas,
@@ -94,7 +95,7 @@ public class RendezvousDistributionFunction implements 
DistributionAlgorithm {
         if (nodes.size() <= 1) {
             T res = aggregator.apply(1);
 
-            res.addAll(nodes);
+            nodes.stream().map(Assignment::forPeer).forEach(res::add);
 
             return res;
         }
@@ -125,7 +126,7 @@ public class RendezvousDistributionFunction implements 
DistributionAlgorithm {
 
         String first = it.next();
 
-        res.add(first);
+        res.add(Assignment.forPeer(first));
 
         if (exclNeighbors) {
             allNeighbors.addAll(neighborhoodCache.get(first));
@@ -138,12 +139,12 @@ public class RendezvousDistributionFunction implements 
DistributionAlgorithm {
 
                 if (exclNeighbors) {
                     if (!allNeighbors.contains(node)) {
-                        res.add(node);
+                        res.add(Assignment.forPeer(node));
 
                         allNeighbors.addAll(neighborhoodCache.get(node));
                     }
                 } else if (nodeFilter == null || nodeFilter.test(node, res)) {
-                    res.add(node);
+                    res.add(Assignment.forPeer(node));
                 }
             }
         }
@@ -158,7 +159,7 @@ public class RendezvousDistributionFunction implements 
DistributionAlgorithm {
                 String node = it.next();
 
                 if (!res.contains(node)) {
-                    res.add(node);
+                    res.add(Assignment.forPeer(node));
                 }
             }
 
@@ -183,17 +184,20 @@ public class RendezvousDistributionFunction implements 
DistributionAlgorithm {
      * @param aggregator  Function that creates a collection for the partition 
assignments.
      * @return Assignment.
      */
-    private static <T extends Collection<String>> T 
replicatedAssign(Collection<String> nodes,
-            Iterable<String> sortedNodes, IntFunction<T> aggregator) {
+    private static <T extends Collection<Assignment>> T replicatedAssign(
+            Collection<String> nodes,
+            Iterable<String> sortedNodes,
+            IntFunction<T> aggregator
+    ) {
         String first = sortedNodes.iterator().next();
 
         T res = aggregator.apply(nodes.size());
 
-        res.add(first);
+        res.add(Assignment.forPeer(first));
 
         for (String n : nodes) {
             if (!n.equals(first)) {
-                res.add(n);
+                res.add(Assignment.forPeer(first));
             }
         }
 
@@ -233,16 +237,16 @@ public class RendezvousDistributionFunction implements 
DistributionAlgorithm {
      * @param replicas                Number partition replicas.
      * @param exclNeighbors           If true neighbors are excluded from the 
one partition assignment, false otherwise.
      * @param nodeFilter              Filter for nodes.
-     * @return List nodes by partition.
+     * @return List of assignments by partition.
      */
-    public static List<List<String>> assignPartitions(
+    public static List<Set<Assignment>> assignPartitions(
             Collection<String> currentTopologySnapshot,
             int partitions,
             int replicas,
             boolean exclNeighbors,
-            @Nullable BiPredicate<String, List<String>> nodeFilter
+            @Nullable BiPredicate<String, Set<Assignment>> nodeFilter
     ) {
-        return assignPartitions(currentTopologySnapshot, partitions, replicas, 
exclNeighbors, nodeFilter, ArrayList::new);
+        return assignPartitions(currentTopologySnapshot, partitions, replicas, 
exclNeighbors, nodeFilter, HashSet::new);
     }
 
     /**
@@ -256,7 +260,7 @@ public class RendezvousDistributionFunction implements 
DistributionAlgorithm {
      * @param aggregator              Function that creates a collection for 
the partition assignments.
      * @return List nodes by partition.
      */
-    public static <T extends Collection<String>> List<T> assignPartitions(
+    public static <T extends Collection<Assignment>> List<T> assignPartitions(
             Collection<String> currentTopologySnapshot,
             int partitions,
             int replicas,
@@ -284,11 +288,12 @@ public class RendezvousDistributionFunction implements 
DistributionAlgorithm {
     }
 
     @Override
-    public List<List<String>> assignPartitions(
+    public List<Set<Assignment>> assignPartitions(
             Collection<String> nodes,
             List<List<String>> currentDistribution,
             int partitions,
-            int replicaFactor
+            int replicaFactor,
+            int consensusGroupSize
     ) {
         return assignPartitions(nodes, partitions, replicaFactor, false, null);
     }
diff --git 
a/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunctionTest.java
 
b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunctionTest.java
index 447c0dd0f7..fef7c57bbe 100644
--- 
a/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunctionTest.java
+++ 
b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunctionTest.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -52,7 +53,7 @@ public class RendezvousDistributionFunctionTest {
 
         int ideal = (parts * replicas) / nodeCount;
 
-        List<List<String>> assignment = 
RendezvousDistributionFunction.assignPartitions(
+        List<Set<Assignment>> assignment = 
RendezvousDistributionFunction.assignPartitions(
                 nodes,
                 parts,
                 replicas,
@@ -64,12 +65,12 @@ public class RendezvousDistributionFunctionTest {
 
         int part = 0;
 
-        for (List<String> partNodes : assignment) {
-            for (String node : partNodes) {
-                ArrayList<Integer> nodeParts = assignmentByNode.get(node);
+        for (Set<Assignment> partNodes : assignment) {
+            for (Assignment node : partNodes) {
+                ArrayList<Integer> nodeParts = 
assignmentByNode.get(node.consistentId());
 
                 if (nodeParts == null) {
-                    assignmentByNode.put(node, nodeParts = new ArrayList<>());
+                    assignmentByNode.put(node.consistentId(), nodeParts = new 
ArrayList<>());
                 }
 
                 nodeParts.add(part);
diff --git 
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
 
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
index a57e9ce9b0..94db5e60c0 100644
--- 
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
+++ 
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
@@ -128,9 +128,10 @@ public class ItTransactionTestUtils {
         Set<Integer> partitionIds = new HashSet<>();
         Set<String> nodes = new HashSet<>();
 
-        int maxAttempts = 1000;
+        final int maxAttempts = 1000;
+        int attempts = maxAttempts;
 
-        while (maxAttempts >= 0) {
+        while (attempts >= 0) {
             int partId = partitionIdForTuple(node, tableName, t, tx);
             partitionIds.add(partId);
 
@@ -156,7 +157,7 @@ public class ItTransactionTestUtils {
 
             t = nextTuple.apply(t);
 
-            maxAttempts--;
+            attempts--;
         }
 
         throw new AssertionError("Failed to find a suitable tuple, tried " + 
maxAttempts + " times with [partitionIds="

Reply via email to