This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 30ec21459a IGNITE-18088 Trigger rebalance on zone.dataNodes change. (#1572) 30ec21459a is described below commit 30ec21459ad179646060ed6ff32f4aa346c8a8aa Author: Sergey Uttsel <utt...@gmail.com> AuthorDate: Fri Feb 3 14:51:37 2023 +0300 IGNITE-18088 Trigger rebalance on zone.dataNodes change. (#1572) --- .../ignite/internal/affinity/AffinityUtils.java | 26 +- .../affinity/RendezvousAffinityFunction.java | 109 +++-- .../internal/affinity/AffinityServiceTest.java | 14 +- .../affinity/RendezvousAffinityFunctionTest.java | 37 +- .../distributionzones/DistributionZonesUtil.java | 33 +- .../ignite/internal/rebalance/ItRebalanceTest.java | 246 +++++++++++ modules/table/build.gradle | 3 + .../distributed/ItTxDistributedTestSingleNode.java | 2 +- .../internal/table/distributed/TableManager.java | 148 ++++++- .../ignite/internal/utils/RebalanceUtil.java | 13 +- .../TableManagerDistributionZonesTest.java | 462 +++++++++++++++++++++ .../table/distributed/TableManagerTest.java | 8 +- 12 files changed, 965 insertions(+), 136 deletions(-) diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java index ec15cedc02..cf64ba7d6e 100644 --- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java +++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java @@ -25,7 +25,6 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.ignite.network.ClusterNode; /** * Stateless affinity utils that produces helper methods for an affinity assignments calculation. @@ -34,13 +33,14 @@ public class AffinityUtils { /** * Calculates affinity assignments. * + * @param dataNodes Data nodes. * @param partitions Partitions count. * @param replicas Replicas count. * @return List assignments by partition. */ - public static List<Set<Assignment>> calculateAssignments(Collection<ClusterNode> baselineNodes, int partitions, int replicas) { - List<Set<ClusterNode>> affinityNodes = RendezvousAffinityFunction.assignPartitions( - baselineNodes, + public static List<Set<Assignment>> calculateAssignments(Collection<String> dataNodes, int partitions, int replicas) { + List<Set<String>> affinityNodes = RendezvousAffinityFunction.assignPartitions( + dataNodes, partitions, replicas, false, @@ -48,21 +48,21 @@ public class AffinityUtils { HashSet::new ); - return affinityNodes.stream().map(AffinityUtils::clusterNodesToAssignments).collect(toList()); + return affinityNodes.stream().map(AffinityUtils::dataNodesToAssignments).collect(toList()); } /** * Calculates affinity assignments for a single partition. * - * @param baselineNodes Nodes. + * @param dataNodes Data nodes. * @param partition Partition id. * @param replicas Replicas count. - * @return List of assignments. + * @return Set of assignments. */ - public static Set<Assignment> calculateAssignmentForPartition(Collection<ClusterNode> baselineNodes, int partition, int replicas) { - Set<ClusterNode> affinityNodes = RendezvousAffinityFunction.assignPartition( + public static Set<Assignment> calculateAssignmentForPartition(Collection<String> dataNodes, int partition, int replicas) { + Set<String> affinityNodes = RendezvousAffinityFunction.assignPartition( partition, - new ArrayList<>(baselineNodes), + new ArrayList<>(dataNodes), replicas, null, false, @@ -70,10 +70,10 @@ public class AffinityUtils { HashSet::new ); - return clusterNodesToAssignments(affinityNodes); + return dataNodesToAssignments(affinityNodes); } - private static Set<Assignment> clusterNodesToAssignments(Collection<ClusterNode> nodes) { - return nodes.stream().map(node -> Assignment.forPeer(node.name())).collect(toSet()); + private static Set<Assignment> dataNodesToAssignments(Collection<String> nodes) { + return nodes.stream().map(Assignment::forPeer).collect(toSet()); } } diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java index 91adae62fd..0cc1adf545 100644 --- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java +++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java @@ -33,7 +33,6 @@ import java.util.function.IntFunction; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.network.ClusterNode; /** * Affinity function for partitioned table based on Highest Random Weight algorithm. This function supports the following configuration: @@ -59,7 +58,7 @@ public class RendezvousAffinityFunction { private static final IgniteLogger LOG = Loggers.forClass(RendezvousAffinityFunction.class); /** Comparator. */ - private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = new HashComparator(); + private static final Comparator<IgniteBiTuple<Long, String>> COMPARATOR = new HashComparator(); /** Maximum number of partitions. */ public static final int MAX_PARTITIONS_COUNT = 65000; @@ -67,16 +66,6 @@ public class RendezvousAffinityFunction { /** Exclude neighbors warning. */ private static boolean exclNeighborsWarn; - /** - * Resolves node hash. - * - * @param node Cluster node; - * @return Node hash. - */ - public static Object resolveNodeHash(ClusterNode node) { - return node.name(); - } - /** * Returns collection of nodes for specified partition. * @@ -89,13 +78,13 @@ public class RendezvousAffinityFunction { * @param aggregator Function that creates a collection for the partition assignments. * @return Assignment. */ - public static <T extends Collection<ClusterNode>> T assignPartition( + public static <T extends Collection<String>> T assignPartition( int part, - List<ClusterNode> nodes, + List<String> nodes, int replicas, - Map<String, Collection<ClusterNode>> neighborhoodCache, + Map<String, Collection<String>> neighborhoodCache, boolean exclNeighbors, - BiPredicate<ClusterNode, T> nodeFilter, + BiPredicate<String, T> nodeFilter, IntFunction<T> aggregator ) { if (nodes.size() <= 1) { @@ -106,58 +95,56 @@ public class RendezvousAffinityFunction { return res; } - IgniteBiTuple<Long, ClusterNode>[] hashArr = - (IgniteBiTuple<Long, ClusterNode>[]) new IgniteBiTuple[nodes.size()]; + IgniteBiTuple<Long, String>[] hashArr = + (IgniteBiTuple<Long, String>[]) new IgniteBiTuple[nodes.size()]; for (int i = 0; i < nodes.size(); i++) { - ClusterNode node = nodes.get(i); - - Object nodeHash = resolveNodeHash(node); + String node = nodes.get(i); - long hash = hash(nodeHash.hashCode(), part); + long hash = hash(node.hashCode(), part); hashArr[i] = new IgniteBiTuple<>(hash, node); } final int effectiveReplicas = replicas == Integer.MAX_VALUE ? nodes.size() : Math.min(replicas, nodes.size()); - Iterable<ClusterNode> sortedNodes = new LazyLinearSortedContainer(hashArr, effectiveReplicas); + Iterable<String> sortedNodes = new LazyLinearSortedContainer(hashArr, effectiveReplicas); // REPLICATED cache case if (replicas == Integer.MAX_VALUE) { return replicatedAssign(nodes, sortedNodes, aggregator); } - Iterator<ClusterNode> it = sortedNodes.iterator(); + Iterator<String> it = sortedNodes.iterator(); T res = aggregator.apply(effectiveReplicas); - Collection<ClusterNode> allNeighbors = new HashSet<>(); + Collection<String> allNeighbors = new HashSet<>(); - ClusterNode first = it.next(); + String first = it.next(); res.add(first); if (exclNeighbors) { - allNeighbors.addAll(neighborhoodCache.get(first.id())); + allNeighbors.addAll(neighborhoodCache.get(first)); } // Select another replicas. if (replicas > 1) { while (it.hasNext() && res.size() < effectiveReplicas) { - ClusterNode node = it.next(); + String node = it.next(); if (exclNeighbors) { if (!allNeighbors.contains(node)) { res.add(node); - allNeighbors.addAll(neighborhoodCache.get(node.id())); + allNeighbors.addAll(neighborhoodCache.get(node)); } } else if (nodeFilter == null || nodeFilter.test(node, res)) { res.add(node); if (exclNeighbors) { - allNeighbors.addAll(neighborhoodCache.get(node.id())); + allNeighbors.addAll(neighborhoodCache.get(node)); } } } @@ -170,7 +157,7 @@ public class RendezvousAffinityFunction { it.next(); while (it.hasNext() && res.size() < effectiveReplicas) { - ClusterNode node = it.next(); + String node = it.next(); if (!res.contains(node)) { res.add(node); @@ -198,15 +185,15 @@ public class RendezvousAffinityFunction { * @param aggregator Function that creates a collection for the partition assignments. * @return Assignment. */ - private static <T extends Collection<ClusterNode>> T replicatedAssign(List<ClusterNode> nodes, - Iterable<ClusterNode> sortedNodes, IntFunction<T> aggregator) { - ClusterNode first = sortedNodes.iterator().next(); + private static <T extends Collection<String>> T replicatedAssign(List<String> nodes, + Iterable<String> sortedNodes, IntFunction<T> aggregator) { + String first = sortedNodes.iterator().next(); T res = aggregator.apply(nodes.size()); res.add(first); - for (ClusterNode n : nodes) { + for (String n : nodes) { if (!n.equals(first)) { res.add(n); } @@ -250,12 +237,12 @@ public class RendezvousAffinityFunction { * @param nodeFilter Filter for nodes. * @return List nodes by partition. */ - public static List<List<ClusterNode>> assignPartitions( - Collection<ClusterNode> currentTopologySnapshot, + public static List<List<String>> assignPartitions( + Collection<String> currentTopologySnapshot, int partitions, int replicas, boolean exclNeighbors, - BiPredicate<ClusterNode, List<ClusterNode>> nodeFilter + BiPredicate<String, List<String>> nodeFilter ) { return assignPartitions(currentTopologySnapshot, partitions, replicas, exclNeighbors, nodeFilter, ArrayList::new); } @@ -271,12 +258,12 @@ public class RendezvousAffinityFunction { * @param aggregator Function that creates a collection for the partition assignments. * @return List nodes by partition. */ - public static <T extends Collection<ClusterNode>> List<T> assignPartitions( - Collection<ClusterNode> currentTopologySnapshot, + public static <T extends Collection<String>> List<T> assignPartitions( + Collection<String> currentTopologySnapshot, int partitions, int replicas, boolean exclNeighbors, - BiPredicate<ClusterNode, T> nodeFilter, + BiPredicate<String, T> nodeFilter, IntFunction<T> aggregator ) { assert partitions <= MAX_PARTITIONS_COUNT : "partitions <= " + MAX_PARTITIONS_COUNT; @@ -285,9 +272,9 @@ public class RendezvousAffinityFunction { List<T> assignments = new ArrayList<>(partitions); - Map<String, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ? neighbors(currentTopologySnapshot) : null; + Map<String, Collection<String>> neighborhoodCache = exclNeighbors ? neighbors(currentTopologySnapshot) : null; - List<ClusterNode> nodes = new ArrayList<>(currentTopologySnapshot); + List<String> nodes = new ArrayList<>(currentTopologySnapshot); for (int i = 0; i < partitions; i++) { T partAssignment = assignPartition(i, nodes, replicas, neighborhoodCache, exclNeighbors, nodeFilter, aggregator); @@ -304,15 +291,15 @@ public class RendezvousAffinityFunction { * @param topSnapshot Topology snapshot. * @return Neighbors map. */ - public static Map<String, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) { - Map<String, Collection<ClusterNode>> macMap = new HashMap<>(topSnapshot.size(), 1.0f); + public static Map<String, Collection<String>> neighbors(Collection<String> topSnapshot) { + Map<String, Collection<String>> macMap = new HashMap<>(topSnapshot.size(), 1.0f); // Group by mac addresses. - for (ClusterNode node : topSnapshot) { + for (String node : topSnapshot) { String macs = String.valueOf(node.hashCode()); //node.attribute(IgniteNodeAttributes.ATTR_MACS); - Collection<ClusterNode> nodes = macMap.get(macs); + Collection<String> nodes = macMap.get(macs); if (nodes == null) { macMap.put(macs, nodes = new HashSet<>()); @@ -321,11 +308,11 @@ public class RendezvousAffinityFunction { nodes.add(node); } - Map<String, Collection<ClusterNode>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f); + Map<String, Collection<String>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f); - for (Collection<ClusterNode> group : macMap.values()) { - for (ClusterNode node : group) { - neighbors.put(node.id(), group); + for (Collection<String> group : macMap.values()) { + for (String node : group) { + neighbors.put(node, group); } } @@ -335,24 +322,24 @@ public class RendezvousAffinityFunction { /** * Hash comparator. */ - private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable { + private static class HashComparator implements Comparator<IgniteBiTuple<Long, String>>, Serializable { /** Serial version uid. */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ @Override - public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) { + public int compare(IgniteBiTuple<Long, String> o1, IgniteBiTuple<Long, String> o2) { return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 1 : - o1.get2().name().compareTo(o2.get2().name()); + o1.get2().compareTo(o2.get2()); } } /** * Sorts the initial array with linear sort algorithm array. */ - private static class LazyLinearSortedContainer implements Iterable<ClusterNode> { + private static class LazyLinearSortedContainer implements Iterable<String> { /** Initial node-hash array. */ - private final IgniteBiTuple<Long, ClusterNode>[] arr; + private final IgniteBiTuple<Long, String>[] arr; /** Count of the sorted elements. */ private int sorted; @@ -363,7 +350,7 @@ public class RendezvousAffinityFunction { * @param arr Node / partition hash list. * @param needFirstSortedCnt Estimate count of elements to return by iterator. */ - LazyLinearSortedContainer(IgniteBiTuple<Long, ClusterNode>[] arr, int needFirstSortedCnt) { + LazyLinearSortedContainer(IgniteBiTuple<Long, String>[] arr, int needFirstSortedCnt) { this.arr = arr; if (needFirstSortedCnt > (int) Math.log(arr.length)) { @@ -375,14 +362,14 @@ public class RendezvousAffinityFunction { /** {@inheritDoc} */ @Override - public Iterator<ClusterNode> iterator() { + public Iterator<String> iterator() { return new SortIterator(); } /** * Sorting iterator. */ - private class SortIterator implements Iterator<ClusterNode> { + private class SortIterator implements Iterator<String> { /** Index of the first unsorted element. */ private int cur; @@ -394,7 +381,7 @@ public class RendezvousAffinityFunction { /** {@inheritDoc} */ @Override - public ClusterNode next() { + public String next() { if (!hasNext()) { throw new NoSuchElementException(); } @@ -403,7 +390,7 @@ public class RendezvousAffinityFunction { return arr[cur++].get2(); } - IgniteBiTuple<Long, ClusterNode> min = arr[cur]; + IgniteBiTuple<Long, String> min = arr[cur]; int minIdx = cur; diff --git a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java index 5310ab1dcc..0d0012d4bf 100644 --- a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java +++ b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java @@ -23,9 +23,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Set; -import java.util.UUID; -import org.apache.ignite.network.ClusterNode; -import org.apache.ignite.network.NetworkAddress; import org.junit.jupiter.api.Test; /** @@ -36,16 +33,7 @@ public class AffinityServiceTest { @Test public void testCalculatedAssignmentHappyPath() { List<Set<Assignment>> assignments = AffinityUtils.calculateAssignments( - Arrays.asList( - new ClusterNode( - UUID.randomUUID().toString(), "node0", - new NetworkAddress("localhost", 8080) - ), - new ClusterNode( - UUID.randomUUID().toString(), "node1", - new NetworkAddress("localhost", 8081) - ) - ), + Arrays.asList("node0", "node1"), 10, 3 ); diff --git a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java index 6866d9a0fb..187bfecd89 100644 --- a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java +++ b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java @@ -27,7 +27,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -35,7 +34,6 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.network.ClusterNode; -import org.apache.ignite.network.NetworkAddress; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; @@ -51,32 +49,32 @@ public class RendezvousAffinityFunctionTest { @Test public void testPartitionDistribution() { - int nodes = 50; + int nodeCount = 50; int parts = 10_000; int replicas = 4; - List<ClusterNode> clusterNodes = prepareNetworkTopology(nodes); + List<String> nodes = prepareNetworkTopology(nodeCount); - assertTrue(parts > nodes, "Partitions should be more that nodes"); + assertTrue(parts > nodeCount, "Partitions should be more than nodes"); - int ideal = (parts * replicas) / nodes; + int ideal = (parts * replicas) / nodeCount; - List<List<ClusterNode>> assignment = RendezvousAffinityFunction.assignPartitions( - clusterNodes, + List<List<String>> assignment = RendezvousAffinityFunction.assignPartitions( + nodes, parts, replicas, false, null ); - HashMap<ClusterNode, ArrayList<Integer>> assignmentByNode = new HashMap<>(nodes); + HashMap<String, ArrayList<Integer>> assignmentByNode = new HashMap<>(nodeCount); int part = 0; - for (List<ClusterNode> partNodes : assignment) { - for (ClusterNode node : partNodes) { + for (List<String> partNodes : assignment) { + for (String node : partNodes) { ArrayList<Integer> nodeParts = assignmentByNode.get(node); if (nodeParts == null) { @@ -89,7 +87,7 @@ public class RendezvousAffinityFunctionTest { part++; } - for (ClusterNode node : clusterNodes) { + for (String node : nodes) { ArrayList<Integer> nodeParts = assignmentByNode.get(node); assertNotNull(nodeParts); @@ -104,29 +102,26 @@ public class RendezvousAffinityFunctionTest { } @NotNull - private List<ClusterNode> prepareNetworkTopology(int nodes) { - var addr = new NetworkAddress("127.0.0.1", 121212); - + private List<String> prepareNetworkTopology(int nodes) { return IntStream.range(0, nodes) .mapToObj(i -> "Node " + i) - .map(name -> new ClusterNode(UUID.randomUUID().toString(), name, addr)) .collect(Collectors.toUnmodifiableList()); } @Test public void serializeAssignment() { - int nodes = 50; + int nodeCount = 50; int parts = 10_000; int replicas = 4; - List<ClusterNode> clusterNodes = prepareNetworkTopology(nodes); + List<String> nodes = prepareNetworkTopology(nodeCount); - assertTrue(parts > nodes, "Partitions should be more that nodes"); + assertTrue(parts > nodeCount, "Partitions should be more than nodes"); - List<List<ClusterNode>> assignment = RendezvousAffinityFunction.assignPartitions( - clusterNodes, + List<List<String>> assignment = RendezvousAffinityFunction.assignPartitions( + nodes, parts, replicas, false, diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java index ea1d8e6177..ac9cb9905d 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java @@ -25,6 +25,7 @@ import static org.apache.ignite.internal.metastorage.dsl.Operations.ops; import static org.apache.ignite.internal.metastorage.dsl.Operations.put; import static org.apache.ignite.internal.metastorage.dsl.Operations.remove; +import java.nio.charset.StandardCharsets; import java.util.Set; import org.apache.ignite.internal.metastorage.dsl.CompoundCondition; import org.apache.ignite.internal.metastorage.dsl.Update; @@ -34,7 +35,7 @@ import org.apache.ignite.lang.ByteArray; /** * Util class for Distribution Zones flow. */ -class DistributionZonesUtil { +public class DistributionZonesUtil { /** Key prefix for zone's data nodes. */ private static final String DISTRIBUTION_ZONE_DATA_NODES_PREFIX = "distributionZone.dataNodes."; @@ -60,11 +61,37 @@ class DistributionZonesUtil { private static final ByteArray DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY = new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION); - /** ByteArray representation of {@link DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}. */ - static ByteArray zoneDataNodesKey(int zoneId) { + /** + * ByteArray representation of {@link DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}. + * + * @param zoneId Zone id. + * @return ByteArray representation. + */ + public static ByteArray zoneDataNodesKey(int zoneId) { return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_PREFIX + zoneId); } + /** + * ByteArray representation of {@link DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}. + * + * @return ByteArray representation. + */ + public static ByteArray zoneDataNodesPrefix() { + return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_PREFIX); + } + + /** + * Extract zone id from a distribution zone data nodes key. + * + * @param key Key. + * @return Zone id. + */ + public static int extractZoneId(byte[] key) { + var strKey = new String(key, StandardCharsets.UTF_8); + + return Integer.parseInt(strKey.substring(DISTRIBUTION_ZONE_DATA_NODES_PREFIX.length())); + } + /** * The key needed for processing an event about zone's creation and deletion. * With this key we can be sure that event was triggered only once. diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java new file mode 100644 index 0000000000..665cac7338 --- /dev/null +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rebalance; + +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.Cluster.NodeKnockout.PARTITION_NETWORK; +import static org.apache.ignite.internal.SessionUtils.executeUpdate; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.function.BooleanSupplier; +import java.util.stream.Collectors; +import org.apache.ignite.internal.Cluster; +import org.apache.ignite.internal.affinity.Assignment; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException; +import org.apache.ignite.internal.schema.BinaryRowEx; +import org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration; +import org.apache.ignite.internal.schema.configuration.TablesConfiguration; +import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl; +import org.apache.ignite.internal.table.TableImpl; +import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.testframework.WorkDirectory; +import org.apache.ignite.internal.testframework.WorkDirectoryExtension; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.table.Tuple; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Test suite for the rebalance. + */ +@ExtendWith(WorkDirectoryExtension.class) +public class ItRebalanceTest extends BaseIgniteAbstractTest { + private static final IgniteLogger LOG = Loggers.forClass(ItRebalanceTest.class); + + @WorkDirectory + private Path workDir; + + private Cluster cluster; + + @BeforeEach + void createCluster(TestInfo testInfo) { + cluster = new Cluster(testInfo, workDir); + } + + @AfterEach + void shutdownCluster() { + cluster.shutdown(); + } + + /** + * The test checks that data is rebalanced after node with replica is left and joined to the cluster. + * + * @throws Exception If failed. + */ + @Test + @Disabled("https://issues.apache.org/jira/browse/IGNITE-18692") + void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception { + cluster.startAndInit(4); + + //Creates table with 1 partition and 3 replicas. + createTestTable(); + + assertTrue(waitAssignments(List.of( + Set.of(0, 1, 2), + Set.of(0, 1, 2), + Set.of(0, 1, 2), + Set.of(0, 1, 2) + ))); + + TableImpl table = (TableImpl) cluster.node(0).tables().table("TEST"); + + BinaryRowEx row = new TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 1).set("value", "value1")); + BinaryRowEx key = new TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 1)); + + assertNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(0).node()).get()); + assertNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(1).node()).get()); + assertNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(2).node()).get()); + + table.internalTable().insert(row, null).get(); + + assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(0).node()).get()); + assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(1).node()).get()); + assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(2).node()).get()); + + try { + table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(3).node()).get(); + + fail(); + } catch (Exception e) { + assertInstanceOf(ExecutionException.class, e); + + assertInstanceOf(ReplicaUnavailableException.class, e.getCause()); + } + + cluster.knockOutNode(2, PARTITION_NETWORK); + + assertTrue(waitAssignments(List.of( + Set.of(0, 1, 3), + Set.of(0, 1, 3), + Set.of(0, 1, 2), + Set.of(0, 1, 3) + ))); + + assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(0).node()).get()); + assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(1).node()).get()); + assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(3).node()).get()); + + cluster.reanimateNode(2, PARTITION_NETWORK); + + assertTrue(waitAssignments(List.of( + Set.of(0, 1, 2), + Set.of(0, 1, 2), + Set.of(0, 1, 2), + Set.of(0, 1, 2) + ))); + + assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(0).node()).get()); + assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(1).node()).get()); + assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(2).node()).get()); + + try { + table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(3).node()).get(); + + fail(); + } catch (Exception e) { + assertInstanceOf(ExecutionException.class, e); + + assertInstanceOf(ReplicaUnavailableException.class, e.getCause()); + } + } + + /** + * Wait assignments on nodes. + * + * @param nodes Expected assignments. + * @return {@code true} if the expected and actual assignments are the same. + * @throws InterruptedException If interrupted. + */ + private boolean waitAssignments(List<Set<Integer>> nodes) throws InterruptedException { + return waitForCondition(() -> { + for (int i = 0; i < nodes.size(); i++) { + Set<Integer> expectedAssignments = nodes.get(i); + + ExtendedTableConfiguration table = + (ExtendedTableConfiguration) cluster.node(i) + .clusterConfiguration().getConfiguration(TablesConfiguration.KEY).tables().get("TEST"); + + byte[] assignmentsBytes = table.assignments().value(); + + Set<String> assignments; + + if (assignmentsBytes != null) { + assignments = ((List<Set<Assignment>>) ByteUtils.fromBytes(assignmentsBytes)).get(0) + .stream().map(assignment -> assignment.consistentId()).collect(Collectors.toSet()); + } else { + assignments = Collections.emptySet(); + } + + LOG.info("Assignments for node " + i + ": " + assignments); + + if (!(expectedAssignments.size() == assignments.size()) + || !expectedAssignments.stream().allMatch(node -> assignments.contains(cluster.node(node).name()))) { + return false; + } + } + + return true; + }, + 5000); + } + + private void createTestTable() throws InterruptedException { + String sql1 = "create zone test_zone with " + + "data_nodes_auto_adjust_scale_up=0, " + + "data_nodes_auto_adjust_scale_down=0"; + String sql2 = "create table test (id int primary key, value varchar(20))" + + " with partitions=1, replicas=3, primary_zone='TEST_ZONE'"; + + cluster.doInSession(0, session -> { + executeUpdate(sql1, session); + executeUpdate(sql2, session); + }); + + waitForTableToStart(); + } + + private void waitForTableToStart() throws InterruptedException { + // TODO: IGNITE-18203 - remove this wait because when a table creation query is executed, the table must be fully ready. + + BooleanSupplier tableStarted = () -> { + int numberOfStartedRaftNodes = cluster.runningNodes() + .map(ItRebalanceTest::tablePartitionIds) + .mapToInt(List::size) + .sum(); + return numberOfStartedRaftNodes == 3; + }; + + assertTrue(waitForCondition(tableStarted, 10_000), "Did not see all table RAFT nodes started"); + } + + /** + * Returns the IDs of all table partitions that exist on the given node. + */ + private static List<TablePartitionId> tablePartitionIds(IgniteImpl node) { + return node.raftManager().localNodes().stream() + .map(RaftNodeId::groupId) + .filter(TablePartitionId.class::isInstance) + .map(TablePartitionId.class::cast) + .collect(toList()); + } +} diff --git a/modules/table/build.gradle b/modules/table/build.gradle index e2d59f5bc3..54a04a315e 100644 --- a/modules/table/build.gradle +++ b/modules/table/build.gradle @@ -59,6 +59,7 @@ dependencies { testImplementation(testFixtures(project(':ignite-configuration'))) testImplementation(testFixtures(project(':ignite-transactions'))) testImplementation(testFixtures(project(':ignite-storage-api'))) + testImplementation(testFixtures(project(':ignite-metastorage'))) testImplementation libs.mockito.core testImplementation libs.mockito.inline testImplementation libs.mockito.junit @@ -80,6 +81,7 @@ dependencies { testFixturesImplementation(testFixtures(project(':ignite-core'))) testFixturesImplementation(testFixtures(project(':ignite-storage-api'))) testFixturesImplementation(testFixtures(project(':ignite-transactions'))) + testFixturesImplementation(testFixtures(project(':ignite-cluster-management'))) testFixturesImplementation libs.jetbrains.annotations testFixturesImplementation libs.fastutil.core testFixturesImplementation libs.mockito.core @@ -96,6 +98,7 @@ dependencies { integrationTestImplementation(testFixtures(project(':ignite-raft'))) integrationTestImplementation(testFixtures(project(':ignite-storage-api'))) integrationTestImplementation(testFixtures(project(':ignite-transactions'))) + integrationTestImplementation(testFixtures(project(':ignite-cluster-management'))) integrationTestImplementation libs.fastutil.core } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java index 80e0f957ca..7a368f6021 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java @@ -371,7 +371,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest { */ private Int2ObjectOpenHashMap<RaftGroupService> startTable(UUID tblId, SchemaDescriptor schemaDescriptor) throws Exception { List<Set<Assignment>> calculatedAssignments = AffinityUtils.calculateAssignments( - cluster.stream().map(node -> node.topologyService().localMember()).collect(toList()), + cluster.stream().map(node -> node.topologyService().localMember().name()).collect(toList()), 1, replicas() ); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index d1a483094f..6bfa418150 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -24,6 +24,8 @@ import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.concurrent.CompletableFuture.runAsync; import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId; +import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId; +import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesPrefix; import static org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; @@ -70,6 +72,7 @@ import java.util.function.IntSupplier; import java.util.stream.Stream; import org.apache.ignite.configuration.ConfigurationChangeException; import org.apache.ignite.configuration.ConfigurationProperty; +import org.apache.ignite.configuration.NamedConfigurationTree; import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener; import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent; import org.apache.ignite.internal.affinity.AffinityUtils; @@ -293,6 +296,18 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new TableMessagesFactory(); + /** Meta storage listener for changes in the distribution zones data nodes. */ + private final WatchListener distributionZonesDataNodesListener; + + /** Meta storage listener for pending assignments. */ + private final WatchListener pendingAssignmentsRebalanceListener; + + /** Meta storage listener for stable assignments. */ + private final WatchListener stableAssignmentsRebalanceListener; + + /** Meta storage listener for switch reduce assignments. */ + private final WatchListener assignmentsSwitchRebalanceListener; + /** * Creates a new table manager. * @@ -427,6 +442,14 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp new LinkedBlockingQueue<>(), NamedThreadFactory.create(nodeName, "incoming-raft-snapshot", LOG) ); + + distributionZonesDataNodesListener = createDistributionZonesDataNodesListener(); + + pendingAssignmentsRebalanceListener = createPendingAssignmentsRebalanceListener(); + + stableAssignmentsRebalanceListener = createStableAssignmentsRebalanceListener(); + + assignmentsSwitchRebalanceListener = createAssignmentsSwitchRebalanceListener(); } /** {@inheritDoc} */ @@ -434,7 +457,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp public void start() { tablesCfg.tables().any().replicas().listen(this::onUpdateReplicas); - registerRebalanceListeners(); + // TODO: IGNITE-18694 - Recovery for the case when zones watch listener processed event but assignments were not updated. + metaStorageMgr.registerPrefixWatch(zoneDataNodesPrefix(), distributionZonesDataNodesListener); + + metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX), pendingAssignmentsRebalanceListener); + metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX), stableAssignmentsRebalanceListener); + metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX), assignmentsSwitchRebalanceListener); ((ExtendedTableConfiguration) tablesCfg.tables().any()).assignments().listen(this::onUpdateAssignments); @@ -606,7 +634,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp for (int i = 0; i < partCnt; i++) { TablePartitionId replicaGrpId = new TablePartitionId(((ExtendedTableConfiguration) tblCfg).id().value(), i); - futures[i] = updatePendingAssignmentsKeys(tblCfg.name().value(), replicaGrpId, baselineMgr.nodes(), newReplicas, + futures[i] = updatePendingAssignmentsKeys(tblCfg.name().value(), replicaGrpId, + baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()), newReplicas, replicasCtx.storageRevision(), metaStorageMgr, i); } @@ -953,6 +982,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp return; } + metaStorageMgr.unregisterWatch(distributionZonesDataNodesListener); + + metaStorageMgr.unregisterWatch(pendingAssignmentsRebalanceListener); + metaStorageMgr.unregisterWatch(stableAssignmentsRebalanceListener); + metaStorageMgr.unregisterWatch(assignmentsSwitchRebalanceListener); + busyLock.block(); Map<UUID, TableImpl> tables = tablesByIdVv.latest(); @@ -1248,7 +1283,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp } private Set<Assignment> calculateAssignments(TableConfiguration tableCfg, int partNum) { - return AffinityUtils.calculateAssignmentForPartition(baselineMgr.nodes(), partNum, tableCfg.value().replicas()); + return AffinityUtils.calculateAssignmentForPartition( + baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()), + partNum, + tableCfg.value().replicas() + ); } /** @@ -1308,7 +1347,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp // Affinity assignments calculation. extConfCh.changeAssignments(ByteUtils.toBytes(AffinityUtils.calculateAssignments( - baselineMgr.nodes(), + baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()), tableChange.partitions(), tableChange.replicas()))); }); @@ -1789,10 +1828,79 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp } /** - * Register the new meta storage listener for changes in the rebalance-specific keys. + * Creates meta storage listener for distribution zones data nodes updates. + * + * @return The watch listener. + */ + private WatchListener createDistributionZonesDataNodesListener() { + return new WatchListener() { + @Override + public void onUpdate(WatchEvent evt) { + if (!busyLock.enterBusy()) { + throw new IgniteInternalException(new NodeStoppingException()); + } + + try { + byte[] dataNodesBytes = evt.entryEvent().newEntry().value(); + + if (dataNodesBytes == null) { + //The zone was removed so data nodes was removed too. + return; + } + + NamedConfigurationTree<TableConfiguration, TableView, TableChange> tables = tablesCfg.tables(); + + int zoneId = extractZoneId(evt.entryEvent().newEntry().key()); + + Set<String> dataNodes = ByteUtils.fromBytes(dataNodesBytes); + + for (int i = 0; i < tables.value().size(); i++) { + TableView tableView = tables.value().get(i); + + int tableZoneId = tableView.zoneId(); + + if (zoneId == tableZoneId) { + TableConfiguration tableCfg = tables.get(tableView.name()); + + for (int part = 0; part < tableView.partitions(); part++) { + UUID tableId = ((ExtendedTableConfiguration) tableCfg).id().value(); + + TablePartitionId replicaGrpId = new TablePartitionId(tableId, part); + + int partId = part; + + updatePendingAssignmentsKeys( + tableView.name(), replicaGrpId, dataNodes, tableView.replicas(), + evt.entryEvent().newEntry().revision(), metaStorageMgr, part + ).exceptionally(e -> { + LOG.error( + "Exception on updating assignments for [table={}, partition={}]", e, tableView.name(), partId + ); + + return null; + }); + } + } + } + } finally { + busyLock.leaveBusy(); + } + } + + @Override + public void onError(Throwable e) { + LOG.warn("Unable to process data nodes event", e); + } + }; + } + + /** + * Creates meta storage listener for pending assignments updates. + * + * @return The watch listener. */ - private void registerRebalanceListeners() { - metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX), new WatchListener() { + private WatchListener createPendingAssignmentsRebalanceListener() { + return new WatchListener() { @Override public void onUpdate(WatchEvent evt) { if (!busyLock.enterBusy()) { @@ -1960,9 +2068,16 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp public void onError(Throwable e) { LOG.warn("Unable to process pending assignments event", e); } - }); + }; + } - metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX), new WatchListener() { + /** + * Creates meta storage listener for stable assignments updates. + * + * @return The watch listener. + */ + private WatchListener createStableAssignmentsRebalanceListener() { + return new WatchListener() { @Override public void onUpdate(WatchEvent evt) { handleChangeStableAssignmentEvent(evt); @@ -1972,9 +2087,16 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp public void onError(Throwable e) { LOG.warn("Unable to process stable assignments event", e); } - }); + }; + } - metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX), new WatchListener() { + /** + * Creates meta storage listener for switch reduce assignments updates. + * + * @return The watch listener. + */ + private WatchListener createAssignmentsSwitchRebalanceListener() { + return new WatchListener() { @Override public void onUpdate(WatchEvent evt) { byte[] key = evt.entryEvent().newEntry().key(); @@ -1990,7 +2112,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp RebalanceUtil.handleReduceChanged( metaStorageMgr, - baselineMgr.nodes(), + baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()), tblCfg.value().replicas(), partitionNumber, replicaGrpId, @@ -2002,7 +2124,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp public void onError(Throwable e) { LOG.warn("Unable to process switch reduce event", e); } - }); + }; } private PartitionMover createPartitionMover(InternalTable internalTable, int partId) { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java index 0b0b5f36c4..a3dea5ffa2 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java @@ -46,7 +46,6 @@ import org.apache.ignite.internal.metastorage.dsl.Operations; import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.lang.ByteArray; -import org.apache.ignite.network.ClusterNode; import org.jetbrains.annotations.NotNull; /** @@ -82,14 +81,14 @@ public class RebalanceUtil { * * @param tableName Table name. * @param partId Unique identifier of a partition. - * @param baselineNodes Nodes in baseline. + * @param dataNodes Data nodes. * @param replicas Number of replicas for a table. * @param revision Revision of Meta Storage that is specific for the assignment update. * @param metaStorageMgr Meta Storage manager. * @return Future representing result of updating keys in {@code metaStorageMgr} */ public static @NotNull CompletableFuture<Void> updatePendingAssignmentsKeys( - String tableName, TablePartitionId partId, Collection<ClusterNode> baselineNodes, + String tableName, TablePartitionId partId, Collection<String> dataNodes, int replicas, long revision, MetaStorageManager metaStorageMgr, int partNum) { ByteArray partChangeTriggerKey = partChangeTriggerKey(partId); @@ -99,7 +98,7 @@ public class RebalanceUtil { ByteArray partAssignmentsStableKey = stablePartAssignmentsKey(partId); - Set<Assignment> partAssignments = AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas); + Set<Assignment> partAssignments = AffinityUtils.calculateAssignmentForPartition(dataNodes, partNum, replicas); byte[] partAssignmentsBytes = ByteUtils.toBytes(partAssignments); @@ -341,14 +340,14 @@ public class RebalanceUtil { * If there is rebalancing in progress, then new assignments will be applied when rebalance finishes. * * @param metaStorageMgr MetaStorage manager. - * @param baselineNodes Baseline nodes. + * @param dataNodes Data nodes. * @param replicas Replicas count. * @param partNum Number of the partition. * @param partId Partition's raft group id. * @param event Assignments switch reduce change event. * @return Completable future that signifies the completion of this operation. */ - public static CompletableFuture<Void> handleReduceChanged(MetaStorageManager metaStorageMgr, Collection<ClusterNode> baselineNodes, + public static CompletableFuture<Void> handleReduceChanged(MetaStorageManager metaStorageMgr, Collection<String> dataNodes, int replicas, int partNum, TablePartitionId partId, WatchEvent event) { Entry entry = event.entryEvent().newEntry(); byte[] eventData = entry.value(); @@ -359,7 +358,7 @@ public class RebalanceUtil { return CompletableFuture.completedFuture(null); } - Set<Assignment> assignments = AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas); + Set<Assignment> assignments = AffinityUtils.calculateAssignmentForPartition(dataNodes, partNum, replicas); ByteArray pendingKey = pendingPartAssignmentsKey(partId); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java new file mode 100644 index 0000000000..1645d99ebe --- /dev/null +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed; + +import static java.util.Collections.emptySet; +import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED; +import static org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition; +import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey; +import static org.apache.ignite.internal.util.ByteUtils.fromBytes; +import static org.apache.ignite.internal.util.ByteUtils.toBytes; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.ignite.configuration.ConfigurationValue; +import org.apache.ignite.configuration.NamedConfigurationTree; +import org.apache.ignite.configuration.NamedListView; +import org.apache.ignite.internal.affinity.Assignment; +import org.apache.ignite.internal.configuration.ConfigurationManager; +import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration; +import org.apache.ignite.internal.metastorage.Entry; +import org.apache.ignite.internal.metastorage.EntryEvent; +import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.WatchEvent; +import org.apache.ignite.internal.metastorage.WatchListener; +import org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory; +import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand; +import org.apache.ignite.internal.metastorage.dsl.Iif; +import org.apache.ignite.internal.metastorage.impl.EntryImpl; +import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; +import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; +import org.apache.ignite.internal.raft.Command; +import org.apache.ignite.internal.raft.WriteCommand; +import org.apache.ignite.internal.raft.service.CommandClosure; +import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.schema.SchemaManager; +import org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration; +import org.apache.ignite.internal.schema.configuration.TableChange; +import org.apache.ignite.internal.schema.configuration.TableConfiguration; +import org.apache.ignite.internal.schema.configuration.TableView; +import org.apache.ignite.internal.schema.configuration.TablesConfiguration; +import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; +import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.utils.RebalanceUtil; +import org.apache.ignite.lang.ByteArray; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteInternalException; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.MessagingService; +import org.apache.ignite.network.TopologyService; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +/** + * Tests the distribution zone watch listener in {@link TableManager}. + */ +@ExtendWith({MockitoExtension.class, ConfigurationExtension.class}) +@MockitoSettings(strictness = Strictness.LENIENT) +public class TableManagerDistributionZonesTest extends IgniteAbstractTest { + private SimpleInMemoryKeyValueStorage keyValueStorage; + + private ConfigurationManager clusterCfgMgr; + + @Mock() + private ClusterService clusterService; + + private TablesConfiguration tablesConfiguration; + + private WatchListener watchListener; + + private TableManager tableManager; + + @BeforeEach + public void setUp() { + clusterCfgMgr = new ConfigurationManager( + List.of(DistributionZonesConfiguration.KEY), + Set.of(), + new TestConfigurationStorage(DISTRIBUTED), + List.of(), + List.of() + ); + + MetaStorageManager metaStorageManager = mock(MetaStorageManager.class); + + doAnswer(invocation -> { + WatchListener listener = invocation.getArgument(1); + + if (watchListener == null) { + watchListener = listener; + } + + return null; + }).when(metaStorageManager).registerPrefixWatch(any(), any()); + + tablesConfiguration = mock(TablesConfiguration.class); + + clusterCfgMgr.start(); + + AtomicLong raftIndex = new AtomicLong(); + + keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test")); + + MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage); + + RaftGroupService metaStorageService = mock(RaftGroupService.class); + + // Delegate directly to listener. + lenient().doAnswer( + invocationClose -> { + Command cmd = invocationClose.getArgument(0); + + long commandIndex = raftIndex.incrementAndGet(); + + CompletableFuture<Serializable> res = new CompletableFuture<>(); + + CommandClosure<WriteCommand> clo = new CommandClosure<>() { + /** {@inheritDoc} */ + @Override + public long index() { + return commandIndex; + } + + /** {@inheritDoc} */ + @Override + public WriteCommand command() { + return (WriteCommand) cmd; + } + + /** {@inheritDoc} */ + @Override + public void result(@Nullable Serializable r) { + if (r instanceof Throwable) { + res.completeExceptionally((Throwable) r); + } else { + res.complete(r); + } + } + }; + + try { + metaStorageListener.onWrite(List.of(clo).iterator()); + } catch (Throwable e) { + res.completeExceptionally(new IgniteInternalException(e)); + } + + return res; + } + ).when(metaStorageService).run(any()); + + MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory(); + + lenient().doAnswer(invocationClose -> { + Iif iif = invocationClose.getArgument(0); + + MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand().iif(iif).build(); + + return metaStorageService.run(multiInvokeCommand); + }).when(metaStorageManager).invoke(any()); + + when(clusterService.messagingService()).thenAnswer(invocation -> { + MessagingService ret = mock(MessagingService.class); + + return ret; + }); + + tableManager = new TableManager( + "node1", + (x) -> {}, + tablesConfiguration, + clusterService, + null, + null, + null, + null, + null, + mock(TopologyService.class), + null, + null, + null, + metaStorageManager, + mock(SchemaManager.class), + null, + null, + mock(OutgoingSnapshotsManager.class) + ); + } + + @Test + void dataNodesTriggersAssignmentsChanging() { + IgniteBiTuple<TableView, ExtendedTableConfiguration> table0 = mockTable(0, 1, 0); + IgniteBiTuple<TableView, ExtendedTableConfiguration> table1 = mockTable(1, 2, 0); + IgniteBiTuple<TableView, ExtendedTableConfiguration> table2 = mockTable(2, 1, 1); + IgniteBiTuple<TableView, ExtendedTableConfiguration> table3 = mockTable(3, 2, 1); + IgniteBiTuple<TableView, ExtendedTableConfiguration> table4 = mockTable(4, 1, 1); + IgniteBiTuple<TableView, ExtendedTableConfiguration> table5 = mockTable(5, 2, 1); + + List<IgniteBiTuple<TableView, ExtendedTableConfiguration>> mockedTables = + List.of(table0, table1, table2, table3, table4, table5); + + mockTables(mockedTables); + + tableManager.start(); + + Set<String> nodes = Set.of("node0", "node1", "node2"); + + watchListenerOnUpdate(1, nodes, 1); + + Map<Integer, Set<String>> zoneNodes = new HashMap<>(); + + zoneNodes.put(1, nodes); + + checkAssignments(mockedTables, zoneNodes, RebalanceUtil::pendingPartAssignmentsKey); + + verify(keyValueStorage, timeout(1000).times(6)).invoke(any()); + } + + @Test + void sequentialAssignmentsChanging() { + IgniteBiTuple<TableView, ExtendedTableConfiguration> table = mockTable(0, 1, 1); + + List<IgniteBiTuple<TableView, ExtendedTableConfiguration>> mockedTables = List.of(table); + + mockTables(mockedTables); + + tableManager.start(); + + Set<String> nodes = Set.of("node0", "node1", "node2"); + + watchListenerOnUpdate(1, nodes, 1); + + Map<Integer, Set<String>> zoneNodes = new HashMap<>(); + + zoneNodes.put(1, nodes); + + checkAssignments(mockedTables, zoneNodes, RebalanceUtil::pendingPartAssignmentsKey); + + verify(keyValueStorage, timeout(1000).times(1)).invoke(any()); + + nodes = Set.of("node3", "node4", "node5"); + + watchListenerOnUpdate(1, nodes, 2); + + zoneNodes.clear(); + zoneNodes.put(1, nodes); + + checkAssignments(mockedTables, zoneNodes, RebalanceUtil::plannedPartAssignmentsKey); + + verify(keyValueStorage, timeout(1000).times(2)).invoke(any()); + } + + @Test + void sequentialEmptyAssignmentsChanging() { + IgniteBiTuple<TableView, ExtendedTableConfiguration> table = mockTable(0, 1, 1); + + List<IgniteBiTuple<TableView, ExtendedTableConfiguration>> mockedTables = List.of(table); + + mockTables(mockedTables); + + tableManager.start(); + + watchListenerOnUpdate(1, null, 1); + + Set<String> nodes = Set.of("node0", "node1", "node2"); + + watchListenerOnUpdate(1, nodes, 2); + + Map<Integer, Set<String>> zoneNodes = new HashMap<>(); + + zoneNodes.put(1, nodes); + + checkAssignments(mockedTables, zoneNodes, RebalanceUtil::pendingPartAssignmentsKey); + + verify(keyValueStorage, timeout(1000).times(1)).invoke(any()); + + nodes = emptySet(); + + watchListenerOnUpdate(1, nodes, 3); + + zoneNodes.clear(); + zoneNodes.put(1, nodes); + + checkAssignments(mockedTables, zoneNodes, RebalanceUtil::plannedPartAssignmentsKey); + + verify(keyValueStorage, timeout(1000).times(2)).invoke(any()); + } + + @Test + void staleDataNodesEvent() { + IgniteBiTuple<TableView, ExtendedTableConfiguration> table = mockTable(0, 1, 1); + + List<IgniteBiTuple<TableView, ExtendedTableConfiguration>> mockedTables = List.of(table); + + mockTables(mockedTables); + + tableManager.start(); + + Set<String> nodes = Set.of("node0", "node1", "node2"); + + watchListenerOnUpdate(1, nodes, 1); + + Map<Integer, Set<String>> zoneNodes = new HashMap<>(); + + zoneNodes.put(1, nodes); + + checkAssignments(mockedTables, zoneNodes, RebalanceUtil::pendingPartAssignmentsKey); + + verify(keyValueStorage, timeout(1000).times(1)).invoke(any()); + + Set<String> nodes2 = Set.of("node3", "node4", "node5"); + + watchListenerOnUpdate(1, nodes2, 1); + + checkAssignments(mockedTables, zoneNodes, RebalanceUtil::pendingPartAssignmentsKey); + + TablePartitionId partId = new TablePartitionId(new UUID(0, 0), 0); + + assertNull(keyValueStorage.get(RebalanceUtil.plannedPartAssignmentsKey(partId).bytes()).value()); + + verify(keyValueStorage, timeout(1000).times(2)).invoke(any()); + } + + private void checkAssignments( + List<IgniteBiTuple<TableView, ExtendedTableConfiguration>> mockedTables, + Map<Integer, Set<String>> zoneNodes, + Function<TablePartitionId, ByteArray> assignmentFunction + ) { + for (int i = 0; i < mockedTables.size(); i++) { + TableView tableView = mockedTables.get(i).get1(); + + for (int j = 0; j < tableView.partitions(); j++) { + TablePartitionId partId = new TablePartitionId(new UUID(0, i), j); + + byte[] actualAssignmentsBytes = keyValueStorage.get(assignmentFunction.apply(partId).bytes()).value(); + + Set<String> expectedNodes = zoneNodes.get(tableView.zoneId()); + + if (expectedNodes != null) { + Set<String> expectedAssignments = + calculateAssignmentForPartition(expectedNodes, j, tableView.replicas()) + .stream().map(assignment -> assignment.consistentId()).collect(Collectors.toSet()); + + assertNotNull(actualAssignmentsBytes); + + Set<String> actualAssignments = ((Set<Assignment>) fromBytes(actualAssignmentsBytes)) + .stream().map(assignment -> assignment.consistentId()).collect(Collectors.toSet()); + + assertTrue(expectedAssignments.containsAll(actualAssignments)); + + assertEquals(expectedAssignments.size(), actualAssignments.size()); + } else { + assertNull(actualAssignmentsBytes); + } + } + } + } + + private void watchListenerOnUpdate(int zoneId, Set<String> nodes, long rev) { + byte[] newLogicalTopology; + + if (nodes != null) { + newLogicalTopology = toBytes(nodes); + } else { + newLogicalTopology = null; + } + + Entry newEntry = new EntryImpl(zoneDataNodesKey(zoneId).bytes(), newLogicalTopology, rev, 1); + + EntryEvent entryEvent = new EntryEvent(null, newEntry); + + WatchEvent evt = new WatchEvent(entryEvent); + + watchListener.onUpdate(evt); + } + + private IgniteBiTuple<TableView, ExtendedTableConfiguration> mockTable(int tableNum, int partNum, int zoneId) { + TableView tableView = mock(TableView.class); + when(tableView.zoneId()).thenReturn(zoneId); + when(tableView.name()).thenReturn("table" + tableNum); + when(tableView.replicas()).thenReturn(1); + when(tableView.partitions()).thenReturn(partNum); + + ExtendedTableConfiguration tableCfg = mock(ExtendedTableConfiguration.class); + ConfigurationValue valueId = mock(ConfigurationValue.class); + when(valueId.value()).thenReturn(new UUID(0, tableNum)); + when(tableCfg.id()).thenReturn(valueId); + + return new IgniteBiTuple<>(tableView, tableCfg); + + } + + private void mockTables(List<IgniteBiTuple<TableView, ExtendedTableConfiguration>> mockedTables) { + NamedListView<TableView> valueList = mock(NamedListView.class); + + when(valueList.namedListKeys()).thenReturn(new ArrayList<>()); + + NamedConfigurationTree<TableConfiguration, TableView, TableChange> tables = mock(NamedConfigurationTree.class); + + when(valueList.size()).thenReturn(mockedTables.size()); + when(tables.value()).thenReturn(valueList); + + for (int i = 0; i < mockedTables.size(); i++) { + IgniteBiTuple<TableView, ExtendedTableConfiguration> mockedTable = mockedTables.get(i); + + TableView tableView = mockedTable.get1(); + + when(valueList.get(i)).thenReturn(tableView); + + when(tables.get(tableView.name())).thenReturn(mockedTable.get2()); + } + + ExtendedTableConfiguration tableCfg = mock(ExtendedTableConfiguration.class); + when(tables.any()).thenReturn(tableCfg); + when(tableCfg.replicas()).thenReturn(mock(ConfigurationValue.class)); + when(tableCfg.assignments()).thenReturn(mock(ConfigurationValue.class)); + + when(tablesConfiguration.tables()).thenReturn(tables); + } +} diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index a4520d5f1c..64e5fa0708 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -255,6 +255,8 @@ public class TableManagerTest extends IgniteAbstractTest { public void testPreconfiguredTable() throws Exception { when(rm.startRaftGroupService(any(), any())).thenAnswer(mock -> completedFuture(mock(RaftGroupService.class))); + mockMetastore(); + TableManager tableManager = createTableManager(tblManagerFut, false); tblManagerFut.complete(tableManager); @@ -264,8 +266,6 @@ public class TableManagerTest extends IgniteAbstractTest { SchemaBuilders.column("val", ColumnType.INT64).asNullable(true).build() ).withPrimaryKey("key").build(); - mockMetastore(); - tblsCfg.tables().change(tablesChange -> { tablesChange.create(scmTbl.name(), tableChange -> { (SchemaConfigurationConverter.convert(scmTbl, tableChange)) @@ -650,6 +650,8 @@ public class TableManagerTest extends IgniteAbstractTest { .thenReturn(assignment); } + mockMetastore(); + TableManager tableManager = createTableManager(tblManagerFut, true); final int tablesBeforeCreation = tableManager.tables().size(); @@ -687,8 +689,6 @@ public class TableManagerTest extends IgniteAbstractTest { .changePartitions(PARTITIONS) ); - mockMetastore(); - assertTrue(createTblLatch.await(10, TimeUnit.SECONDS)); TableImpl tbl2 = (TableImpl) tbl2Fut.get();