This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new 1647d39  fix some serve bugs and modify for query time series
1647d39 is described below

commit 1647d3993b2fd63d0ec498780e1976e6be37c507
Author: lta <[email protected]>
AuthorDate: Sat Mar 30 22:43:33 2019 +0800

    fix some serve bugs and modify for query time series
---
 .../callback/{MultiTask.java => MultiQPTask.java}  |  4 +-
 .../cluster/callback/{Task.java => QPTask.java}    |  8 +--
 .../{SingleTask.java => SingleQPTask.java}         |  4 +-
 .../apache/iotdb/cluster/config/ClusterConfig.java |  6 +-
 .../org/apache/iotdb/cluster/entity/Server.java    |  6 +-
 .../cluster/entity/raft/DataStateMachine.java      |  7 +-
 .../cluster/entity/raft/MetadataStateManchine.java |  9 +--
 .../apache/iotdb/cluster/qp/ClusterQPExecutor.java | 48 +++++++-------
 .../cluster/qp/executor/NonQueryExecutor.java      | 75 ++++++++++++++++------
 .../cluster/qp/executor/QueryMetadataExecutor.java | 32 +++++----
 .../org/apache/iotdb/cluster/rpc/NodeAsClient.java | 10 +--
 .../iotdb/cluster/rpc/impl/RaftNodeAsClient.java   | 16 ++---
 ...cessor.java => DataNonQueryAsyncProcessor.java} | 74 ++++++++-------------
 ...or.java => MetadataNonQueryAsyncProcessor.java} | 70 +++++++-------------
 .../processor/QueryTimeSeriesAsyncProcessor.java   | 58 ++++++++---------
 ...nQueryRequest.java => DataNonQueryRequest.java} | 15 ++---
 ...ryRequest.java => MetadataNonQueryRequest.java} | 15 ++---
 ...ueryResponse.java => DataNonQueryResponse.java} |  9 ++-
 ...Response.java => MetadataNonQueryResponse.java} |  9 ++-
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  |  2 +-
 .../apache/iotdb/cluster/utils/hash/Router.java    |  4 +-
 service-rpc/src/main/thrift/rpc.thrift             |  2 +-
 22 files changed, 238 insertions(+), 245 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiTask.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiQPTask.java
similarity index 91%
rename from 
cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiTask.java
rename to 
cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiQPTask.java
index c7515c8..ffbd0d3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiQPTask.java
@@ -25,9 +25,9 @@ import org.apache.iotdb.cluster.rpc.response.BasicResponse;
 /**
  * Split a task to multi task closures.
  */
