This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a7ef1aaad8 IGNITE-24123 Add DistributionAlgorithm interface (#4984)
a7ef1aaad8 is described below
commit a7ef1aaad8e4cc8a075bacf027968f1f006399a9
Author: Denis Chudov <[email protected]>
AuthorDate: Tue Jan 7 14:01:19 2025 +0300
IGNITE-24123 Add DistributionAlgorithm interface (#4984)
---
.../apache/ignite/internal/util/IgniteUtils.java | 16 ++++++++
.../rebalance/ItRebalanceDistributedTest.java | 4 +-
.../distributionzones/rebalance/RebalanceUtil.java | 5 ++-
.../ZoneRebalanceRaftGroupEventsListener.java | 4 +-
.../rebalance/ZoneRebalanceUtil.java | 5 ++-
.../DistributionZoneRebalanceEngineTest.java | 2 +-
.../RebalanceUtilUpdateAssignmentsTest.java | 9 +++--
.../DistributionAlgorithm.java | 43 ++++++++++++++++++++++
.../PartitionDistributionUtils.java | 41 +++++++++++----------
.../RendezvousDistributionFunction.java | 31 +++++++++++-----
.../replicator/ItReplicaLifecycleTest.java | 6 +--
.../PartitionReplicaLifecycleManager.java | 2 +
.../PlacementDriverManagerTest.java | 4 +-
.../internal/placementdriver/LeaseUpdater.java | 5 +++
.../ignite/internal/table/ItTableScanTest.java | 2 +-
.../internal/table/distributed/TableManager.java | 2 +
.../distributed/disaster/GroupUpdateRequest.java | 7 +++-
.../ignite/internal/utils/RebalanceUtilEx.java | 4 +-
.../disaster/DisasterRecoveryMsInvokeTest.java | 4 +-
.../internal/tx/test/ItTransactionTestUtils.java | 14 ++++++-
20 files changed, 158 insertions(+), 52 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 22bac9e011..b259f7b588 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -57,6 +57,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntSupplier;
@@ -1011,6 +1012,21 @@ public class IgniteUtils {
return Optional.empty();
}
+ /**
+ * Iterates over the given collection in its iteration order and passes
each element, together with its zero-based position, to the given closure.
+ *
+ * @param collection Collection to iterate over.
+ * @param closure Closure that receives an element and its index, starting from 0.
+ * @param <T> Type of collection element.
+ */
+ public static <T> void forEachIndexed(Collection<T> collection,
BiConsumer<T, Integer> closure) {
+ int i = 0;
+
+ for (T t : collection) {
+ closure.accept(t, i++);
+ }
+ }
+
/**
* Retries operation until it succeeds or fails with exception that is
different than the given.
*
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index a8c82630bb..5745dfd421 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -943,8 +943,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
dataNodes.add(getNode(i).name);
}
- Set<Assignment> pendingAssignments =
calculateAssignmentForPartition(dataNodes, 0, 2);
- Set<Assignment> plannedAssignments =
calculateAssignmentForPartition(dataNodes, 0, 3);
+ Set<Assignment> pendingAssignments =
calculateAssignmentForPartition(dataNodes, 0, 1, 2);
+ Set<Assignment> plannedAssignments =
calculateAssignmentForPartition(dataNodes, 0, 1, 3);
Node node0 = getNode(0);
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index e90a6d0ef5..c845b17763 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -133,6 +133,7 @@ public class RebalanceUtil {
* @param tableDescriptor Table descriptor.
* @param partId Unique identifier of a partition.
* @param dataNodes Data nodes.
+ * @param partitions Number of partitions.
* @param replicas Number of replicas for a table.
* @param revision Revision of Meta Storage that is specific for the
assignment update.
* @param metaStorageMgr Meta Storage manager.
@@ -144,6 +145,7 @@ public class RebalanceUtil {
CatalogTableDescriptor tableDescriptor,
TablePartitionId partId,
Collection<String> dataNodes,
+ int partitions,
int replicas,
long revision,
MetaStorageManager metaStorageMgr,
@@ -161,7 +163,7 @@ public class RebalanceUtil {
ByteArray partAssignmentsStableKey = stablePartAssignmentsKey(partId);
- Set<Assignment> calculatedAssignments =
calculateAssignmentForPartition(dataNodes, partNum, replicas);
+ Set<Assignment> calculatedAssignments =
calculateAssignmentForPartition(dataNodes, partNum, partitions, replicas);
Set<Assignment> partAssignments;
@@ -375,6 +377,7 @@ public class RebalanceUtil {
tableDescriptor,
replicaGrpId,
dataNodes,
+ zoneDescriptor.partitions(),
zoneDescriptor.replicas(),
storageRevision,
metaStorageManager,
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
index 4e11970984..03b353622b 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
@@ -556,6 +556,7 @@ public class ZoneRebalanceRaftGroupEventsListener
implements RaftGroupEventsList
*
* @param metaStorageMgr MetaStorage manager.
* @param dataNodes Data nodes.
+ * @param partitions Partitions count.
* @param replicas Replicas count.
* @param partId Partition's raft group id.
* @param event Assignments switch reduce change event.
@@ -564,6 +565,7 @@ public class ZoneRebalanceRaftGroupEventsListener
implements RaftGroupEventsList
public static CompletableFuture<Void> handleReduceChanged(
MetaStorageManager metaStorageMgr,
Collection<String> dataNodes,
+ int partitions,
int replicas,
ZonePartitionId partId,
WatchEvent event,
@@ -580,7 +582,7 @@ public class ZoneRebalanceRaftGroupEventsListener
implements RaftGroupEventsList
return nullCompletedFuture();
}
- Set<Assignment> assignments =
calculateAssignmentForPartition(dataNodes, partId.partitionId(), replicas);
+ Set<Assignment> assignments =
calculateAssignmentForPartition(dataNodes, partId.partitionId(), partitions,
replicas);
ByteArray pendingKey = pendingPartAssignmentsKey(partId);
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
index 07f64ea3b2..b0777e3e77 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
@@ -129,6 +129,7 @@ public class ZoneRebalanceUtil {
* @param zoneDescriptor Zone descriptor.
* @param zonePartitionId Unique aggregate identifier of a partition of a
zone.
* @param dataNodes Data nodes.
+ * @param partitions Number of partitions in a zone.
* @param replicas Number of replicas for a zone.
* @param revision Revision of Meta Storage that is specific for the
assignment update.
* @param metaStorageMgr Meta Storage manager.
@@ -141,6 +142,7 @@ public class ZoneRebalanceUtil {
CatalogZoneDescriptor zoneDescriptor,
ZonePartitionId zonePartitionId,
Collection<String> dataNodes,
+ int partitions,
int replicas,
long revision,
MetaStorageManager metaStorageMgr,
@@ -158,7 +160,7 @@ public class ZoneRebalanceUtil {
ByteArray partAssignmentsStableKey =
stablePartAssignmentsKey(zonePartitionId);
- Set<Assignment> calculatedAssignments =
calculateAssignmentForPartition(dataNodes, partNum, replicas);
+ Set<Assignment> calculatedAssignments =
calculateAssignmentForPartition(dataNodes, partNum, partitions, replicas);
Set<Assignment> partAssignments;
@@ -351,6 +353,7 @@ public class ZoneRebalanceUtil {
zoneDescriptor,
replicaGrpId,
dataNodes,
+ zoneDescriptor.partitions(),
zoneDescriptor.replicas(),
storageRevision,
metaStorageManager,
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index de3c1b5b76..f40c9d4c3e 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -515,7 +515,7 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
if (expectedNodes != null) {
Set<String> expectedAssignments =
- calculateAssignmentForPartition(expectedNodes, j,
zoneDescriptor.replicas())
+ calculateAssignmentForPartition(expectedNodes, j,
zoneDescriptor.partitions(), zoneDescriptor.replicas())
.stream().map(Assignment::consistentId).collect(toSet());
assertNotNull(actualAssignmentsBytes);
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
index d3c53b5258..e44f9687cc 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
@@ -121,10 +121,10 @@ public class RebalanceUtilUpdateAssignmentsTest extends
IgniteAbstractTest {
private static final Set<String> nodes3 = IntStream.of(5).mapToObj(i ->
"nodes3_" + i).collect(toSet());
private static final Set<String> nodes4 = IntStream.of(5).mapToObj(i ->
"nodes4_" + i).collect(toSet());
- private static final Set<Assignment> assignments1 =
calculateAssignmentForPartition(nodes1, partNum, replicas);
- private static final Set<Assignment> assignments2 =
calculateAssignmentForPartition(nodes2, partNum, replicas);
- private static final Set<Assignment> assignments3 =
calculateAssignmentForPartition(nodes3, partNum, replicas);
- private static final Set<Assignment> assignments4 =
calculateAssignmentForPartition(nodes4, partNum, replicas);
+ private static final Set<Assignment> assignments1 =
calculateAssignmentForPartition(nodes1, partNum, partNum + 1, replicas);
+ private static final Set<Assignment> assignments2 =
calculateAssignmentForPartition(nodes2, partNum, partNum + 1, replicas);
+ private static final Set<Assignment> assignments3 =
calculateAssignmentForPartition(nodes3, partNum, partNum + 1, replicas);
+ private static final Set<Assignment> assignments4 =
calculateAssignmentForPartition(nodes4, partNum, partNum + 1, replicas);
private static final long expectedPendingChangeTriggerKey = 10L;
@@ -539,6 +539,7 @@ public class RebalanceUtilUpdateAssignmentsTest extends
IgniteAbstractTest {
tableDescriptor,
tablePartitionId,
nodesForNewAssignments,
+ partNum + 1,
replicas,
expectedPendingChangeTriggerKey,
metaStorageManager,
diff --git
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java
new file mode 100644
index 0000000000..6bf518dff1
--- /dev/null
+++
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/DistributionAlgorithm.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partitiondistribution;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Partition distribution algorithm: computes the nodes each partition is assigned to.
+ */
+public interface DistributionAlgorithm {
+
+ /**
+ * Generates partition assignments for the given parameters.
+ *
+ * @param nodes Collection of topology nodes.
+ * @param currentDistribution Previous assignments or empty list.
+ * @param partitions Number of partitions.
+ * @param replicaFactor Number of partition replicas.
+ * @return List with one entry per partition, each entry being the list of nodes assigned to that partition.
+ */
+ List<List<String>> assignPartitions(
+ Collection<String> nodes,
+ List<List<String>> currentDistribution,
+ int partitions,
+ int replicaFactor
+ );
+}
diff --git
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java
index 694730c982..fe7f2c78c8 100644
---
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java
+++
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/PartitionDistributionUtils.java
@@ -17,12 +17,11 @@
package org.apache.ignite.internal.partitiondistribution;
+import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -30,6 +29,9 @@ import java.util.Set;
* Stateless distribution utils that produces helper methods for an
assignments distribution calculation.
*/
public class PartitionDistributionUtils {
+
+ private static final DistributionAlgorithm DISTRIBUTION_ALGORITHM = new
RendezvousDistributionFunction();
+
/**
* Calculates assignments distribution.
*
@@ -38,14 +40,16 @@ public class PartitionDistributionUtils {
* @param replicas Replicas count.
* @return List assignments by partition.
*/
- public static List<Set<Assignment>>
calculateAssignments(Collection<String> dataNodes, int partitions, int
replicas) {
- List<Set<String>> nodes =
RendezvousDistributionFunction.assignPartitions(
+ public static List<Set<Assignment>> calculateAssignments(
+ Collection<String> dataNodes,
+ int partitions,
+ int replicas
+ ) {
+ List<List<String>> nodes = DISTRIBUTION_ALGORITHM.assignPartitions(
dataNodes,
+ emptyList(),
partitions,
- replicas,
- false,
- null,
- HashSet::new
+ replicas
);
return
nodes.stream().map(PartitionDistributionUtils::dataNodesToAssignments).collect(toList());
@@ -56,21 +60,20 @@ public class PartitionDistributionUtils {
*
* @param dataNodes Data nodes.
* @param partitionId Partition id.
+ * @param partitions Partitions count.
* @param replicas Replicas count.
* @return Set of assignments.
*/
- public static Set<Assignment>
calculateAssignmentForPartition(Collection<String> dataNodes, int partitionId,
int replicas) {
- Set<String> nodes = RendezvousDistributionFunction.assignPartition(
- partitionId,
- new ArrayList<>(dataNodes),
- replicas,
- null,
- false,
- null,
- HashSet::new
- );
+ public static Set<Assignment> calculateAssignmentForPartition(
+ Collection<String> dataNodes,
+ int partitionId,
+ int partitions,
+ int replicas
+ ) {
+ List<List<String>> nodes =
DISTRIBUTION_ALGORITHM.assignPartitions(dataNodes, emptyList(), partitions,
replicas);
+ List<String> affinityNodes = nodes.get(partitionId);
- return dataNodesToAssignments(nodes);
+ return dataNodesToAssignments(affinityNodes);
}
private static Set<Assignment> dataNodesToAssignments(Collection<String>
nodes) {
diff --git
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java
index 0f5e2c99c4..1f3ca4e37f 100644
---
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java
+++
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/RendezvousDistributionFunction.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.partitiondistribution;
+import static org.apache.ignite.internal.util.IgniteUtils.forEachIndexed;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -33,6 +35,7 @@ import java.util.function.IntFunction;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.jetbrains.annotations.Nullable;
/**
* Partition distribution function for partitioned table based on Highest
Random Weight algorithm. This function supports the following
@@ -54,7 +57,7 @@ import org.apache.ignite.internal.logger.Loggers;
* </li>
* </ul>
*/
-public class RendezvousDistributionFunction {
+public class RendezvousDistributionFunction implements DistributionAlgorithm {
/** The logger. */
private static final IgniteLogger LOG =
Loggers.forClass(RendezvousDistributionFunction.class);
@@ -81,11 +84,11 @@ public class RendezvousDistributionFunction {
*/
public static <T extends Collection<String>> T assignPartition(
int part,
- List<String> nodes,
+ Collection<String> nodes,
int replicas,
Map<String, Collection<String>> neighborhoodCache,
boolean exclNeighbors,
- BiPredicate<String, T> nodeFilter,
+ @Nullable BiPredicate<String, T> nodeFilter,
IntFunction<T> aggregator
) {
if (nodes.size() <= 1) {
@@ -99,13 +102,11 @@ public class RendezvousDistributionFunction {
IgniteBiTuple<Long, String>[] hashArr =
(IgniteBiTuple<Long, String>[]) new
IgniteBiTuple[nodes.size()];
- for (int i = 0; i < nodes.size(); i++) {
- String node = nodes.get(i);
-
+ forEachIndexed(nodes, (node, i) -> {
long hash = hash(node.hashCode(), part);
hashArr[i] = new IgniteBiTuple<>(hash, node);
- }
+ });
final int effectiveReplicas = replicas == Integer.MAX_VALUE ?
nodes.size() : Math.min(replicas, nodes.size());
@@ -182,7 +183,7 @@ public class RendezvousDistributionFunction {
* @param aggregator Function that creates a collection for the partition
assignments.
* @return Assignment.
*/
- private static <T extends Collection<String>> T
replicatedAssign(List<String> nodes,
+ private static <T extends Collection<String>> T
replicatedAssign(Collection<String> nodes,
Iterable<String> sortedNodes, IntFunction<T> aggregator) {
String first = sortedNodes.iterator().next();
@@ -239,7 +240,7 @@ public class RendezvousDistributionFunction {
int partitions,
int replicas,
boolean exclNeighbors,
- BiPredicate<String, List<String>> nodeFilter
+ @Nullable BiPredicate<String, List<String>> nodeFilter
) {
return assignPartitions(currentTopologySnapshot, partitions, replicas,
exclNeighbors, nodeFilter, ArrayList::new);
}
@@ -260,7 +261,7 @@ public class RendezvousDistributionFunction {
int partitions,
int replicas,
boolean exclNeighbors,
- BiPredicate<String, T> nodeFilter,
+ @Nullable BiPredicate<String, T> nodeFilter,
IntFunction<T> aggregator
) {
assert partitions <= MAX_PARTITIONS_COUNT : "partitions <= " +
MAX_PARTITIONS_COUNT;
@@ -282,6 +283,16 @@ public class RendezvousDistributionFunction {
return assignments;
}
+ @Override
+ public List<List<String>> assignPartitions(
+ Collection<String> nodes,
+ List<List<String>> currentDistribution, // Not read by this implementation.
+ int partitions,
+ int replicaFactor
+ ) {
+ return assignPartitions(nodes, partitions, replicaFactor, false, null); // No neighbors exclusion, no node filter.
+ }
+
/**
* Builds neighborhood map for all nodes in snapshot.
*
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index efc7e87d07..749194de65 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -395,7 +395,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
startNodes(testInfo, 3);
Assignment replicaAssignment = (Assignment)
calculateAssignmentForPartition(
- nodes.values().stream().map(n ->
n.name).collect(Collectors.toList()), 0, 1).toArray()[0];
+ nodes.values().stream().map(n ->
n.name).collect(Collectors.toList()), 0, 1, 1).toArray()[0];
Node node = getNode(replicaAssignment.consistentId());
@@ -616,7 +616,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
startNodes(testInfo, 3);
Assignment replicaAssignment = (Assignment)
calculateAssignmentForPartition(
- nodes.values().stream().map(n ->
n.name).collect(Collectors.toList()), 0, 1).toArray()[0];
+ nodes.values().stream().map(n ->
n.name).collect(Collectors.toList()), 0, 1, 1).toArray()[0];
Node node = getNode(replicaAssignment.consistentId());
@@ -649,7 +649,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
startNodes(testInfo, 3);
Assignment replicaAssignment = (Assignment)
calculateAssignmentForPartition(
- nodes.values().stream().map(n ->
n.name).collect(Collectors.toList()), 0, 3).toArray()[0];
+ nodes.values().stream().map(n ->
n.name).collect(Collectors.toList()), 0, 1, 3).toArray()[0];
Node node = getNode(replicaAssignment.consistentId());
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 1e6e8122f9..656b8e6925 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -534,6 +534,7 @@ public class PartitionReplicaLifecycleManager extends
).thenApply(dataNodes -> calculateAssignmentForPartition(
dataNodes,
zonePartitionId.partitionId(),
+ zoneDescriptor.partitions(),
zoneDescriptor.replicas()
)
);
@@ -778,6 +779,7 @@ public class PartitionReplicaLifecycleManager extends
.thenCompose(dataNodes -> handleReduceChanged(
metaStorageMgr,
dataNodes,
+ zoneDescriptor.partitions(),
zoneDescriptor.replicas(),
replicaGrpId,
evt,
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index 31f9cf2f58..7622b4a40a 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -389,7 +389,7 @@ public class PlacementDriverManagerTest extends
BasePlacementDriverTest {
}, 10_000));
- assignments =
calculateAssignmentForPartition(Collections.singleton(nodeName), 1, 1);
+ assignments =
calculateAssignmentForPartition(Collections.singleton(nodeName), 1, 2, 1);
metaStorageManager.put(fromString(STABLE_ASSIGNMENTS_PREFIX +
grpPart0), Assignments.toBytes(assignments, assignmentsTimestamp));
@@ -428,7 +428,7 @@ public class PlacementDriverManagerTest extends
BasePlacementDriverTest {
return falseCompletedFuture();
});
- Set<Assignment> assignments =
calculateAssignmentForPartition(Collections.singleton(anotherNodeName), 1, 1);
+ Set<Assignment> assignments =
calculateAssignmentForPartition(Collections.singleton(anotherNodeName), 1, 2,
1);
metaStorageManager.put(fromString(STABLE_ASSIGNMENTS_PREFIX +
grpPart0), Assignments.toBytes(assignments, assignmentsTimestamp));
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index ac2c92e9cb..880875470e 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -543,6 +543,11 @@ public class LeaseUpdater {
);
}
+ // The condition below allows skipping the meta storage invoke when
there are no leases to update (renewedLeases.isEmpty()).
+ // However, there is a case when an empty leases collection must still
be saved: when the assignments are empty and
+ // leasesCurrent (which reflects the meta storage state) is
not empty. Negating that case gives
+ // the condition for skipping the update:
+ // !(emptyAssignments && !leasesCurrent.isEmpty()) ==
(!emptyAssignments || leasesCurrent.isEmpty())
boolean emptyAssignments =
aggregatedStableAndPendingAssignmentsByGroups.isEmpty();
if (renewedLeases.isEmpty() && (!emptyAssignments ||
leasesCurrent.leaseByGroupId().isEmpty())) {
LOG.debug("No leases to update found.");
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 9a058af9c4..70fefc39f2 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -826,7 +826,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
if (readOnly) {
// Any node from assignments will do it.
Set<Assignment> assignments =
calculateAssignmentForPartition(CLUSTER.aliveNode().clusterNodes().stream().map(
- ClusterNode::name).collect(Collectors.toList()), 0, 1);
+ ClusterNode::name).collect(Collectors.toList()), 0, 1,
1);
assertFalse(assignments.isEmpty());
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 91beb10eec..3fdafb9c4d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1403,6 +1403,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
calculateAssignmentForPartition(
dataNodes,
tablePartitionId.partitionId(),
+ zoneDescriptor.partitions(),
zoneDescriptor.replicas()
)
);
@@ -2490,6 +2491,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
.thenCompose(dataNodes ->
RebalanceUtilEx.handleReduceChanged(
metaStorageMgr,
dataNodes,
+
zoneDescriptor.partitions(),
zoneDescriptor.replicas(),
replicaGrpId,
evt,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
index 085de7dba4..b3afe4904a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
@@ -276,6 +276,7 @@ class GroupUpdateRequest implements DisasterRecoveryRequest
{
replicaGrpId,
aliveDataNodes,
aliveNodesConsistentIds,
+ zoneDescriptor.partitions(),
zoneDescriptor.replicas(),
revision,
metaStorageManager,
@@ -297,6 +298,7 @@ class GroupUpdateRequest implements DisasterRecoveryRequest
{
TablePartitionId partId,
Collection<String> aliveDataNodes,
Set<String> aliveNodesConsistentIds,
+ int partitions,
int replicas,
long revision,
MetaStorageManager metaStorageMgr,
@@ -317,7 +319,7 @@ class GroupUpdateRequest implements DisasterRecoveryRequest
{
}
if (manualUpdate) {
- enrichAssignments(partId, aliveDataNodes, replicas,
partAssignments);
+ enrichAssignments(partId, aliveDataNodes, partitions, replicas,
partAssignments);
}
Assignment nextAssignment =
nextAssignment(localPartitionStateMessageByNode, partAssignments);
@@ -468,10 +470,11 @@ class GroupUpdateRequest implements
DisasterRecoveryRequest {
private static void enrichAssignments(
TablePartitionId partId,
Collection<String> aliveDataNodes,
+ int partitions,
int replicas,
Set<Assignment> partAssignments
) {
- Set<Assignment> calcAssignments =
calculateAssignmentForPartition(aliveDataNodes, partId.partitionId(), replicas);
+ Set<Assignment> calcAssignments =
calculateAssignmentForPartition(aliveDataNodes, partId.partitionId(),
partitions, replicas);
for (Assignment calcAssignment : calcAssignments) {
if (partAssignments.size() == replicas) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java
index 283769bd00..1a3339f770 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java
@@ -110,6 +110,7 @@ public class RebalanceUtilEx {
*
* @param metaStorageMgr MetaStorage manager.
* @param dataNodes Data nodes.
+ * @param partitions Number of partitions.
* @param replicas Replicas count.
* @param partId Partition's raft group id.
* @param event Assignments switch reduce change event.
@@ -118,6 +119,7 @@ public class RebalanceUtilEx {
public static CompletableFuture<Void> handleReduceChanged(
MetaStorageManager metaStorageMgr,
Collection<String> dataNodes,
+ int partitions,
int replicas,
TablePartitionId partId,
WatchEvent event,
@@ -134,7 +136,7 @@ public class RebalanceUtilEx {
return nullCompletedFuture();
}
- Set<Assignment> assignments =
calculateAssignmentForPartition(dataNodes, partId.partitionId(), replicas);
+ Set<Assignment> assignments =
calculateAssignmentForPartition(dataNodes, partId.partitionId(), partitions,
replicas);
ByteArray pendingKey = pendingPartAssignmentsKey(partId);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryMsInvokeTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryMsInvokeTest.java
index ce17ff529e..56649edce2 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryMsInvokeTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryMsInvokeTest.java
@@ -59,8 +59,8 @@ public class DisasterRecoveryMsInvokeTest extends
BaseIgniteAbstractTest {
private static final Set<String> nodes1 = IntStream.of(5).mapToObj(i ->
"nodes1_" + i).collect(toSet());
private static final Set<String> nodes2 = IntStream.of(5).mapToObj(i ->
"nodes2_" + i).collect(toSet());
- private static final Set<Assignment> assignments1 =
calculateAssignmentForPartition(nodes1, partNum, replicas);
- private static final Set<Assignment> assignments2 =
calculateAssignmentForPartition(nodes2, partNum, replicas);
+ private static final Set<Assignment> assignments1 =
calculateAssignmentForPartition(nodes1, partNum, partNum + 1, replicas);
+ private static final Set<Assignment> assignments2 =
calculateAssignmentForPartition(nodes2, partNum, partNum + 1, replicas);
private static final TablePartitionId tablePartitionId = new
TablePartitionId(1, 1);
diff --git
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
index c05dbb03a8..a57e9ce9b0 100644
---
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
+++
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
@@ -26,6 +26,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -124,10 +125,14 @@ public class ItTransactionTestUtils {
Tuple t = initialTuple;
int tableId = tableId(node, tableName);
- int maxAttempts = 100;
+ Set<Integer> partitionIds = new HashSet<>();
+ Set<String> nodes = new HashSet<>();
+
+ int maxAttempts = 1000;
while (maxAttempts >= 0) {
int partId = partitionIdForTuple(node, tableName, t, tx);
+ partitionIds.add(partId);
TablePartitionId grpId = new TablePartitionId(tableId, partId);
@@ -137,12 +142,16 @@ public class ItTransactionTestUtils {
if (node.id().equals(replicaMeta.getLeaseholderId())) {
return t;
}
+
+ nodes.add(replicaMeta.getLeaseholder());
} else {
Set<String> assignments = partitionAssignment(node, grpId);
if (assignments.contains(node.name())) {
return t;
}
+
+ nodes.addAll(assignments);
}
t = nextTuple.apply(t);
@@ -150,7 +159,8 @@ public class ItTransactionTestUtils {
maxAttempts--;
}
- throw new AssertionError("Failed to find a suitable tuple.");
+ throw new AssertionError("Failed to find a suitable tuple, all attempts were exhausted [partitionIds="
+ + partitionIds + ", nodes=" + nodes + "]."); // maxAttempts is below zero once the loop ends, so it is not reported.
}
/**