This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 140cbb5f32128883577a1851a4d0db2388495cb4
Author: lta <[email protected]>
AuthorDate: Mon Mar 29 10:36:37 2021 +0800

    This commit fixes the following issues:
    1. Fix some bugs in the read process.
    2. Remove the redundant logic that made sure the meta log is also the newest
       during the election of a data group.
    3. Add a last applied partition table version to the data group member, which
       allows removing all syncLeader() calls when adding/removing a node.
---
 .../java/org/apache/iotdb/cluster/ClusterMain.java |  2 +-
 .../iotdb/cluster/coordinator/Coordinator.java     |  2 +-
 .../iotdb/cluster/log/applier/DataLogApplier.java  |  2 +
 .../iotdb/cluster/server/DataClusterServer.java    |  3 -
 .../org/apache/iotdb/cluster/server/Response.java  |  2 +
 .../server/handlers/caller/ElectionHandler.java    |  4 ++
 .../cluster/server/member/DataGroupMember.java     | 77 ++++++++++++++++++----
 .../iotdb/cluster/server/member/RaftMember.java    |  9 ++-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  1 -
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  1 +
 thrift/src/main/thrift/cluster.thrift              |  2 +-
 11 files changed, 85 insertions(+), 20 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index 820d623..8164e8e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@ -69,7 +69,7 @@ public class ClusterMain {
           + "[-cluster_rpc_port <cluster rpc port>] "
           + "[-seed_nodes <node1:meta_port:data_port:cluster_rpc_port,"
           + "node2:meta_port:data_port:cluster_rpc_port,"
-          + "...,noden:meta_port:data_port:cluster_rpc_port>] "
+          + "...,node:meta_port:data_port:cluster_rpc_port>] "
           + "[-sc] "
           + "[-rpc_port <rpc port>]");
       return;
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index b22d54c..9582ac1 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -463,7 +463,7 @@ public class Coordinator {
           // and the second dimension is the number of rows per 
InsertTabletPlan
           totalRowNum = ((InsertMultiTabletPlan) parentPlan).getTabletsSize();
         } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
