This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2405febe72 IGNITE-18240 Store logical topology as a single KV entry 
instead of a number of entries (#1371)
2405febe72 is described below

commit 2405febe72fd295a1044bcdb6787b7f6a8389d7a
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Thu Nov 24 17:36:06 2022 +0400

    IGNITE-18240 Store logical topology as a single KV entry instead of a 
number of entries (#1371)
---
 .../management/raft/RaftStorageManager.java        | 128 ++++++++++++++-------
 .../raft/commands/NodesLeaveCommand.java           |   2 +-
 .../AbstractClusterStateStorageManagerTest.java    |  39 +++++++
 3 files changed, 126 insertions(+), 43 deletions(-)

diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
index f3c94cbfea..77c79da236 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
@@ -18,20 +18,25 @@
 package org.apache.ignite.internal.cluster.management.raft;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.emptySet;
+import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.cluster.management.ClusterState;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
@@ -42,16 +47,20 @@ class RaftStorageManager {
     /** Storage key for the CMG state. */
     private static final byte[] CMG_STATE_KEY = "cmg_state".getBytes(UTF_8);
 
-    /** Prefix for the keys for logical topology nodes. */
-    private static final byte[] LOGICAL_TOPOLOGY_PREFIX = 
"logical_".getBytes(UTF_8);
+    /** Storage key for the logical topology. */
+    private static final byte[] LOGICAL_TOPOLOGY_KEY = 
"logical".getBytes(UTF_8);
 
     /** Prefix for validation tokens. */
     private static final byte[] VALIDATED_NODE_PREFIX = 
"validation_".getBytes(UTF_8);
 
     private final ClusterStateStorage storage;
 
+    private volatile LogicalTopology currentLogicalTopology;
+
     RaftStorageManager(ClusterStateStorage storage) {
         this.storage = storage;
+
+        currentLogicalTopology = readLogicalTopology();
     }
 
     /**
@@ -79,11 +88,13 @@ class RaftStorageManager {
      * Retrieves the current logical topology.
      */
     Collection<ClusterNode> getLogicalTopology() {
-        try (Cursor<ClusterNode> cursor = 
storage.getWithPrefix(LOGICAL_TOPOLOGY_PREFIX, (k, v) -> fromBytes(v))) {
-            return cursor.stream().collect(toList());
-        } catch (Exception e) {
-            throw new IgniteInternalException("Unable to get data from 
storage", e);
-        }
+        return currentLogicalTopology.nodes;
+    }
+
+    private LogicalTopology readLogicalTopology() {
+        byte[] bytes = storage.get(LOGICAL_TOPOLOGY_KEY);
+
+        return bytes == null ? LogicalTopology.INITIAL : fromBytes(bytes);
     }
 
     /**
@@ -92,16 +103,17 @@ class RaftStorageManager {
      * @param node Node to save.
      */
     void putLogicalTopologyNode(ClusterNode node) {
-        byte[] nodeNameBytes = node.name().getBytes(UTF_8);
-
-        byte[] nodeIdBytes = node.id().getBytes(UTF_8);
+        replaceLogicalTopologyWith(currentLogicalTopology.addNode(node));
+    }
 
-        byte[] key = logicalTopologyKey(nodeNameBytes, nodeIdBytes);
+    private void replaceLogicalTopologyWith(LogicalTopology newTopology) {
+        if (newTopology == currentLogicalTopology) {
+            return;
+        }
 
-        // Replace all nodes with the same consistent ID.
-        byte[] prefix = Arrays.copyOf(key, key.length - nodeIdBytes.length);
+        storage.put(LOGICAL_TOPOLOGY_KEY, toBytes(newTopology));
 
-        storage.replaceAll(prefix, key, toBytes(node));
+        currentLogicalTopology = newTopology;
     }
 
     /**
@@ -110,36 +122,14 @@ class RaftStorageManager {
      * @param nodes Nodes to remove.
      */
     void removeLogicalTopologyNodes(Set<ClusterNode> nodes) {
-        Collection<byte[]> keys = nodes.stream()
-                .map(RaftStorageManager::logicalTopologyKey)
-                .collect(toList());
-
-        storage.removeAll(keys);
+        
replaceLogicalTopologyWith(currentLogicalTopology.removeNodesByIds(nodes));
     }
 
     /**
      * Returns {@code true} if a given node is present in the logical topology 
or {@code false} otherwise.
      */
     boolean isNodeInLogicalTopology(ClusterNode node) {
-        byte[] value = storage.get(logicalTopologyKey(node));
-
-        return value != null;
-    }
-
-    private static byte[] logicalTopologyKey(ClusterNode node) {
-        byte[] nodeNameBytes = node.name().getBytes(UTF_8);
-
-        byte[] nodeIdBytes = node.id().getBytes(UTF_8);
-
-        return logicalTopologyKey(nodeNameBytes, nodeIdBytes);
-    }
-
-    private static byte[] logicalTopologyKey(byte[] nodeNameBytes, byte[] 
nodeIdBytes) {
-        return ByteBuffer.allocate(LOGICAL_TOPOLOGY_PREFIX.length + 
nodeNameBytes.length + nodeIdBytes.length)
-                .put(LOGICAL_TOPOLOGY_PREFIX)
-                .put(nodeNameBytes)
-                .put(nodeIdBytes)
-                .array();
+        return currentLogicalTopology.containsNodeById(node);
     }
 
     /**
@@ -185,8 +175,6 @@ class RaftStorageManager {
 
         try (cursor) {
             return cursor.stream().collect(toList());
-        } catch (Exception e) {
-            throw new IgniteInternalException("Unable to get data from 
storage", e);
         }
     }
 
@@ -207,5 +195,61 @@ class RaftStorageManager {
      */
     void restoreSnapshot(Path snapshotPath) {
         storage.restoreSnapshot(snapshotPath);
+
+        currentLogicalTopology = readLogicalTopology();
+    }
+
+    private static class LogicalTopology implements Serializable {
+        private static final long serialVersionUID = 0L;
+
+        private static final LogicalTopology INITIAL = new LogicalTopology(0, 
emptySet());
+
+        private final long version;
+
+        @IgniteToStringInclude
+        private final Set<ClusterNode> nodes;
+
+        private LogicalTopology(long version, Collection<ClusterNode> nodes) {
+            this.version = version;
+            this.nodes = Set.copyOf(nodes);
+        }
+
+        LogicalTopology addNode(ClusterNode nodeToAdd) {
+            Map<String, ClusterNode> map = 
nodes.stream().collect(toMap(ClusterNode::name, identity()));
+
+            ClusterNode oldNode = map.put(nodeToAdd.name(), nodeToAdd);
+            if (oldNode != null && oldNode.id().equals(nodeToAdd.id())) {
+                // We already have this node, nothing needs to be changed.
+                return this;
+            }
+
+            return new LogicalTopology(version + 1, map.values());
+        }
+
+        LogicalTopology removeNodesByIds(Set<ClusterNode> nodesToRemove) {
+            Map<String, ClusterNode> mapById = 
nodes.stream().collect(toMap(ClusterNode::id, identity()));
+
+            int originalSize = mapById.size();
+
+            for (ClusterNode nodeToRemove : nodesToRemove) {
+                mapById.remove(nodeToRemove.id());
+            }
+
+            if (mapById.size() == originalSize) {
+                // Nothing was actually removed.
+                return this;
+            }
+
+            return new LogicalTopology(version + 1, mapById.values());
+        }
+
+        boolean containsNodeById(ClusterNode needle) {
+            return nodes.stream().anyMatch(node -> 
node.id().equals(needle.id()));
+        }
+
+        @Override
+        public String toString() {
+            return S.toString(LogicalTopology.class, this);
+        }
     }
 }
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java
index 07dcdad9c2..501b0429da 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java
@@ -24,7 +24,7 @@ import org.apache.ignite.network.annotations.Transferable;
 import org.apache.ignite.raft.client.WriteCommand;
 
 /**
- * Command that gets executed when a node needs to be removed from the logical 
topology.
+ * Command that gets executed when nodes need to be removed from the logical 
topology.
  */
 @Transferable(CmgMessageGroup.Commands.NODES_LEAVE)
 public interface NodesLeaveCommand extends WriteCommand, NetworkMessage {
diff --git 
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageManagerTest.java
 
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageManagerTest.java
index 63e695b37d..3959f07814 100644
--- 
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageManagerTest.java
+++ 
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageManagerTest.java
@@ -25,10 +25,14 @@ import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.file.Path;
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
@@ -221,4 +225,39 @@ public abstract class 
AbstractClusterStateStorageManagerTest {
 
         assertThat(storageManager.getValidatedNodeIds(), 
containsInAnyOrder("node2"));
     }
+
+    @Test
+    void logicalTopologyAdditionUsesNameAsNodeKey() {
+        storageManager.putLogicalTopologyNode(new ClusterNode("id1", "node", 
new NetworkAddress("host", 1000)));
+
+        storageManager.putLogicalTopologyNode(new ClusterNode("id2", "node", 
new NetworkAddress("host", 1000)));
+
+        Collection<ClusterNode> topology = storageManager.getLogicalTopology();
+
+        assertThat(topology, hasSize(1));
+
+        assertThat(topology.iterator().next().id(), is("id2"));
+    }
+
+    @Test
+    void logicalTopologyRemovalUsesIdAsNodeKey() {
+        storageManager.putLogicalTopologyNode(new ClusterNode("id1", "node", 
new NetworkAddress("host", 1000)));
+
+        storageManager.removeLogicalTopologyNodes(Set.of(new 
ClusterNode("id2", "node", new NetworkAddress("host", 1000))));
+
+        assertThat(storageManager.getLogicalTopology(), hasSize(1));
+        assertThat(storageManager.getLogicalTopology().iterator().next().id(), 
is("id1"));
+
+        storageManager.removeLogicalTopologyNodes(Set.of(new 
ClusterNode("id1", "another-name", new NetworkAddress("host", 1000))));
+
+        assertThat(storageManager.getLogicalTopology(), is(empty()));
+    }
+
+    @Test
+    void inLogicalTopologyTestUsesIdAsNodeKey() {
+        storageManager.putLogicalTopologyNode(new ClusterNode("id1", "node", 
new NetworkAddress("host", 1000)));
+
+        assertTrue(storageManager.isNodeInLogicalTopology(new 
ClusterNode("id1", "node", new NetworkAddress("host", 1000))));
+        assertFalse(storageManager.isNodeInLogicalTopology(new 
ClusterNode("another-id", "node", new NetworkAddress("host", 1000))));
+    }
 }

Reply via email to