This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster_scalability in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 140cbb5f32128883577a1851a4d0db2388495cb4 Author: lta <[email protected]> AuthorDate: Mon Mar 29 10:36:37 2021 +0800 This commit fixes the following issues: 1. fix some bugs in the read process 2. remove useless logic for making sure that the meta log is also the newest in data group elections 3. add the last applied partition table version to the data group member, which allows all syncLeader() calls to be removed when adding/removing a node --- .../java/org/apache/iotdb/cluster/ClusterMain.java | 2 +- .../iotdb/cluster/coordinator/Coordinator.java | 2 +- .../iotdb/cluster/log/applier/DataLogApplier.java | 2 + .../iotdb/cluster/server/DataClusterServer.java | 3 - .../org/apache/iotdb/cluster/server/Response.java | 2 + .../server/handlers/caller/ElectionHandler.java | 4 ++ .../cluster/server/member/DataGroupMember.java | 77 ++++++++++++++++++---- .../iotdb/cluster/server/member/RaftMember.java | 9 ++- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 1 - .../org/apache/iotdb/db/service/TSServiceImpl.java | 1 + thrift/src/main/thrift/cluster.thrift | 2 +- 11 files changed, 85 insertions(+), 20 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java index 820d623..8164e8e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java @@ -69,7 +69,7 @@ public class ClusterMain { + "[-cluster_rpc_port <cluster rpc port>] " + "[-seed_nodes <node1:meta_port:data_port:cluster_rpc_port," + "node2:meta_port:data_port:cluster_rpc_port," - + "...,noden:meta_port:data_port:cluster_rpc_port>] " + + "...,node:meta_port:data_port:cluster_rpc_port>] " + "[-sc] " + "[-rpc_port <rpc port>]"); return; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java index b22d54c..9582ac1 100644 --- 
a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java @@ -463,7 +463,7 @@ public class Coordinator { // and the second dimension is the number of rows per InsertTabletPlan totalRowNum = ((InsertMultiTabletPlan) parentPlan).getTabletsSize(); } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) { - totalRowNum = ((CreateMultiTimeSeriesPlan) parentPlan).getIndexes().size(); + totalRowNum = parentPlan.getPaths().size(); } if (subStatus == null) { subStatus = new TSStatus[totalRowNum]; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java index fbaffaa..18b3b18 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java @@ -66,8 +66,10 @@ public class DataLogApplier extends BaseApplier { try { if (log instanceof AddNodeLog) { metaGroupMember.getDataClusterServer().preAddNodeForDataGroup((AddNodeLog) log, dataGroupMember); + dataGroupMember.setAndSaveLastAppliedPartitionTableVersion(((AddNodeLog) log).getMetaLogIndex()); } else if (log instanceof RemoveNodeLog) { metaGroupMember.getDataClusterServer().preRemoveNodeForDataGroup((RemoveNodeLog) log, dataGroupMember); + dataGroupMember.setAndSaveLastAppliedPartitionTableVersion(((RemoveNodeLog) log).getMetaLogIndex()); } else if (log instanceof PhysicalPlanLog) { PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log; PhysicalPlan plan = physicalPlanLog.getPlan(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java index c0dbc26..20e1471 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java +++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java @@ -598,9 +598,6 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async * @param dataGroupMember */ private void removeMember(RaftNode header, DataGroupMember dataGroupMember, boolean waitFollowersToSync) { - if (dataGroupMember.syncLeader()) { - dataGroupMember.setHasSyncedLeaderBeforeRemoved(true); - } dataGroupMember.setReadOnly(); if (waitFollowersToSync && dataGroupMember.getCharacter() == NodeCharacter.LEADER) { dataGroupMember.getAppendLogThreadPool().submit(() -> dataGroupMember.waitFollowersToSync()); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java index 3573f2f..21e976d 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java @@ -50,6 +50,8 @@ public class Response { public static final long RESPONSE_CHANGE_MEMBERSHIP_CONFLICT = -10; // the data migration of previous add/remove node operations is not finished. public static final long RESPONSE_DATA_MIGRATION_NOT_FINISH = -11; + // the node has removed from the group, so the operation is rejected. 
+ public static final long RESPONSE_NODE_IS_NOT_IN_GROUP = -12; // the request is not executed locally anc should be forwarded public static final long RESPONSE_NULL = Long.MIN_VALUE; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java index 069a8bd..922cfbe 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java @@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.server.handlers.caller; import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE; import static org.apache.iotdb.cluster.server.Response.RESPONSE_LEADER_STILL_ONLINE; +import static org.apache.iotdb.cluster.server.Response.RESPONSE_NODE_IS_NOT_IN_GROUP; import java.net.ConnectException; import java.util.concurrent.atomic.AtomicBoolean; @@ -91,6 +92,9 @@ public class ElectionHandler implements AsyncMethodCallback<Long> { // the rejection from a node with a smaller term means the log of this node falls behind logger.info("{}: Election {} rejected: code {}", memberName, currTerm, voterResp); onFail(); + } else if (voterResp != RESPONSE_NODE_IS_NOT_IN_GROUP) { + logger.info("{}: This node has removed from the group", memberName); + onFail(); } else { // the election is rejected by a node with a bigger term, update current term to it logger diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index 9dd9e1a..962c466 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@ -20,9 +20,12 @@ package org.apache.iotdb.cluster.server.member; import 
java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Files; @@ -151,6 +154,8 @@ public class DataGroupMember extends RaftMember { */ private boolean unchanged; + private LastAppliedPatitionTableVersion lastAppliedPartitionTableVersion; + @TestOnly public DataGroupMember() { // constructor for test @@ -181,6 +186,7 @@ public class DataGroupMember extends RaftMember { term.set(logManager.getHardState().getCurrentTerm()); voteFor = logManager.getHardState().getVoteFor(); localQueryExecutor = new LocalQueryExecutor(this); + lastAppliedPartitionTableVersion = new LastAppliedPatitionTableVersion(getMemberDir()); } /** @@ -363,18 +369,8 @@ public class DataGroupMember extends RaftMember { setReadOnly(); } - boolean canGetSnapshot; - /** - * There are two conditions that can get snapshot: - * 1. The raft member is stopped and it has synced leader successfully before stop. - * 2. The raft member is not stopped and syncing leader is successful. - */ - if (isHasSyncedLeaderBeforeRemoved()) { - canGetSnapshot = true; - } else { - canGetSnapshot = syncLeader(); - } - if (!canGetSnapshot) { + // Make sure local data is complete. 
+ if (lastAppliedPartitionTableVersion.getVersion() != metaGroupMember.getPartitionTable().getLastMetaLogIndex()) { return null; } @@ -905,4 +901,61 @@ public class DataGroupMember extends RaftMember { public void setUnchanged(boolean unchanged) { this.unchanged = unchanged; } + + public void setAndSaveLastAppliedPartitionTableVersion(long version) { + lastAppliedPartitionTableVersion.setVersion(version); + lastAppliedPartitionTableVersion.save(); + } + + public long getLastAppliedPartitionTableVersion() { + return lastAppliedPartitionTableVersion.getVersion(); + } + + private class LastAppliedPatitionTableVersion { + + private static final String VERSION_FILE_NAME = "LAST_PARTITION_TABLE_VERSION"; + + private long version = -1; + + private String filePath; + + public LastAppliedPatitionTableVersion(String memberDir) { + this.filePath = memberDir + File.separator + VERSION_FILE_NAME; + load(); + } + + private void load() { + File versionFile = new File(filePath); + if (!versionFile.exists()) { + return ; + } + try (FileInputStream fileInputStream = new FileInputStream(filePath); + DataInputStream dataInputStream = new DataInputStream(fileInputStream)) { + version = dataInputStream.readLong(); + } catch (Exception e) { + logger.warn("Cannot deserialize last partition table version from {}", filePath, e); + } + } + + public synchronized void save() { + File versionFile = new File(filePath); + if (!versionFile.getParentFile().exists() && !versionFile.getParentFile().mkdirs()) { + logger.warn("Cannot mkdirs for {}", versionFile); + } + try (FileOutputStream outputStream = new FileOutputStream(versionFile); + DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) { + dataOutputStream.writeLong(version); + } catch (IOException e) { + logger.warn("Last partition table version in {} cannot be saved", filePath, e); + } + } + + public long getVersion() { + return version; + } + + public void setVersion(long version) { + this.version = version; + } + } } diff 
--git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 04e9e79..5f5579f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -1137,6 +1137,13 @@ public abstract class RaftMember { long thatTerm = electionRequest.getTerm(); long thatLastLogIndex = electionRequest.getLastLogIndex(); long thatLastLogTerm = electionRequest.getLastLogTerm(); + Node elector = electionRequest.getElector(); + + // check if the node is in the group + if (!allNodes.contains(elector)) { + logger.info("{}: the elector {} is not in the group, so reject this election.", name, elector); + return Response.RESPONSE_NODE_IS_NOT_IN_GROUP; + } // check the log progress of the elector long resp = checkLogProgress(thatLastLogIndex, thatLastLogTerm); @@ -1146,7 +1153,7 @@ public abstract class RaftMember { thatLastLogTerm, logManager.getLastLogTerm()); setCharacter(NodeCharacter.FOLLOWER); lastHeartbeatReceivedTime = System.currentTimeMillis(); - setVoteFor(electionRequest.getElector()); + setVoteFor(elector); updateHardState(thatTerm, getVoteFor()); } else { logger.info("{} rejected an election request, term:{}/{}, logIndex:{}/{}, logTerm:{}/{}", diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index deebba1..d8db6ad 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -1249,7 +1249,6 @@ public class PlanExecutor implements IPlanExecutor { multiPlan.getAlias() == null ? 
null : multiPlan.getAlias().get(i)); try { createTimeSeries(plan); - multiPlan.getResults().put(i, RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); } catch (QueryProcessException e) { multiPlan.getResults().put(i, RpcUtils .getStatus(e.getErrorCode(), e.getMessage())); diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 2262b63..32cb348 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -1542,6 +1542,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return executeNonQueryPlan(multiPlan); } catch (Exception e) { + LOGGER.error("creating multi timeseries fails", e); return onNPEOrUnexpectedException(e, "creating multi timeseries", TSStatusCode.EXECUTE_STATEMENT_ERROR); } diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift index 0a8a7c3..3fd006d 100644 --- a/thrift/src/main/thrift/cluster.thrift +++ b/thrift/src/main/thrift/cluster.thrift @@ -194,7 +194,7 @@ struct SingleSeriesQueryRequest { 10: required bool ascending 11: required int fetchSize 12: required int deduplicatedPathNum - 13: required set<int> requiredSlots + 13: optional set<int> requiredSlots } struct PreviousFillRequest {