-          totalRowNum = ((CreateMultiTimeSeriesPlan) 
parentPlan).getIndexes().size();
+          totalRowNum = parentPlan.getPaths().size();
         }
         if (subStatus == null) {
           subStatus = new TSStatus[totalRowNum];
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index fbaffaa..18b3b18 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -66,8 +66,10 @@ public class DataLogApplier extends BaseApplier {
     try {
       if (log instanceof AddNodeLog) {
         
metaGroupMember.getDataClusterServer().preAddNodeForDataGroup((AddNodeLog) log, 
dataGroupMember);
+        
dataGroupMember.setAndSaveLastAppliedPartitionTableVersion(((AddNodeLog) 
log).getMetaLogIndex());
       } else if (log instanceof RemoveNodeLog) {
         
metaGroupMember.getDataClusterServer().preRemoveNodeForDataGroup((RemoveNodeLog)
 log, dataGroupMember);
+        
dataGroupMember.setAndSaveLastAppliedPartitionTableVersion(((RemoveNodeLog) 
log).getMetaLogIndex());
       } else if (log instanceof PhysicalPlanLog) {
         PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
         PhysicalPlan plan = physicalPlanLog.getPlan();
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index c0dbc26..20e1471 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -598,9 +598,6 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
    * @param dataGroupMember
    */
   private void removeMember(RaftNode header, DataGroupMember dataGroupMember, 
boolean waitFollowersToSync) {
-    if (dataGroupMember.syncLeader()) {
-      dataGroupMember.setHasSyncedLeaderBeforeRemoved(true);
-    }
     dataGroupMember.setReadOnly();
     if (waitFollowersToSync && dataGroupMember.getCharacter() == 
NodeCharacter.LEADER) {
       dataGroupMember.getAppendLogThreadPool().submit(() -> 
dataGroupMember.waitFollowersToSync());
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
index 3573f2f..21e976d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
@@ -50,6 +50,8 @@ public class Response {
   public static final long RESPONSE_CHANGE_MEMBERSHIP_CONFLICT = -10;
   // the data migration of previous add/remove node operations is not finished.
   public static final long RESPONSE_DATA_MIGRATION_NOT_FINISH = -11;
+  // the node has removed from the group, so the operation is rejected.
+  public static final long RESPONSE_NODE_IS_NOT_IN_GROUP = -12;
   // the request is not executed locally anc should be forwarded
   public static final long RESPONSE_NULL = Long.MIN_VALUE;
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java
index 069a8bd..922cfbe 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.server.handlers.caller;
 
 import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
 import static 
org.apache.iotdb.cluster.server.Response.RESPONSE_LEADER_STILL_ONLINE;
+import static 
org.apache.iotdb.cluster.server.Response.RESPONSE_NODE_IS_NOT_IN_GROUP;
 
 import java.net.ConnectException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -91,6 +92,10 @@ public class ElectionHandler implements AsyncMethodCallback<Long> {
           // the rejection from a node with a smaller term means the log of this node falls behind
           logger.info("{}: Election {} rejected: code {}", memberName, currTerm, voterResp);
           onFail();
+        } else if (voterResp == RESPONSE_NODE_IS_NOT_IN_GROUP) {
+          // the voter no longer counts this node as a member of its group, so it cannot win
+          logger.info("{}: This node has removed from the group", memberName);
+          onFail();
         } else {
           // the election is rejected by a node with a bigger term, update current term to it
           logger
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 9dd9e1a..962c466 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -20,9 +20,12 @@
 package org.apache.iotdb.cluster.server.member;
 
 import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
@@ -151,6 +154,8 @@ public class DataGroupMember extends RaftMember {
    */
   private boolean unchanged;
 
+  private LastAppliedPatitionTableVersion lastAppliedPartitionTableVersion;
+
   @TestOnly
   public DataGroupMember() {
     // constructor for test
@@ -181,6 +186,7 @@ public class DataGroupMember extends RaftMember {
     term.set(logManager.getHardState().getCurrentTerm());
     voteFor = logManager.getHardState().getVoteFor();
     localQueryExecutor = new LocalQueryExecutor(this);
+    lastAppliedPartitionTableVersion = new 
LastAppliedPatitionTableVersion(getMemberDir());
   }
 
   /**
@@ -363,18 +369,8 @@ public class DataGroupMember extends RaftMember {
       setReadOnly();
     }
 
-    boolean canGetSnapshot;
-    /**
-     * There are two conditions that can get snapshot:
-     * 1. The raft member is stopped and it has synced leader successfully 
before stop.
-     * 2. The raft member is not stopped and syncing leader is successful.
-     */
-    if (isHasSyncedLeaderBeforeRemoved()) {
-      canGetSnapshot = true;
-    } else {
-      canGetSnapshot = syncLeader();
-    }
-    if (!canGetSnapshot) {
+    // Make sure local data is complete.
+    if (lastAppliedPartitionTableVersion.getVersion() != 
metaGroupMember.getPartitionTable().getLastMetaLogIndex()) {
       return null;
     }
 
@@ -905,4 +901,102 @@ public class DataGroupMember extends RaftMember {
   public void setUnchanged(boolean unchanged) {
     this.unchanged = unchanged;
   }
+
+  /**
+   * Updates the last applied partition table version and persists it to disk.
+   *
+   * @param version the meta log index of the latest applied add/remove-node log
+   */
+  public void setAndSaveLastAppliedPartitionTableVersion(long version) {
+    lastAppliedPartitionTableVersion.setVersion(version);
+    lastAppliedPartitionTableVersion.save();
+  }
+
+  /** @return the last applied partition table version, or -1 if none is known */
+  public long getLastAppliedPartitionTableVersion() {
+    return lastAppliedPartitionTableVersion.getVersion();
+  }
+
+  /**
+   * The partition table version (meta log index) last applied by this member, persisted in the
+   * file LAST_PARTITION_TABLE_VERSION under the member directory and reloaded on construction.
+   */
+  private class LastAppliedPatitionTableVersion {
+
+    private static final String VERSION_FILE_NAME = "LAST_PARTITION_TABLE_VERSION";
+
+    // suffix of the scratch file that is written first and then renamed over the real file
+    private static final String TEMP_FILE_SUFFIX = ".tmp";
+
+    private long version = -1;
+
+    private final String filePath;
+
+    public LastAppliedPatitionTableVersion(String memberDir) {
+      this.filePath = memberDir + File.separator + VERSION_FILE_NAME;
+      load();
+    }
+
+    /** Loads the persisted version; keeps -1 if the file is missing or cannot be read. */
+    private void load() {
+      File versionFile = new File(filePath);
+      if (!versionFile.exists()) {
+        return;
+      }
+      try (FileInputStream fileInputStream = new FileInputStream(versionFile);
+          DataInputStream dataInputStream = new DataInputStream(fileInputStream)) {
+        version = dataInputStream.readLong();
+      } catch (IOException e) {
+        logger.warn("Cannot deserialize last partition table version from {}", filePath, e);
+      }
+    }
+
+    /**
+     * Persists the current version. The value is written and synced to a temporary file that
+     * then replaces the version file in one rename, so a crash during the save cannot leave an
+     * empty or truncated version file (which load() would silently turn into -1).
+     */
+    public synchronized void save() {
+      File versionFile = new File(filePath);
+      File parentDir = versionFile.getParentFile();
+      if (!parentDir.exists() && !parentDir.mkdirs()) {
+        logger.warn("Cannot mkdirs for {}", versionFile);
+      }
+      File tempFile = new File(filePath + TEMP_FILE_SUFFIX);
+      try {
+        try (FileOutputStream outputStream = new FileOutputStream(tempFile);
+            DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
+          dataOutputStream.writeLong(version);
+          dataOutputStream.flush();
+          // force the bytes to disk before the rename makes them visible
+          outputStream.getFD().sync();
+        }
+        moveReplacing(tempFile, versionFile);
+      } catch (IOException e) {
+        logger.warn("Last partition table version in {} cannot be saved", filePath, e);
+      }
+    }
+
+    // Renames source over target, atomically when the file system supports it.
+    private void moveReplacing(File source, File target) throws IOException {
+      try {
+        Files.move(
+            source.toPath(),
+            target.toPath(),
+            java.nio.file.StandardCopyOption.ATOMIC_MOVE,
+            java.nio.file.StandardCopyOption.REPLACE_EXISTING);
+      } catch (java.nio.file.AtomicMoveNotSupportedException e) {
+        Files.move(
+            source.toPath(), target.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING);
+      }
+    }
+
+    public long getVersion() {
+      return version;
+    }
+
+    public void setVersion(long version) {
+      this.version = version;
+    }
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 04e9e79..5f5579f 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1137,6 +1137,13 @@ public abstract class RaftMember {
     long thatTerm = electionRequest.getTerm();
     long thatLastLogIndex = electionRequest.getLastLogIndex();
     long thatLastLogTerm = electionRequest.getLastLogTerm();
+    Node elector = electionRequest.getElector();
+
+    // check if the node is in the group
+    if (!allNodes.contains(elector)) {
+      logger.info("{}: the elector {} is not in the group, so reject this 
election.", name, elector);
+      return Response.RESPONSE_NODE_IS_NOT_IN_GROUP;
+    }
 
     // check the log progress of the elector
     long resp = checkLogProgress(thatLastLogIndex, thatLastLogTerm);
@@ -1146,7 +1153,7 @@ public abstract class RaftMember {
           thatLastLogTerm, logManager.getLastLogTerm());
       setCharacter(NodeCharacter.FOLLOWER);
       lastHeartbeatReceivedTime = System.currentTimeMillis();
-      setVoteFor(electionRequest.getElector());
+      setVoteFor(elector);
       updateHardState(thatTerm, getVoteFor());
     } else {
       logger.info("{} rejected an election request, term:{}/{}, 
logIndex:{}/{}, logTerm:{}/{}",
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java 
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index deebba1..d8db6ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1249,7 +1249,6 @@ public class PlanExecutor implements IPlanExecutor {
           multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i));
       try {
         createTimeSeries(plan);
-        multiPlan.getResults().put(i, 
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
       } catch (QueryProcessException e) {
         multiPlan.getResults().put(i, RpcUtils
             .getStatus(e.getErrorCode(), e.getMessage()));
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java 
b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 2262b63..32cb348 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1542,6 +1542,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
 
       return executeNonQueryPlan(multiPlan);
     } catch (Exception e) {
+      LOGGER.error("creating multi timeseries fails", e);
       return onNPEOrUnexpectedException(e, "creating multi timeseries",
           TSStatusCode.EXECUTE_STATEMENT_ERROR);
     }
diff --git a/thrift/src/main/thrift/cluster.thrift 
b/thrift/src/main/thrift/cluster.thrift
index 0a8a7c3..3fd006d 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -194,7 +194,7 @@ struct SingleSeriesQueryRequest {
   10: required bool ascending
   11: required int fetchSize
   12: required int deduplicatedPathNum
-  13: required set<int> requiredSlots
+  13: optional set<int> requiredSlots
 }
 
 struct PreviousFillRequest {

Reply via email to