This is an automated email from the ASF dual-hosted git repository. sdanilov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 2405febe72 IGNITE-18240 Store logical topology as a single KV entry instead of a number of entries (#1371) 2405febe72 is described below commit 2405febe72fd295a1044bcdb6787b7f6a8389d7a Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Thu Nov 24 17:36:06 2022 +0400 IGNITE-18240 Store logical topology as a single KV entry instead of a number of entries (#1371) --- .../management/raft/RaftStorageManager.java | 128 ++++++++++++++------- .../raft/commands/NodesLeaveCommand.java | 2 +- .../AbstractClusterStateStorageManagerTest.java | 39 +++++++ 3 files changed, 126 insertions(+), 43 deletions(-) diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java index f3c94cbfea..77c79da236 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java @@ -18,20 +18,25 @@ package org.apache.ignite.internal.cluster.management.raft; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Collections.emptySet; +import static java.util.function.Function.identity; import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; import static org.apache.ignite.internal.util.ByteUtils.fromBytes; import static org.apache.ignite.internal.util.ByteUtils.toBytes; +import java.io.Serializable; import java.nio.ByteBuffer; import java.nio.file.Path; -import java.util.Arrays; import java.util.Collection; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.cluster.management.ClusterState; +import org.apache.ignite.internal.tostring.IgniteToStringInclude; +import org.apache.ignite.internal.tostring.S; import org.apache.ignite.internal.util.Cursor; -import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.network.ClusterNode; import org.jetbrains.annotations.Nullable; @@ -42,16 +47,20 @@ class RaftStorageManager { /** Storage key for the CMG state. */ private static final byte[] CMG_STATE_KEY = "cmg_state".getBytes(UTF_8); - /** Prefix for the keys for logical topology nodes. */ - private static final byte[] LOGICAL_TOPOLOGY_PREFIX = "logical_".getBytes(UTF_8); + /** Storage key for the logical topology. */ + private static final byte[] LOGICAL_TOPOLOGY_KEY = "logical".getBytes(UTF_8); /** Prefix for validation tokens. */ private static final byte[] VALIDATED_NODE_PREFIX = "validation_".getBytes(UTF_8); private final ClusterStateStorage storage; + private volatile LogicalTopology currentLogicalTopology; + RaftStorageManager(ClusterStateStorage storage) { this.storage = storage; + + currentLogicalTopology = readLogicalTopology(); } /** @@ -79,11 +88,13 @@ class RaftStorageManager { * Retrieves the current logical topology. */ Collection<ClusterNode> getLogicalTopology() { - try (Cursor<ClusterNode> cursor = storage.getWithPrefix(LOGICAL_TOPOLOGY_PREFIX, (k, v) -> fromBytes(v))) { - return cursor.stream().collect(toList()); - } catch (Exception e) { - throw new IgniteInternalException("Unable to get data from storage", e); - } + return currentLogicalTopology.nodes; + } + + private LogicalTopology readLogicalTopology() { + byte[] bytes = storage.get(LOGICAL_TOPOLOGY_KEY); + + return bytes == null ? LogicalTopology.INITIAL : fromBytes(bytes); } /** @@ -92,16 +103,17 @@ class RaftStorageManager { * @param node Node to save. */ void putLogicalTopologyNode(ClusterNode node) { - byte[] nodeNameBytes = node.name().getBytes(UTF_8); - - byte[] nodeIdBytes = node.id().getBytes(UTF_8); + replaceLogicalTopologyWith(currentLogicalTopology.addNode(node)); + } - byte[] key = logicalTopologyKey(nodeNameBytes, nodeIdBytes); + private void replaceLogicalTopologyWith(LogicalTopology newTopology) { + if (newTopology == currentLogicalTopology) { + return; + } - // Replace all nodes with the same consistent ID. - byte[] prefix = Arrays.copyOf(key, key.length - nodeIdBytes.length); + storage.put(LOGICAL_TOPOLOGY_KEY, toBytes(newTopology)); - storage.replaceAll(prefix, key, toBytes(node)); + currentLogicalTopology = newTopology; } /** @@ -110,36 +122,14 @@ class RaftStorageManager { * @param nodes Nodes to remove. */ void removeLogicalTopologyNodes(Set<ClusterNode> nodes) { - Collection<byte[]> keys = nodes.stream() - .map(RaftStorageManager::logicalTopologyKey) - .collect(toList()); - - storage.removeAll(keys); + replaceLogicalTopologyWith(currentLogicalTopology.removeNodesByIds(nodes)); } /** * Returns {@code true} if a given node is present in the logical topology or {@code false} otherwise. */ boolean isNodeInLogicalTopology(ClusterNode node) { - byte[] value = storage.get(logicalTopologyKey(node)); - - return value != null; - } - - private static byte[] logicalTopologyKey(ClusterNode node) { - byte[] nodeNameBytes = node.name().getBytes(UTF_8); - - byte[] nodeIdBytes = node.id().getBytes(UTF_8); - - return logicalTopologyKey(nodeNameBytes, nodeIdBytes); - } - - private static byte[] logicalTopologyKey(byte[] nodeNameBytes, byte[] nodeIdBytes) { - return ByteBuffer.allocate(LOGICAL_TOPOLOGY_PREFIX.length + nodeNameBytes.length + nodeIdBytes.length) - .put(LOGICAL_TOPOLOGY_PREFIX) - .put(nodeNameBytes) - .put(nodeIdBytes) - .array(); + return currentLogicalTopology.containsNodeById(node); } /** @@ -185,8 +175,6 @@ class RaftStorageManager { try (cursor) { return cursor.stream().collect(toList()); - } catch (Exception e) { - throw new IgniteInternalException("Unable to get data from storage", e); } } @@ -207,5 +195,61 @@ class RaftStorageManager { */ void restoreSnapshot(Path snapshotPath) { storage.restoreSnapshot(snapshotPath); + + currentLogicalTopology = readLogicalTopology(); + } + + private static class LogicalTopology implements Serializable { + private static final long serialVersionUID = 0L; + + private static final LogicalTopology INITIAL = new LogicalTopology(0, emptySet()); + + private final long version; + + @IgniteToStringInclude + private final Set<ClusterNode> nodes; + + private LogicalTopology(long version, Collection<ClusterNode> nodes) { + this.version = version; + this.nodes = Set.copyOf(nodes); + } + + LogicalTopology addNode(ClusterNode nodeToAdd) { + Map<String, ClusterNode> map = nodes.stream().collect(toMap(ClusterNode::name, identity())); + + ClusterNode oldNode = map.put(nodeToAdd.name(), nodeToAdd); + if (oldNode != null && oldNode.id().equals(nodeToAdd.id())) { + // We already have this node, nothing needs to be changed. + return this; + } + + return new LogicalTopology(version + 1, map.values()); + } + + LogicalTopology removeNodesByIds(Set<ClusterNode> nodesToRemove) { + Map<String, ClusterNode> mapById = nodes.stream().collect(toMap(ClusterNode::id, identity())); + + int originalSize = mapById.size(); + + for (ClusterNode nodeToRemove : nodesToRemove) { + mapById.remove(nodeToRemove.id()); + } + + if (mapById.size() == originalSize) { + // Nothing was actually removed. + return this; + } + + return new LogicalTopology(version + 1, mapById.values()); + } + + boolean containsNodeById(ClusterNode needle) { + return nodes.stream().anyMatch(node -> node.id().equals(needle.id())); + } + + @Override + public String toString() { + return S.toString(LogicalTopology.class, this); + } } } diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java index 07dcdad9c2..501b0429da 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java @@ -24,7 +24,7 @@ import org.apache.ignite.network.annotations.Transferable; import org.apache.ignite.raft.client.WriteCommand; /** - * Command that gets executed when a node needs to be removed from the logical topology. + * Command that gets executed when nodes need to be removed from the logical topology. */ @Transferable(CmgMessageGroup.Commands.NODES_LEAVE) public interface NodesLeaveCommand extends WriteCommand, NetworkMessage { diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageManagerTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageManagerTest.java index 63e695b37d..3959f07814 100644 --- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageManagerTest.java +++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageManagerTest.java @@ -25,10 +25,14 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.nio.file.Path; +import java.util.Collection; import java.util.List; import java.util.Set; import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory; @@ -221,4 +225,39 @@ public abstract class AbstractClusterStateStorageManagerTest { assertThat(storageManager.getValidatedNodeIds(), containsInAnyOrder("node2")); } + + @Test + void logicalTopologyAdditionUsesNameAsNodeKey() { + storageManager.putLogicalTopologyNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000))); + + storageManager.putLogicalTopologyNode(new ClusterNode("id2", "node", new NetworkAddress("host", 1000))); + + Collection<ClusterNode> topology = storageManager.getLogicalTopology(); + + assertThat(topology, hasSize(1)); + + assertThat(topology.iterator().next().id(), is("id2")); + } + + @Test + void logicalTopologyRemovalUsesIdAsNodeKey() { + storageManager.putLogicalTopologyNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000))); + + storageManager.removeLogicalTopologyNodes(Set.of(new ClusterNode("id2", "node", new NetworkAddress("host", 1000)))); + + assertThat(storageManager.getLogicalTopology(), hasSize(1)); + assertThat(storageManager.getLogicalTopology().iterator().next().id(), is("id1")); + + storageManager.removeLogicalTopologyNodes(Set.of(new ClusterNode("id1", "another-name", new NetworkAddress("host", 1000)))); + + assertThat(storageManager.getLogicalTopology(), is(empty())); + } + + @Test + void inLogicalTopologyTestUsesIdAsNodeKey() { + storageManager.putLogicalTopologyNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000))); + + assertTrue(storageManager.isNodeInLogicalTopology(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)))); + assertFalse(storageManager.isNodeInLogicalTopology(new ClusterNode("another-id", "node", new NetworkAddress("host", 1000)))); + } }