-public class MultiTask extends Task {
+public class MultiQPTask extends QPTask {
 
-  public MultiTask(boolean isSyncTask, int taskNum, BasicRequest request) {
+  public MultiQPTask(boolean isSyncTask, int taskNum, BasicRequest request) {
     super(isSyncTask, taskNum, TaskState.INITIAL);
     this.request = request;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
similarity index 94%
rename from cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
index 7940a5f..7fbfe5a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
@@ -22,15 +22,15 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.iotdb.cluster.rpc.request.BasicRequest;
 import org.apache.iotdb.cluster.rpc.response.BasicResponse;
 
-public abstract class Task {
+public abstract class QPTask {
 
   /**
-   * Task response
+   * QPTask response
    */
   protected BasicResponse response;
 
   /**
-   * Task request
+   * QPTask request
    */
   protected BasicRequest request;
 
@@ -54,7 +54,7 @@ public abstract class Task {
    */
   protected TaskState taskState;
 
-  public Task(boolean isSyncTask, int taskNum, TaskState taskState) {
+  public QPTask(boolean isSyncTask, int taskNum, TaskState taskState) {
     this.isSyncTask = isSyncTask;
     this.taskNum = taskNum;
     this.taskCountDownLatch = new CountDownLatch(taskNum);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
similarity index 93%
rename from 
cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
rename to 
cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
index ac5492c..85da087 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
@@ -24,11 +24,11 @@ import org.apache.iotdb.cluster.rpc.response.BasicResponse;
 /**
  * Process single task.
  */
-public class SingleTask extends Task {
+public class SingleQPTask extends QPTask {
 
   private static final int TASK_NUM = 1;
 
-  public SingleTask(boolean isSyncTask, BasicRequest request) {
+  public SingleQPTask(boolean isSyncTask, BasicRequest request) {
     super(isSyncTask, TASK_NUM, TaskState.INITIAL);
     this.request = request;
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 83b129f..ca9b554 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -31,7 +31,7 @@ public class ClusterConfig {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterConfig.class);
   public static final String CONFIG_NAME = "iotdb-cluster.properties";
-  public static final String DEFAULT_NODE = "127.0.0.1:8888";
+  public static final String DEFAULT_NODE = 
"192.168.130.19:8888,192.168.130.12:8888,192.168.130.14:8888,192.168.130.16:8888,192.168.130.18:8888";
   public static final String METADATA_GROUP_ID = "metadata";
   private static final String DEFAULT_RAFT_DIR = "raft";
   private static final String DEFAULT_RAFT_METADATA_DIR = "metadata";
@@ -41,14 +41,14 @@ public class ClusterConfig {
   /**
    * Cluster node: {ip1,ip2,...,ipn}
    */
-  private String[] nodes = {DEFAULT_NODE};
+  private String[] nodes = 
{"192.168.130.19:8888","192.168.130.12:8888","192.168.130.14:8888","192.168.130.16:8888","192.168.130.18:8888"};
 
   /**
    * Replication number
    */
   private int replication = 3;
 
-  private String ip = "127.0.0.1";
+  private String ip = "192.168.130.19";
 
   private int port = 8888;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index 108dfa7..0c7d568 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -29,7 +29,8 @@ import 
org.apache.iotdb.cluster.entity.data.DataPartitionHolder;
 import org.apache.iotdb.cluster.entity.metadata.MetadataHolder;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
-import org.apache.iotdb.cluster.rpc.processor.NonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.processor.DataNonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.processor.MetadataNonQueryAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.processor.QueryTimeSeriesAsyncProcessor;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
@@ -62,7 +63,8 @@ public class Server {
     RpcServer rpcServer = new RpcServer(serverId.getPort());
     RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
 
-    rpcServer.registerUserProcessor(new NonQueryAsyncProcessor(this));
+    rpcServer.registerUserProcessor(new DataNonQueryAsyncProcessor(this));
+    rpcServer.registerUserProcessor(new MetadataNonQueryAsyncProcessor(this));
     rpcServer.registerUserProcessor(new QueryTimeSeriesAsyncProcessor(this));
 
     metadataHolder = new MetadataRaftHolder(peerIds, serverId, rpcServer, 
true);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
index 79a9d7a..36535d6 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
@@ -30,7 +30,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.rpc.request.NonQueryRequest;
+import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
@@ -61,14 +61,14 @@ public class DataStateMachine extends StateMachineAdapter {
     while (iterator.hasNext()) {
 
       Closure closure = null;
-      NonQueryRequest request = null;
+      MetadataNonQueryRequest request = null;
       if (iterator.done() != null) {
         closure = iterator.done();
       }
       final ByteBuffer data = iterator.getData();
       try {
         request = SerializerManager.getSerializer(SerializerManager.Hessian2)
-            .deserialize(data.array(), NonQueryRequest.class.getName());
+            .deserialize(data.array(), 
MetadataNonQueryRequest.class.getName());
       } catch (final CodecException e) {
         LOGGER.error("Fail to decode IncrementAndGetRequest", e);
       }
@@ -99,6 +99,7 @@ public class DataStateMachine extends StateMachineAdapter {
   @Override
   public void onStartFollowing(LeaderChangeContext ctx) {
     RaftUtils.updateRaftGroupLeader(groupId, ctx.getLeaderId());
+    this.leaderTerm.set(-1);
     LOGGER.info("Start following, {} starts to be leader of {}", 
ctx.getLeaderId(), groupId);
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
index 5c43fab..4ab4cb1 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
@@ -30,7 +30,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.rpc.request.NonQueryRequest;
+import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
@@ -70,7 +70,7 @@ public class MetadataStateManchine extends 
StateMachineAdapter {
   }
 
   /**
-   * Update StrageGroup List and userProfileMap based on Task read from raft 
log
+   * Update StrageGroup List and userProfileMap based on QPTask read from raft 
log
    *
    * @param iterator task iterator
    */
@@ -79,14 +79,14 @@ public class MetadataStateManchine extends 
StateMachineAdapter {
     while (iterator.hasNext()) {
 
       Closure closure = null;
-      NonQueryRequest request = null;
+      MetadataNonQueryRequest request = null;
       if (iterator.done() != null) {
         closure = iterator.done();
       }
       final ByteBuffer data = iterator.getData();
       try {
         request = SerializerManager.getSerializer(SerializerManager.Hessian2)
-            .deserialize(data.array(), NonQueryRequest.class.getName());
+            .deserialize(data.array(), 
MetadataNonQueryRequest.class.getName());
       } catch (final CodecException e) {
         LOGGER.error("Fail to decode IncrementAndGetRequest", e);
       }
@@ -131,6 +131,7 @@ public class MetadataStateManchine extends 
StateMachineAdapter {
   @Override
   public void onStartFollowing(LeaderChangeContext ctx) {
     RaftUtils.updateRaftGroupLeader(groupId, ctx.getLeaderId());
+    this.leaderTerm.set(-1);
     LOGGER.info("Start following, {} starts to be leader of {}", 
ctx.getLeaderId(), groupId);
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index ab9f153..c296296 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -20,10 +20,12 @@ package org.apache.iotdb.cluster.qp;
 
 import com.alipay.sofa.jraft.entity.PeerId;
 import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
-import org.apache.iotdb.cluster.callback.Task;
-import org.apache.iotdb.cluster.callback.Task.TaskState;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.cluster.callback.QPTask;
+import org.apache.iotdb.cluster.callback.QPTask.TaskState;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.rpc.NodeAsClient;
 import org.apache.iotdb.cluster.rpc.impl.RaftNodeAsClient;
@@ -33,7 +35,6 @@ import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
 import org.apache.iotdb.cluster.utils.hash.Router;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,8 +45,8 @@ public abstract class ClusterQPExecutor {
   protected Router router = Router.getInstance();
   protected PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(),
       CLUSTER_CONFIG.getPort());
-  protected OverflowQPExecutor qpExecutor = new OverflowQPExecutor();
   protected MManager mManager = MManager.getInstance();
+  protected final Server server = Server.getInstance();
 
   /**
    * Rpc Service Client
@@ -62,6 +63,8 @@ public abstract class ClusterQPExecutor {
    */
   protected static int SUB_TASK_NUM = 1;
 
+  protected final AtomicInteger requestId = new AtomicInteger(0);
+
   /**
    * Get Storage Group Name by device name
    */
@@ -83,8 +86,8 @@ public abstract class ClusterQPExecutor {
   }
 
   /**
-   * Verify if the non query command can execute in local. 1. If this node 
belongs to the storage group 2. If
-   * this node is leader.
+   * Verify if the non query command can execute in local. 1. If this node 
belongs to the storage
+   * group 2. If this node is leader.
    */
   public boolean canHandleNonQuery(String storageGroup) {
     if (router.containPhysicalNode(storageGroup, localNode)) {
@@ -97,41 +100,42 @@ public abstract class ClusterQPExecutor {
   }
 
   /**
-   * Verify if the query command can execute in local. Check if this node 
belongs to the storage group
+   * Verify if the query command can execute in local. Check if this node 
belongs to the storage
+   * group
    */
   public boolean canHandleQuery(String storageGroup) {
     return router.containPhysicalNode(storageGroup, localNode);
   }
 
   /**
-   * Async handle task by task and leader id
+   * Async handle QPTask by QPTask and leader id
    *
-   * @param task request task
+   * @param QPTask request QPTask
    * @param leader leader of the target raft group
-   * @param taskRetryNum Number of task retries due to timeout and redirected.
+   * @param taskRetryNum Number of QPTask retries due to timeout and 
redirected.
    * @return basic response
    */
-  public BasicResponse asyncHandleTaskGetRes(Task task, PeerId leader, int 
taskRetryNum)
+  public BasicResponse asyncHandleTaskGetRes(QPTask QPTask, PeerId leader, int 
taskRetryNum)
       throws InterruptedException, RaftConnectionException {
     if (taskRetryNum >= TASK_MAX_RETRY) {
-      throw new RaftConnectionException(String.format("Task retries reach the 
upper bound %s",
+      throw new RaftConnectionException(String.format("QPTask retries reach 
the upper bound %s",
           TASK_MAX_RETRY));
     }
     NodeAsClient client = new RaftNodeAsClient();
     /** Call async method **/
-    client.asyncHandleRequest(cliClientService, task.getRequest(), leader, 
task);
-    task.await();
-    if (task.getTaskState() != TaskState.FINISH) {
-      if (task.getTaskState() == TaskState.REDIRECT) {
+    client.asyncHandleRequest(cliClientService, QPTask.getRequest(), leader, 
QPTask);
+    QPTask.await();
+    if (QPTask.getTaskState() != TaskState.FINISH) {
+      if (QPTask.getTaskState() == TaskState.REDIRECT) {
         /** redirect to the right leader **/
-        leader = PeerId.parsePeer(task.getResponse().getLeaderStr());
-        LOGGER.info("Redirect leader: {}, group id = {}" , leader, 
task.getRequest().getGroupID());
-        RaftUtils.updateRaftGroupLeader(task.getRequest().getGroupID(), 
leader);
+        leader = PeerId.parsePeer(QPTask.getResponse().getLeaderStr());
+        LOGGER.info("Redirect leader: {}, group id = {}", leader, 
QPTask.getRequest().getGroupID());
+        RaftUtils.updateRaftGroupLeader(QPTask.getRequest().getGroupID(), 
leader);
       }
-      task.resetTask();
-      return asyncHandleTaskGetRes(task, leader, taskRetryNum + 1);
+      QPTask.resetTask();
+      return asyncHandleTaskGetRes(QPTask, leader, taskRetryNum + 1);
     }
-    return task.getResponse();
+    return QPTask.getResponse();
   }
 
   public void shutdown() {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index a95a227..c333f38 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -18,16 +18,25 @@
  */
 package org.apache.iotdb.cluster.qp.executor;
 
+import com.alipay.remoting.exception.CodecException;
+import com.alipay.remoting.serialization.SerializerManager;
 import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.entity.Task;
 import com.alipay.sofa.jraft.option.CliOptions;
 import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
 import java.io.IOException;
-import org.apache.iotdb.cluster.callback.SingleTask;
-import org.apache.iotdb.cluster.callback.Task;
+import java.nio.ByteBuffer;
+import org.apache.iotdb.cluster.callback.QPTask;
+import org.apache.iotdb.cluster.callback.SingleQPTask;
+import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
+import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.ClusterQPExecutor;
-import org.apache.iotdb.cluster.rpc.request.NonQueryRequest;
+import org.apache.iotdb.cluster.rpc.request.BasicRequest;
+import org.apache.iotdb.cluster.rpc.request.DataNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
 import org.apache.iotdb.cluster.rpc.response.BasicResponse;
+import org.apache.iotdb.cluster.rpc.response.DataNonQueryResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
@@ -113,7 +122,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
     Path path = updatePlan.getPath();
     String deviceId = path.getDevice();
     String storageGroup = getStroageGroupByDevice(deviceId);
-    return handleRequest(storageGroup, updatePlan);
+    return handleDataGroupRequest(storageGroup, updatePlan);
   }
 
   //TODO
@@ -128,7 +137,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
       throws ProcessorException, PathErrorException, InterruptedException, 
IOException, RaftConnectionException {
     String deviceId = insertPlan.getDeviceId();
     String storageGroup = getStroageGroupByDevice(deviceId);
-    return handleRequest(storageGroup, insertPlan);
+    return handleDataGroupRequest(storageGroup, insertPlan);
   }
 
   /**
@@ -156,7 +165,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
       case DELETE_PATH:
         String deviceId = path.getDevice();
         String storageGroup = getStroageGroupByDevice(deviceId);
-        return handleRequest(storageGroup, metadataPlan);
+        return handleDataGroupRequest(storageGroup, metadataPlan);
       case SET_FILE_LEVEL:
         boolean fileLevelExist = 
mManager.checkStorageLevelOfMTree(path.getFullPath());
         if (fileLevelExist) {
@@ -177,32 +186,55 @@ public class NonQueryExecutor extends ClusterQPExecutor {
   }
 
   /**
-   * Handle request by storage group and physical plan
+   * Handle request to data group by storage group and physical plan
    */
-  private boolean handleRequest(String storageGroup, PhysicalPlan plan)
-      throws ProcessorException, IOException, RaftConnectionException, 
InterruptedException {
+  private boolean handleDataGroupRequest(String storageGroup, PhysicalPlan 
plan)
+      throws IOException, RaftConnectionException, InterruptedException {
+    String groupId = getGroupIdBySG(storageGroup);
+    PeerId leader = RaftUtils.getTargetPeerID(groupId);
+    DataNonQueryRequest request = new DataNonQueryRequest(groupId, plan);
+    SingleQPTask qpTask = new SingleQPTask(false, request);
     /** Check if the plan can be executed locally. **/
     if (canHandleNonQuery(storageGroup)) {
-      return qpExecutor.processNonQuery(plan);
+      return handleDataGroupRequestLocally(groupId, qpTask, request);
     } else {
-      String groupId = getGroupIdBySG(storageGroup);
-      NonQueryRequest request = new NonQueryRequest(groupId, plan);
-      PeerId leader = RaftUtils.getTargetPeerID(groupId);
+      return asyncHandleTask(qpTask, leader, 0);
+    }
+  }
 
-      SingleTask task = new SingleTask(false, request);
-      return asyncHandleTask(task, leader, 0);
+  /**
+   * Handle data group request locally.
+   */
+  private boolean handleDataGroupRequestLocally(String groupId, QPTask qpTask, 
BasicRequest request)
+      throws InterruptedException {
+    final byte[] reqContext = new byte[4];
+    Task task = null;
+    /** Apply qpTask to Raft Node **/
+    try {
+      task.setData(ByteBuffer
+          .wrap(SerializerManager.getSerializer(SerializerManager.Hessian2)
+              .serialize(reqContext)));
+    } catch (final CodecException e) {
+      return false;
     }
+    DataPartitionRaftHolder dataRaftHolder = (DataPartitionRaftHolder) server
+        .getDataPartitionHolderMap().get(groupId);
+    RaftService service = (RaftService) dataRaftHolder.getService();
+    service.getNode().apply(task);
+    qpTask.await();
+    DataNonQueryResponse response = (DataNonQueryResponse) 
qpTask.getResponse();
+    return response.isSuccess();
   }
 
   /**
-   * Async handle task by task and leader id.
+   * Async handle task by QPTask and leader id.
    *
-   * @param task request task
+   * @param task request QPTask
    * @param leader leader of the target raft group
-   * @param taskRetryNum Number of task retries due to timeout and redirected.
+   * @param taskRetryNum Number of QPTask retries due to timeout and 
redirected.
    * @return request result
    */
-  private boolean asyncHandleTask(Task task, PeerId leader, int taskRetryNum)
+  private boolean asyncHandleTask(QPTask task, PeerId leader, int taskRetryNum)
       throws RaftConnectionException, InterruptedException {
     BasicResponse response = asyncHandleTaskGetRes(task, leader, taskRetryNum);
     return response.isSuccess();
@@ -215,10 +247,11 @@ public class NonQueryExecutor extends ClusterQPExecutor {
    */
   public boolean redirectMetadataGroupLeader(PhysicalPlan plan)
       throws IOException, RaftConnectionException, InterruptedException {
-    NonQueryRequest request = new 
NonQueryRequest(CLUSTER_CONFIG.METADATA_GROUP_ID, plan);
+    MetadataNonQueryRequest request = new 
MetadataNonQueryRequest(CLUSTER_CONFIG.METADATA_GROUP_ID,
+        plan);
     PeerId leader = 
RaftUtils.getTargetPeerID(CLUSTER_CONFIG.METADATA_GROUP_ID);
 
-    SingleTask task = new SingleTask(false, request);
+    SingleQPTask task = new SingleQPTask(false, request);
     return asyncHandleTask(task, leader, 0);
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 4560973..7967536 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -26,10 +26,8 @@ import 
com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
 import com.alipay.sofa.jraft.util.Bits;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.iotdb.cluster.callback.SingleTask;
+import org.apache.iotdb.cluster.callback.SingleQPTask;
 import org.apache.iotdb.cluster.config.ClusterConfig;
-import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
@@ -53,9 +51,6 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryMetadataExecutor.class);
 
-  private final AtomicInteger requestId = new AtomicInteger(0);
-  private final Server server = Server.getInstance();
-
   public QueryMetadataExecutor() {
 
   }
@@ -76,10 +71,12 @@ public class QueryMetadataExecutor extends 
ClusterQPExecutor {
     String groupId = getGroupIdBySG(storageGroup);
     QueryTimeSeriesRequest request = new QueryTimeSeriesRequest(groupId, path);
     PeerId leader = RaftUtils.getRandomPeerID(groupId);
-    SingleTask task = new SingleTask(false, request);
+    SingleQPTask task = new SingleQPTask(false, request);
 
+    LOGGER.info("Execute show timeseries {} statement.", path);
     /** Check if the plan can be executed locally. **/
     if (canHandleQuery(storageGroup)) {
+      LOGGER.info("Execute show timeseries {} statement locally.", path);
       return queryTimeSeriesLocally(path, groupId, task);
     } else {
       try {
@@ -96,8 +93,8 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
    *
    * @param path column path
    */
-  private List<List<String>> queryTimeSeriesLocally(String path, String 
groupId, SingleTask task)
-      throws InterruptedException {
+  private List<List<String>> queryTimeSeriesLocally(String path, String 
groupId, SingleQPTask task)
+      throws InterruptedException, ProcessorException {
     final byte[] reqContext = new byte[4];
     Bits.putInt(reqContext, 0, requestId.incrementAndGet());
     DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder) 
server
@@ -110,22 +107,32 @@ public class QueryMetadataExecutor extends 
ClusterQPExecutor {
             QueryTimeSeriesResponse response;
             if (status.isOk()) {
               try {
+                LOGGER.info("start to read");
                 response = new QueryTimeSeriesResponse(false, true,
                     dataPartitionHolder.getFsm().getShowTimeseriesPath(path));
               } catch (final PathErrorException e) {
                 response = new QueryTimeSeriesResponse(false, false, null, 
e.toString());
               }
             } else {
+
+              System.out.println("false");
               response = new QueryTimeSeriesResponse(false, false, null, null);
             }
+            System.out.println(status.isOk());
+            System.out.println();
             task.run(response);
           }
         });
     task.await();
+    QueryTimeSeriesResponse response = (QueryTimeSeriesResponse) 
task.getResponse();
+    if (!response.isSuccess()) {
+      LOGGER.error("Execute show timeseries {} statement false.", path);
+      throw new ProcessorException();
+    }
     return ((QueryTimeSeriesResponse) task.getResponse()).getTimeSeries();
   }
 
-  private List<List<String>> queryTimeSeries(SingleTask task, PeerId leader)
+  private List<List<String>> queryTimeSeries(SingleQPTask task, PeerId leader)
       throws InterruptedException, RaftConnectionException {
     BasicResponse response = asyncHandleTaskGetRes(task, leader, 0);
     return ((QueryTimeSeriesResponse) response).getTimeSeries();
@@ -137,10 +144,11 @@ public class QueryMetadataExecutor extends 
ClusterQPExecutor {
    * @return Set of storage group name
    */
   private Set<String> queryStorageGroupLocally() throws InterruptedException {
-    QueryStorageGroupRequest request = new 
QueryStorageGroupRequest(ClusterConfig.METADATA_GROUP_ID);
-    SingleTask task = new SingleTask(false, request);
     final byte[] reqContext = new byte[4];
     Bits.putInt(reqContext, 0, requestId.incrementAndGet());
+    QueryStorageGroupRequest request = new QueryStorageGroupRequest(
+        ClusterConfig.METADATA_GROUP_ID);
+    SingleQPTask task = new SingleQPTask(false, request);
     MetadataRaftHolder metadataHolder = (MetadataRaftHolder) 
server.getMetadataHolder();
     ((RaftService) metadataHolder.getService()).getNode()
         .readIndex(reqContext, new ReadIndexClosure() {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/NodeAsClient.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/NodeAsClient.java
index d658c94..8d37661 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/NodeAsClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/NodeAsClient.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.cluster.rpc;
 
-import org.apache.iotdb.cluster.callback.Task;
+import org.apache.iotdb.cluster.callback.QPTask;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.rpc.request.BasicRequest;
 
@@ -32,18 +32,18 @@ public interface NodeAsClient {
    *
    * @param clientService client rpc service handle
    * @param leader leader node of the target group
-   * @param task the task to be executed
+   * @param QPTask the QPTask to be executed
    */
   void asyncHandleRequest(Object clientService, BasicRequest request, Object 
leader,
-      Task task) throws RaftConnectionException;
+      QPTask QPTask) throws RaftConnectionException;
 
   /**
    * Synchronous processing requests
    *
    * @param clientService client rpc service handle
    * @param leader leader node of the target group
-   * @param task the task to be executed
+   * @param QPTask the QPTask to be executed
    */
-  void syncHandleRequest(Object clientService, BasicRequest request, Object 
leader, Task task)
+  void syncHandleRequest(Object clientService, BasicRequest request, Object 
leader, QPTask QPTask)
       throws RaftConnectionException;
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/impl/RaftNodeAsClient.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/impl/RaftNodeAsClient.java
index a9cdd90..ea52c83 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/impl/RaftNodeAsClient.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/impl/RaftNodeAsClient.java
@@ -23,8 +23,8 @@ import com.alipay.remoting.exception.RemotingException;
 import com.alipay.sofa.jraft.entity.PeerId;
 import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
 import java.util.concurrent.Executor;
-import org.apache.iotdb.cluster.callback.Task;
-import org.apache.iotdb.cluster.callback.Task.TaskState;
+import org.apache.iotdb.cluster.callback.QPTask;
+import org.apache.iotdb.cluster.callback.QPTask.TaskState;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
@@ -51,7 +51,7 @@ public class RaftNodeAsClient implements NodeAsClient {
 
   @Override
   public void asyncHandleRequest(Object clientService, BasicRequest request, 
Object leader,
-      Task task)
+      QPTask QPTask)
       throws RaftConnectionException {
     BoltCliClientService boltClientService = (BoltCliClientService) 
clientService;
     PeerId raftLeader = (PeerId) leader;
@@ -64,14 +64,14 @@ public class RaftNodeAsClient implements NodeAsClient {
                 @Override
                 public void onResponse(Object result) {
                   BasicResponse response = (BasicResponse) result;
-                  task.run(response);
+                  QPTask.run(response);
                 }
 
                 @Override
                 public void onException(Throwable e) {
                   LOGGER.error("Bolt rpc client occurs errors when handling 
Request", e);
-                  task.setTaskState(TaskState.EXCEPTION);
-                  task.run(null);
+                  QPTask.setTaskState(TaskState.EXCEPTION);
+                  QPTask.run(null);
 
                 }
 
@@ -88,14 +88,14 @@ public class RaftNodeAsClient implements NodeAsClient {
 
   @Override
   public void syncHandleRequest(Object clientService, BasicRequest request, 
Object leader,
-      Task task)
+      QPTask QPTask)
       throws RaftConnectionException {
     BoltCliClientService boltClientService = (BoltCliClientService) 
clientService;
     PeerId raftLeader = (PeerId) leader;
     try {
       BasicResponse response = (BasicResponse) boltClientService.getRpcClient()
           .invokeSync(raftLeader.getEndpoint().toString(), request, 
TASK_TIMEOUT_MS);
-      task.run(response);
+      QPTask.run(response);
     } catch (RemotingException | InterruptedException e) {
       throw new RaftConnectionException(e);
     }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/NonQueryAsyncProcessor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataNonQueryAsyncProcessor.java
similarity index 54%
copy from 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/NonQueryAsyncProcessor.java
copy to 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataNonQueryAsyncProcessor.java
index 202c2d3..db734f4 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/NonQueryAsyncProcessor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataNonQueryAsyncProcessor.java
@@ -30,93 +30,71 @@ import 
com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
 import java.nio.ByteBuffer;
 import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
-import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.request.NonQueryRequest;
-import org.apache.iotdb.cluster.rpc.response.NonQueryResponse;
+import org.apache.iotdb.cluster.rpc.request.DataNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.response.DataNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.response.MetadataNonQueryResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Async handle change metadata request.
+ * Async handle those requests which need to be applied in data group.
  */
-public class NonQueryAsyncProcessor extends 
BasicAsyncUserProcessor<NonQueryRequest> {
+public class DataNonQueryAsyncProcessor extends 
BasicAsyncUserProcessor<DataNonQueryRequest> {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(NonQueryAsyncProcessor.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataNonQueryAsyncProcessor.class);
   private Server server;
 
-  public NonQueryAsyncProcessor(Server server) {
+  public DataNonQueryAsyncProcessor(Server server) {
     this.server = server;
   }
 
   @Override
   public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
-      NonQueryRequest nonQueryRequest) {
-    Operator.OperatorType requestType = nonQueryRequest.getRequestType();
-    LOGGER.info("Handle nonquery request.");
-    /** Check if it's the leader of metadata **/
-    String groupId = nonQueryRequest.getGroupID();
-    if (!this.server.getServerId().equals(RaftUtils.getTargetPeerID(groupId))) 
{
+      DataNonQueryRequest dataNonQueryRequest) {
+    LOGGER.info("Handle data non query request.");
+
+    /** Check if it's the leader **/
+    String groupId = dataNonQueryRequest.getGroupID();
+    DataPartitionRaftHolder dataPartitionRaftHolder = 
(DataPartitionRaftHolder) server
+        .getDataPartitionHolderMap().get(groupId);
+    if (!dataPartitionRaftHolder.getFsm().isLeader()) {
       PeerId leader = RaftUtils.getTargetPeerID(groupId);
       LOGGER.info("Request need to redirect leader: {}, groupId : {} ", 
leader, groupId);
       BoltCliClientService cliClientService = new BoltCliClientService();
       cliClientService.init(new CliOptions());
       LOGGER.info("Right leader is: {}, group id = {} ", leader, groupId);
-      NonQueryResponse response = new NonQueryResponse(true, false, 
leader.toString(), null);
+      MetadataNonQueryResponse response = new MetadataNonQueryResponse(true, 
false,
+          leader.toString(), null);
       asyncContext.sendResponse(response);
     } else {
 
       LOGGER.info("Apply task to raft node");
-      /** Apply Task to Raft Node **/
+      /** Apply QPTask to Raft Node **/
       final Task task = new Task();
       task.setDone((Status status) -> {
         asyncContext.sendResponse(
-            new NonQueryResponse(false, status.isOk(), null, 
status.getErrorMsg()));
+            new DataNonQueryResponse(false, status.isOk(), null, 
status.getErrorMsg()));
       });
       try {
         task.setData(ByteBuffer
             .wrap(SerializerManager.getSerializer(SerializerManager.Hessian2)
-                .serialize(nonQueryRequest)));
+                .serialize(dataNonQueryRequest)));
       } catch (final CodecException e) {
-        asyncContext.sendResponse(new NonQueryResponse(false, false, null, 
e.toString()));
-      }
-
-      RaftService service;
-      switch (requestType) {
-        case SET_STORAGE_GROUP:
-        case AUTHOR:
-        case CREATE_USER:
-        case CREATE_ROLE:
-        case DELETE_ROLE:
-        case DELETE_USER:
-        case GRANT_USER_ROLE:
-        case GRANT_USER_PRIVILEGE:
-        case REVOKE_USER_PRIVILEGE:
-        case REVOKE_USER_ROLE:
-        case GRANT_ROLE_PRIVILEGE:
-        case LIST_USER:
-        case LIST_ROLE:
-        case LIST_USER_PRIVILEGE:
-        case LIST_ROLE_PRIVILEGE:
-        case LIST_USER_ROLES:
-        case LIST_ROLE_USERS:
-        case MODIFY_PASSWORD:
-          MetadataRaftHolder metadataHolder = (MetadataRaftHolder) 
server.getMetadataHolder();
-          service = (RaftService) metadataHolder.getService();
-          break;
-        default:
-          DataPartitionRaftHolder dataRaftHolder = (DataPartitionRaftHolder) 
server
-              .getDataPartitionHolderMap().get(groupId);
-          service = (RaftService) dataRaftHolder.getService();
+        asyncContext.sendResponse(new MetadataNonQueryResponse(false, false, 
null, e.toString()));
       }
+      DataPartitionRaftHolder dataRaftHolder = (DataPartitionRaftHolder) server
+          .getDataPartitionHolderMap().get(groupId);
+      RaftService service = (RaftService) dataRaftHolder.getService();
       service.getNode().apply(task);
     }
   }
 
   @Override
   public String interest() {
-    return NonQueryRequest.class.getName();
+    return MetadataNonQueryRequest.class.getName();
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/NonQueryAsyncProcessor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetadataNonQueryAsyncProcessor.java
similarity index 53%
rename from 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/NonQueryAsyncProcessor.java
rename to 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetadataNonQueryAsyncProcessor.java
index 202c2d3..31e83ce 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/NonQueryAsyncProcessor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetadataNonQueryAsyncProcessor.java
@@ -29,94 +29,68 @@ import com.alipay.sofa.jraft.option.CliOptions;
 import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
 import java.nio.ByteBuffer;
 import org.apache.iotdb.cluster.entity.Server;
-import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.request.NonQueryRequest;
-import org.apache.iotdb.cluster.rpc.response.NonQueryResponse;
+import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.response.MetadataNonQueryResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Async handle change metadata request.
+ * Async handle those requests which need to be applied in metadata group.
  */
-public class NonQueryAsyncProcessor extends 
BasicAsyncUserProcessor<NonQueryRequest> {
+public class MetadataNonQueryAsyncProcessor extends 
BasicAsyncUserProcessor<MetadataNonQueryRequest> {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(NonQueryAsyncProcessor.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MetadataNonQueryAsyncProcessor.class);
   private Server server;
 
-  public NonQueryAsyncProcessor(Server server) {
+  public MetadataNonQueryAsyncProcessor(Server server) {
     this.server = server;
   }
 
   @Override
   public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
-      NonQueryRequest nonQueryRequest) {
-    Operator.OperatorType requestType = nonQueryRequest.getRequestType();
-    LOGGER.info("Handle nonquery request.");
-    /** Check if it's the leader of metadata **/
-    String groupId = nonQueryRequest.getGroupID();
-    if (!this.server.getServerId().equals(RaftUtils.getTargetPeerID(groupId))) 
{
+      MetadataNonQueryRequest metadataNonQueryRequest) {
+    LOGGER.info("Handle metadata non query query request.");
+
+    /** Check if it's the leader **/
+    String groupId = metadataNonQueryRequest.getGroupID();
+    MetadataRaftHolder metadataHolder = (MetadataRaftHolder) 
server.getMetadataHolder();
+    if (!metadataHolder.getFsm().isLeader()) {
       PeerId leader = RaftUtils.getTargetPeerID(groupId);
       LOGGER.info("Request need to redirect leader: {}, groupId : {} ", 
leader, groupId);
       BoltCliClientService cliClientService = new BoltCliClientService();
       cliClientService.init(new CliOptions());
       LOGGER.info("Right leader is: {}, group id = {} ", leader, groupId);
-      NonQueryResponse response = new NonQueryResponse(true, false, 
leader.toString(), null);
+      MetadataNonQueryResponse response = new MetadataNonQueryResponse(true, 
false,
+          leader.toString(), null);
       asyncContext.sendResponse(response);
     } else {
 
-      LOGGER.info("Apply task to raft node");
-      /** Apply Task to Raft Node **/
+      LOGGER.info("Apply task to metadata raft node");
+      /** Apply QPTask to Raft Node **/
       final Task task = new Task();
       task.setDone((Status status) -> {
         asyncContext.sendResponse(
-            new NonQueryResponse(false, status.isOk(), null, 
status.getErrorMsg()));
+            new MetadataNonQueryResponse(false, status.isOk(), null, 
status.getErrorMsg()));
       });
       try {
         task.setData(ByteBuffer
             .wrap(SerializerManager.getSerializer(SerializerManager.Hessian2)
-                .serialize(nonQueryRequest)));
+                .serialize(metadataNonQueryRequest)));
       } catch (final CodecException e) {
-        asyncContext.sendResponse(new NonQueryResponse(false, false, null, 
e.toString()));
+        asyncContext.sendResponse(new MetadataNonQueryResponse(false, false, 
null, e.toString()));
       }
 
-      RaftService service;
-      switch (requestType) {
-        case SET_STORAGE_GROUP:
-        case AUTHOR:
-        case CREATE_USER:
-        case CREATE_ROLE:
-        case DELETE_ROLE:
-        case DELETE_USER:
-        case GRANT_USER_ROLE:
-        case GRANT_USER_PRIVILEGE:
-        case REVOKE_USER_PRIVILEGE:
-        case REVOKE_USER_ROLE:
-        case GRANT_ROLE_PRIVILEGE:
-        case LIST_USER:
-        case LIST_ROLE:
-        case LIST_USER_PRIVILEGE:
-        case LIST_ROLE_PRIVILEGE:
-        case LIST_USER_ROLES:
-        case LIST_ROLE_USERS:
-        case MODIFY_PASSWORD:
-          MetadataRaftHolder metadataHolder = (MetadataRaftHolder) 
server.getMetadataHolder();
-          service = (RaftService) metadataHolder.getService();
-          break;
-        default:
-          DataPartitionRaftHolder dataRaftHolder = (DataPartitionRaftHolder) 
server
-              .getDataPartitionHolderMap().get(groupId);
-          service = (RaftService) dataRaftHolder.getService();
-      }
+      RaftService service = (RaftService) metadataHolder.getService();
       service.getNode().apply(task);
     }
   }
 
   @Override
   public String interest() {
-    return NonQueryRequest.class.getName();
+    return MetadataNonQueryRequest.class.getName();
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryTimeSeriesAsyncProcessor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryTimeSeriesAsyncProcessor.java
index ba47279..05b9bd9 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryTimeSeriesAsyncProcessor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryTimeSeriesAsyncProcessor.java
@@ -20,28 +20,24 @@ package org.apache.iotdb.cluster.rpc.processor;
 
 import com.alipay.remoting.AsyncContext;
 import com.alipay.remoting.BizContext;
-import com.alipay.remoting.exception.CodecException;
-import com.alipay.remoting.serialization.SerializerManager;
 import com.alipay.sofa.jraft.Status;
-import com.alipay.sofa.jraft.entity.PeerId;
-import com.alipay.sofa.jraft.entity.Task;
-import java.nio.ByteBuffer;
+import com.alipay.sofa.jraft.closure.ReadIndexClosure;
+import com.alipay.sofa.jraft.util.Bits;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.rpc.request.QueryTimeSeriesRequest;
-import org.apache.iotdb.cluster.rpc.response.QueryStorageGroupResponse;
 import org.apache.iotdb.cluster.rpc.response.QueryTimeSeriesResponse;
-import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.apache.iotdb.db.exception.PathErrorException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class QueryTimeSeriesAsyncProcessor extends 
BasicAsyncUserProcessor<QueryTimeSeriesRequest> {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryTimeSeriesAsyncProcessor.class);
-  private Server server;
   private final AtomicInteger requestId = new AtomicInteger(0);
+  private Server server;
 
   public QueryTimeSeriesAsyncProcessor(Server server) {
     this.server = server;
@@ -50,31 +46,31 @@ public class QueryTimeSeriesAsyncProcessor extends 
BasicAsyncUserProcessor<Query
   @Override
   public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
       QueryTimeSeriesRequest queryMetadataRequest) {
-    /** Check if it's the leader of data **/
     String groupId = queryMetadataRequest.getGroupID();
-    if (!this.server.getServerId().equals(RaftUtils.getTargetPeerID(groupId))) 
{
-      PeerId leader = RaftUtils.getTargetPeerID(groupId);
-      QueryTimeSeriesResponse response = new QueryTimeSeriesResponse(true, 
false, leader.toString(), null);
-      asyncContext.sendResponse(response);
-    }
-
-    /** Apply Task to Raft Node **/
-    final Task task = new Task();
-    task.setDone((Status status) -> {
-      asyncContext.sendResponse(
-          new QueryTimeSeriesResponse(false, status.isOk(), null, 
status.getErrorMsg()));
-    });
-    try {
-      task.setData(ByteBuffer
-          .wrap(SerializerManager.getSerializer(SerializerManager.Hessian2)
-              .serialize(queryMetadataRequest)));
-    } catch (final CodecException e) {
-      asyncContext.sendResponse(new QueryTimeSeriesResponse(false, false, 
null, e.toString()));
-    }
+    final byte[] reqContext = new byte[4];
+    Bits.putInt(reqContext, 0, requestId.incrementAndGet());
+    DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder) 
server
+        .getDataPartitionHolder(groupId);
+    ((RaftService) dataPartitionHolder.getService()).getNode()
+        .readIndex(reqContext, new ReadIndexClosure() {
 
-    DataPartitionRaftHolder dataRaftHolder = (DataPartitionRaftHolder) 
server.getDataPartitionHolderMap().get(groupId);
-    RaftService service = (RaftService) dataRaftHolder.getService();
-    service.getNode().apply(task);
+          @Override
+          public void run(Status status, long index, byte[] reqCtx) {
+            QueryTimeSeriesResponse response;
+            if (status.isOk()) {
+              try {
+                response = new QueryTimeSeriesResponse(false, true,
+                    dataPartitionHolder.getFsm()
+                        
.getShowTimeseriesPath(queryMetadataRequest.getPath()));
+              } catch (final PathErrorException e) {
+                response = new QueryTimeSeriesResponse(false, false, null, 
e.toString());
+              }
+            } else {
+              response = new QueryTimeSeriesResponse(false, false, null, null);
+            }
+            asyncContext.sendResponse(response);
+          }
+        });
   }
 
   @Override
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/NonQueryRequest.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataNonQueryRequest.java
similarity index 80%
copy from 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/NonQueryRequest.java
copy to 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataNonQueryRequest.java
index e4d25d5..c5fb573 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/NonQueryRequest.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataNonQueryRequest.java
@@ -24,29 +24,24 @@ import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
 
-public class NonQueryRequest extends BasicRequest implements Serializable {
+/**
+ * Handle request to data group
+ */
+public class DataNonQueryRequest extends BasicRequest implements Serializable {
 
   /**
    * Serialized physical plan
    */
   private byte[] physicalPlanBytes;
-  /**
-   * Request type
-   */
-  private Operator.OperatorType requestType;
 
-  public NonQueryRequest(String groupID, PhysicalPlan plan)
+  public DataNonQueryRequest(String groupID, PhysicalPlan plan)
       throws IOException {
     super(groupID);
     this.physicalPlanBytes = PhysicalPlanLogTransfer.operatorToLog(plan);
-    this.requestType = plan.getOperatorType();
   }
 
   public byte[] getPhysicalPlanBytes() {
     return physicalPlanBytes;
   }
 
-  public Operator.OperatorType getRequestType() {
-    return requestType;
-  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/NonQueryRequest.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetadataNonQueryRequest.java
similarity index 80%
rename from 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/NonQueryRequest.java
rename to 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetadataNonQueryRequest.java
index e4d25d5..dd91e09 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/NonQueryRequest.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetadataNonQueryRequest.java
@@ -24,29 +24,24 @@ import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
 
-public class NonQueryRequest extends BasicRequest implements Serializable {
+/**
+ * Handle request to metadata group leader
+ */
+public class MetadataNonQueryRequest extends BasicRequest implements 
Serializable {
 
   /**
    * Serialized physical plan
    */
   private byte[] physicalPlanBytes;
-  /**
-   * Request type
-   */
-  private Operator.OperatorType requestType;
 
-  public NonQueryRequest(String groupID, PhysicalPlan plan)
+  public MetadataNonQueryRequest(String groupID, PhysicalPlan plan)
       throws IOException {
     super(groupID);
     this.physicalPlanBytes = PhysicalPlanLogTransfer.operatorToLog(plan);
-    this.requestType = plan.getOperatorType();
   }
 
   public byte[] getPhysicalPlanBytes() {
     return physicalPlanBytes;
   }
 
-  public Operator.OperatorType getRequestType() {
-    return requestType;
-  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/NonQueryResponse.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataNonQueryResponse.java
similarity index 77%
copy from 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/NonQueryResponse.java
copy to 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataNonQueryResponse.java
index 7e466fb..6f430d0 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/NonQueryResponse.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataNonQueryResponse.java
@@ -18,13 +18,16 @@
  */
 package org.apache.iotdb.cluster.rpc.response;
 
-public class NonQueryResponse extends BasicResponse {
+/**
+ * Handle response from data group leader
+ */
+public class DataNonQueryResponse extends BasicResponse {
 
-  public NonQueryResponse(boolean redirected, boolean success, String 
leaderStr, String errorMsg) {
+  public DataNonQueryResponse(boolean redirected, boolean success, String 
leaderStr, String errorMsg) {
     super(redirected, success, leaderStr, errorMsg);
   }
 
-  public NonQueryResponse(boolean redirected, boolean success) {
+  public DataNonQueryResponse(boolean redirected, boolean success) {
     super(redirected, success, null, null);
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/NonQueryResponse.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetadataNonQueryResponse.java
similarity index 76%
rename from 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/NonQueryResponse.java
rename to 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetadataNonQueryResponse.java
index 7e466fb..f23fa1c 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/NonQueryResponse.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetadataNonQueryResponse.java
@@ -18,13 +18,16 @@
  */
 package org.apache.iotdb.cluster.rpc.response;
 
-public class NonQueryResponse extends BasicResponse {
+/**
+ * Handle response from metadata group leader
+ */
+public class MetadataNonQueryResponse extends BasicResponse {
 
-  public NonQueryResponse(boolean redirected, boolean success, String 
leaderStr, String errorMsg) {
+  public MetadataNonQueryResponse(boolean redirected, boolean success, String 
leaderStr, String errorMsg) {
     super(redirected, success, leaderStr, errorMsg);
   }
 
-  public NonQueryResponse(boolean redirected, boolean success) {
+  public MetadataNonQueryResponse(boolean redirected, boolean success) {
     super(redirected, success, null, null);
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 984bfb6..da42cb7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -171,7 +171,7 @@ public class RaftUtils {
   public static PeerId[] convertPhysicalNodeArrayToPeerIdArray(PhysicalNode[] 
physicalNodes) {
     PeerId[] peerIds = new PeerId[physicalNodes.length];
     for (int i = 0; i < physicalNodes.length; i++) {
-      peerIds[i] = new PeerId(physicalNodes[i].getIp(), 
physicalNodes[i].getPort());
+      peerIds[i] = convertPhysicalNode(physicalNodes[i]);
     }
     return peerIds;
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
index 9326610..3dab29f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
@@ -163,7 +163,7 @@ public class Router {
   }
 
   /**
-   * For a storage group, compute the nearest physical node on the hash ring 
Only for test
+   * For a storage group, compute the nearest physical node on the hash ring
    */
   public PhysicalNode routeNode(String objectKey) {
     int hashVal = hashFunction.hash(objectKey);
@@ -213,7 +213,7 @@ public class Router {
    * Show physical nodes by group id.
    */
   public void showPhysicalNodes(String groupId) {
-    PhysicalNode[] physicalPlans = Router.getInstance().routeGroup(groupId);
+    PhysicalNode[] physicalPlans = getNodesByGroupId(groupId);
     for (PhysicalNode node : physicalPlans) {
       System.out.println(node);
     }
diff --git a/service-rpc/src/main/thrift/rpc.thrift 
b/service-rpc/src/main/thrift/rpc.thrift
index c603ae0..9604bf8 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -52,7 +52,7 @@ struct TSHandleIdentifier {
   2: required binary secret,
 }
 
-// Client-side reference to a task running asynchronously on the server.
+// Client-side reference to a QPTask running asynchronously on the server.
 struct TSOperationHandle {
   1: required TSHandleIdentifier operationId
 

Reply via email to