This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster_scalability in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d84f0ff9d7907220bf2ac0b4cab8c184aaf2dd31 Author: lta <[email protected]> AuthorDate: Wed Feb 24 22:01:17 2021 +0800 This commit fixes the following issues: 1. SlotPartitionTable serialization null pointer 2. AddNodeLog and RemoveNodeLog serialization issue --- .../iotdb/cluster/log/logtypes/AddNodeLog.java | 5 +++- .../iotdb/cluster/log/logtypes/RemoveNodeLog.java | 5 +++- .../iotdb/cluster/partition/NodeRemovalResult.java | 6 ++--- .../iotdb/cluster/partition/PartitionGroup.java | 10 ++++---- .../partition/slot/SlotNodeRemovalResult.java | 9 ++++--- .../cluster/partition/slot/SlotPartitionTable.java | 28 ++++++++++++---------- .../iotdb/cluster/server/DataClusterServer.java | 2 +- .../cluster/server/member/DataGroupMember.java | 3 +++ .../cluster/server/member/MetaGroupMember.java | 3 +-- .../iotdb/cluster/utils/SerializeUtilTest.java | 18 ++++++++++++++ 10 files changed, 61 insertions(+), 28 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java index 380ba08..8a2fcab 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java @@ -68,6 +68,7 @@ public class AddNodeLog extends Log { } public ByteBuffer getPartitionTable() { + partitionTable.clear(); return partitionTable; } @@ -104,7 +105,9 @@ public class AddNodeLog extends Log { SerializeUtils.deserialize(newNode, buffer); int len = buffer.getInt(); - partitionTable = ByteBuffer.wrap(buffer.array(), buffer.position(), len); + byte[] data = new byte[len]; + System.arraycopy(buffer.array(), buffer.position(), data, 0, len); + partitionTable = ByteBuffer.wrap(data); } @Override diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java index 22af482..4b147eb 100644 --- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java @@ -54,6 +54,7 @@ public class RemoveNodeLog extends Log { } public ByteBuffer getPartitionTable() { + partitionTable.clear(); return partitionTable; } @@ -90,7 +91,9 @@ public class RemoveNodeLog extends Log { SerializeUtils.deserialize(removedNode, buffer); int len = buffer.getInt(); - partitionTable = ByteBuffer.wrap(buffer.array(), buffer.position(), len); + byte[] data = new byte[len]; + System.arraycopy(buffer.array(), buffer.position(), data, 0, len); + partitionTable = ByteBuffer.wrap(data); } public Node getRemovedNode() { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java index 86ff9a2..16e25e2 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java @@ -78,18 +78,18 @@ public class NodeRemovalResult { } } - public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) { + public void deserialize(ByteBuffer buffer) { int removedGroupListSize = buffer.getInt(); for (int i = 0 ; i < removedGroupListSize; i++) { PartitionGroup group = new PartitionGroup(); - group.deserialize(buffer, idNodeMap); + group.deserialize(buffer); removedGroupList.add(group); } int newGroupListSize = buffer.getInt(); for (int i = 0 ; i < newGroupListSize; i++) { PartitionGroup group = new PartitionGroup(); - group.deserialize(buffer, idNodeMap); + group.deserialize(buffer); newGroupList.add(group); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java index e7d039c..0bb5005 100644 --- 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java @@ -25,9 +25,9 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Map; import java.util.Objects; import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.iotdb.db.utils.SerializeUtils; /** * PartitionGroup contains all the nodes that will form a data group with a certain node, which are @@ -73,15 +73,17 @@ public class PartitionGroup extends ArrayList<Node> { dataOutputStream.writeInt(getId()); dataOutputStream.writeInt(size()); for (Node node : this) { - dataOutputStream.writeInt(node.getNodeIdentifier()); + SerializeUtils.serialize(node, dataOutputStream); } } - public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) { + public void deserialize(ByteBuffer buffer) { id = buffer.getInt(); int nodeNum = buffer.getInt(); for (int i2 = 0; i2 < nodeNum; i2++) { - add(idNodeMap.get(buffer.getInt())); + Node node = new Node(); + SerializeUtils.deserialize(node, buffer); + add(node); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java index a04a289..9a17ea3 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java @@ -29,6 +29,7 @@ import java.util.Map; import org.apache.iotdb.cluster.partition.NodeRemovalResult; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.RaftNode; +import org.apache.iotdb.db.utils.SerializeUtils; /** * SlotNodeRemovalResult stores the removed partition group and who will take over its slots. 
@@ -61,11 +62,13 @@ public class SlotNodeRemovalResult extends NodeRemovalResult { } @Override - public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) { - super.deserialize(buffer, idNodeMap); + public void deserialize(ByteBuffer buffer) { + super.deserialize(buffer); int size = buffer.getInt(); for (int i = 0 ; i < size; i++) { - RaftNode raftNode = new RaftNode(idNodeMap.get(buffer.getInt()), buffer.getInt()); + Node node = new Node(); + SerializeUtils.deserialize(node, buffer); + RaftNode raftNode = new RaftNode(node, buffer.getInt()); List<Integer> slots = new ArrayList<>(); int slotSize = buffer.getInt(); for (int j = 0 ; j < slotSize; j++) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java index 255fb22..8817b9f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java @@ -200,7 +200,7 @@ public class SlotPartitionTable implements PartitionTable { // assuming the nodes are [1,2,3,4,5] int nodeIndex = nodeRing.indexOf(raftNode.getNode()); if (nodeIndex == -1) { - logger.error("Node {} is not in the cluster", raftNode.getNode()); + logger.warn("Node {} is not in the cluster", raftNode.getNode()); return null; } int endIndex = nodeIndex + replicationNum; @@ -307,7 +307,7 @@ public class SlotPartitionTable implements PartitionTable { } } - calculateGlobalGroups(nodeRing); + globalGroups = calculateGlobalGroups(nodeRing); // the slots movement is only done logically, the new node itself will pull data from the // old node @@ -354,7 +354,7 @@ public class SlotPartitionTable implements PartitionTable { dataOutputStream.writeInt(previousNodeMap.size()); for (Entry<RaftNode, Map<Integer, PartitionGroup>> nodeMapEntry : previousNodeMap.entrySet()) { - 
dataOutputStream.writeInt(nodeMapEntry.getKey().getNode().getNodeIdentifier()); + SerializeUtils.serialize(nodeMapEntry.getKey().getNode(), dataOutputStream); dataOutputStream.writeInt(nodeMapEntry.getKey().getRaftId()); Map<Integer, PartitionGroup> prevHolders = nodeMapEntry.getValue(); dataOutputStream.writeInt(prevHolders.size()); @@ -383,15 +383,15 @@ public class SlotPartitionTable implements PartitionTable { logger.info("Initializing the partition table from buffer"); totalSlotNumbers = buffer.getInt(); int size = buffer.getInt(); - Map<Integer, Node> idNodeMap = new HashMap<>(); + nodeSlotMap = new HashMap<>(); + Node node; for (int i = 0; i < size; i++) { - Node node = new Node(); + node = new Node(); SerializeUtils.deserialize(node, buffer); RaftNode raftNode = new RaftNode(node, buffer.getInt()); List<Integer> slots = new ArrayList<>(); SerializeUtils.deserializeIntList(slots, buffer); nodeSlotMap.put(raftNode, slots); - idNodeMap.put(node.getNodeIdentifier(), node); for (Integer slot : slots) { slotNodes[slot] = raftNode; } @@ -400,22 +400,24 @@ public class SlotPartitionTable implements PartitionTable { int prevNodeMapSize = buffer.getInt(); previousNodeMap = new HashMap<>(); for (int i = 0; i < prevNodeMapSize; i++) { - int nodeId = buffer.getInt(); - RaftNode node = new RaftNode(idNodeMap.get(nodeId), buffer.getInt()); + node = new Node(); + SerializeUtils.deserialize(node, buffer); + RaftNode raftNode = new RaftNode(node, buffer.getInt()); Map<Integer, PartitionGroup> prevHolders = new HashMap<>(); int holderNum = buffer.getInt(); for (int i1 = 0; i1 < holderNum; i1++) { PartitionGroup group = new PartitionGroup(); - group.deserialize(buffer, idNodeMap); + group.deserialize(buffer); prevHolders.put(buffer.getInt(), group); } - previousNodeMap.put(node, prevHolders); + previousNodeMap.put(raftNode, prevHolders); } nodeRemovalResult = new NodeRemovalResult(); - nodeRemovalResult.deserialize(buffer, idNodeMap); + nodeRemovalResult.deserialize(buffer); + 
nodeRing.clear(); for (RaftNode raftNode : nodeSlotMap.keySet()) { if (!nodeRing.contains(raftNode.getNode())) { nodeRing.add(raftNode.getNode()); @@ -527,7 +529,7 @@ public class SlotPartitionTable implements PartitionTable { result.addNewGroup(newGrp); } - calculateGlobalGroups(nodeRing); + globalGroups = calculateGlobalGroups(nodeRing); // the slots movement is only done logically, the new node itself will pull data from the // old node @@ -563,7 +565,7 @@ public class SlotPartitionTable implements PartitionTable { List<PartitionGroup> result = new ArrayList<>(); for (Node node : nodeRing) { for (int i = 0; i < multiRaftFactor; i++) { - result.add(getHeaderGroup(new RaftNode(node, i))); + result.add(getHeaderGroup(new RaftNode(node, i), nodeRing)); } } return result; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java index 454f5df..3b70208 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java @@ -628,7 +628,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async DataGroupMember dataGroupMember = entry.getValue(); if (dataGroupMember.getHeader().equals(node) || node.equals(thisNode)) { entryIterator.remove(); - removeMember(entry.getKey(), entry.getValue()); + removeMember(entry.getKey(), dataGroupMember); } else { // the group should be updated and pull new slots from the removed node dataGroupMember.removeNode(node, removalResult); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index 7b4f663..e0a382e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@ -752,6 +752,9 @@ public class DataGroupMember extends RaftMember { if (allNodes.contains(removedNode)) { // update the group if the deleted node was in it PartitionGroup newGroup = metaGroupMember.getPartitionTable().getHeaderGroup(new RaftNode(getHeader(), getRaftGroupId())); + if (newGroup == null) { + return; + } Node newNodeToGroup = newGroup.get(newGroup.size() - 1); allNodes.add(newNodeToGroup); peerMap.putIfAbsent(newNodeToGroup, new Peer(logManager.getLastLogIndex())); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index dd5b419..ec4f0de 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -1934,7 +1934,7 @@ public class MetaGroupMember extends RaftMember { // node removal must be serialized to reduce potential concurrency problem synchronized (logManager) { // update partition table - partitionTable.removeNode(node); + partitionTable.removeNode(target); ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1); RemoveNodeLog removeNodeLog = new RemoveNodeLog(); @@ -1986,7 +1986,6 @@ public class MetaGroupMember extends RaftMember { if (allNodes.contains(oldNode)) { allNodes.remove(oldNode); idNodeMap.remove(oldNode.nodeIdentifier); - } // save the updated partition table diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java index 134b2fe..bc855af 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java @@ -36,6 +36,7 @@ import 
org.apache.iotdb.cluster.exception.UnknownLogTypeException; import org.apache.iotdb.cluster.log.Log; import org.apache.iotdb.cluster.log.LogParser; import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog; +import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.metadata.PartialPath; @@ -51,6 +52,23 @@ import org.junit.Test; public class SerializeUtilTest { @Test + public void testSlotPartitionTable() { + List<Node> nodes = new ArrayList<>(); + nodes.add(TestUtils.getNode(0)); + nodes.add(TestUtils.getNode(1)); + nodes.add(TestUtils.getNode(2)); + SlotPartitionTable slotPartitionTable1 = new SlotPartitionTable(nodes, TestUtils.getNode(0)); + SlotPartitionTable slotPartitionTable2 = new SlotPartitionTable(nodes, TestUtils.getNode(0)); + SlotPartitionTable slotPartitionTable3 = new SlotPartitionTable(nodes, TestUtils.getNode(0)); + slotPartitionTable1.removeNode(TestUtils.getNode(0)); + slotPartitionTable2.deserialize(slotPartitionTable1.serialize()); + assertEquals(slotPartitionTable2, slotPartitionTable1); + slotPartitionTable1.addNode(TestUtils.getNode(0)); + slotPartitionTable3.deserialize(slotPartitionTable1.serialize()); + assertEquals(slotPartitionTable3, slotPartitionTable1); + } + + @Test public void testStrToNode() { for (int i = 0; i < 10; i++) { Node node = TestUtils.getNode(i);
