This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 3059d5f unit code and add judge leader process of
QueryMetadataAsyncProcessor
3059d5f is described below
commit 3059d5f5194cca8deedb2a1b501a0bebadf66d8d
Author: lta <[email protected]>
AuthorDate: Fri Mar 29 11:56:14 2019 +0800
unit code and add judge leader process of QueryMetadataAsyncProcessor
---
.../apache/iotdb/cluster/callback/SingleTask.java | 5 +--
.../org/apache/iotdb/cluster/callback/Task.java | 29 +++++++++--------
.../apache/iotdb/cluster/qp/ClusterQPExecutor.java | 36 ++++++++++++++++++++++
.../cluster/qp/executor/NonQueryExecutor.java | 20 ++----------
.../cluster/qp/executor/QueryMetadataExecutor.java | 15 ++-------
.../rpc/processor/QueryMetadataAsyncProcessor.java | 10 ++++++
6 files changed, 68 insertions(+), 47 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
index 2fd49b2..ac5492c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
@@ -26,9 +26,10 @@ import org.apache.iotdb.cluster.rpc.response.BasicResponse;
*/
public class SingleTask extends Task {
+ private static final int TASK_NUM = 1;
public SingleTask(boolean isSyncTask, BasicRequest request) {
- super(isSyncTask, 1, TaskState.INITIAL);
+ super(isSyncTask, TASK_NUM, TaskState.INITIAL);
this.request = request;
}
@@ -43,6 +44,6 @@ public class SingleTask extends Task {
} else if (taskState != TaskState.EXCEPTION) {
this.taskState = TaskState.FINISH;
}
- this.taskNum.countDown();
+ this.taskCountDownLatch.countDown();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
b/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
index 8c825ac..0c73a01 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
@@ -39,21 +39,24 @@ public abstract class Task {
protected boolean isSyncTask;
/**
+ * Count down latch for sub-tasks
+ */
+ protected CountDownLatch taskCountDownLatch;
+
+ /**
* Num of sub-task
*/
- protected CountDownLatch taskNum;
+ private int taskNum;
+
/**
* Describe task type
*/
protected TaskState taskState;
- /**
- * Default task num
- */
- private static final int DEFAULT_TASK_NUM = 1;
public Task(boolean isSyncTask, int taskNum, TaskState taskState) {
this.isSyncTask = isSyncTask;
- this.taskNum = new CountDownLatch(taskNum);
+ this.taskNum = taskNum;
+ this.taskCountDownLatch = new CountDownLatch(taskNum);
this.taskState = taskState;
}
@@ -72,16 +75,12 @@ public abstract class Task {
isSyncTask = syncTask;
}
- public CountDownLatch getTaskNum() {
- return taskNum;
- }
-
- public void setTaskNum(int taskNum) {
- this.taskNum = new CountDownLatch(taskNum);
+ public CountDownLatch getTaskCountDownLatch() {
+ return taskCountDownLatch;
}
- public void setTaskNum() {
- this.taskNum = new CountDownLatch(DEFAULT_TASK_NUM);
+ public void resetTask() {
+ this.taskCountDownLatch = new CountDownLatch(taskNum);
}
public TaskState getTaskState() {
@@ -114,6 +113,6 @@ public abstract class Task {
}
public void await() throws InterruptedException {
- this.taskNum.await();
+ this.taskCountDownLatch.await();
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index 11fda06..d281a2a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -18,9 +18,16 @@
*/
package org.apache.iotdb.cluster.qp;
+import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
+import org.apache.iotdb.cluster.callback.Task;
+import org.apache.iotdb.cluster.callback.Task.TaskState;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.rpc.NodeAsClient;
+import org.apache.iotdb.cluster.rpc.impl.RaftNodeAsClient;
+import org.apache.iotdb.cluster.rpc.response.BasicResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
import org.apache.iotdb.cluster.utils.hash.Router;
@@ -84,6 +91,35 @@ public abstract class ClusterQPExecutor {
return false;
}
+ /**
+ * Async handle task by task and leader id
+ *
+ * @param task request task
+ * @param leader leader of the target raft group
+ * @param taskRetryNum Number of task retries due to timeout and redirected.
+ * @return basic response
+ */
+ public BasicResponse asyncHandleTaskGetRes(Task task, PeerId leader, int
taskRetryNum)
+ throws RaftConnectionException, InterruptedException {
+ if (taskRetryNum >= TASK_MAX_RETRY) {
+ throw new RaftConnectionException(String.format("Task retries reach the
upper bound %s",
+ TASK_MAX_RETRY));
+ }
+ NodeAsClient client = new RaftNodeAsClient();
+ /** Call async method **/
+ client.asyncHandleRequest(cliClientService, task.getRequest(), leader,
task);
+ task.await();
+ if (task.getTaskState() != TaskState.FINISH) {
+ if (task.getTaskState() == TaskState.REDIRECT) {
+ /** redirect to the right leader **/
+ leader = PeerId.parsePeer(task.getResponse().getLeaderStr());
+ }
+ task.resetTask();
+ return asyncHandleTaskGetRes(task, leader, taskRetryNum + 1);
+ }
+ return task.getResponse();
+ }
+
public void shutdown() {
cliClientService.shutdown();
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index e263a80..88edd0a 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.cluster.qp.ClusterQPExecutor;
import org.apache.iotdb.cluster.rpc.NodeAsClient;
import org.apache.iotdb.cluster.rpc.impl.RaftNodeAsClient;
import org.apache.iotdb.cluster.rpc.request.NonQueryRequest;
+import org.apache.iotdb.cluster.rpc.response.BasicResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
@@ -204,23 +205,8 @@ public class NonQueryExecutor extends ClusterQPExecutor {
*/
private boolean asyncHandleTask(Task task, PeerId leader, int taskRetryNum)
throws RaftConnectionException, InterruptedException {
- if (taskRetryNum >= TASK_MAX_RETRY) {
- throw new RaftConnectionException(String.format("Task retries reach the
upper bound %s",
- TASK_MAX_RETRY));
- }
- NodeAsClient client = new RaftNodeAsClient();
- /** Call async method **/
- client.asyncHandleRequest(cliClientService, task.getRequest(), leader,
task);
- task.await();
- if (task.getTaskState() != TaskState.FINISH) {
- if (task.getTaskState() == TaskState.REDIRECT) {
- /** redirect to the right leader **/
- leader = PeerId.parsePeer(task.getResponse().getLeaderStr());
- }
- task.setTaskNum();
- return asyncHandleTask(task, leader, taskRetryNum + 1);
- }
- return task.getResponse().isSuccess();
+ BasicResponse response = asyncHandleTaskGetRes(task, leader, taskRetryNum);
+ return response.isSuccess();
}
public void shutdown() {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index b514cbf..44790e0 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -70,18 +70,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor
{
*/
private Set<String> asyncHandleTask(Task task, PeerId leader, int
taskRetryNum)
throws RaftConnectionException, InterruptedException {
- if (taskRetryNum >= TASK_MAX_RETRY) {
- throw new RaftConnectionException(String.format("Task retries reach the
upper bound %s",
- TASK_MAX_RETRY));
- }
- NodeAsClient client = new RaftNodeAsClient();
- /** Call async method **/
- client.asyncHandleRequest(cliClientService, task.getRequest(), leader,
task);
- task.await();
- if (task.getTaskState() != TaskState.FINISH) {
- task.setTaskNum(SUB_TASK_NUM);
- return asyncHandleTask(task, leader, taskRetryNum + 1);
- }
- return ((QueryMetadataResponse) task.getResponse()).getMetadataSet();
+ QueryMetadataResponse response = (QueryMetadataResponse)
asyncHandleTaskGetRes(task, leader, taskRetryNum);
+ return response.getMetadataSet();
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
index 2320a2e..49f4821 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
@@ -22,6 +22,7 @@ import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
+import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.util.Bits;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.cluster.entity.Server;
@@ -30,6 +31,7 @@ import org.apache.iotdb.cluster.entity.raft.RaftService;
import org.apache.iotdb.cluster.rpc.MetadataType;
import org.apache.iotdb.cluster.rpc.request.QueryMetadataRequest;
import org.apache.iotdb.cluster.rpc.response.QueryMetadataResponse;
+import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +49,14 @@ public class QueryMetadataAsyncProcessor extends
BasicAsyncUserProcessor<QueryMe
@Override
public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
QueryMetadataRequest queryMetadataRequest) {
+ String groupId = queryMetadataRequest.getGroupID();
+ if (this.server.getServerId().equals(RaftUtils.getLeader(groupId))) {
+ PeerId leader = RaftUtils.getLeader(groupId);
+ QueryMetadataResponse response = new QueryMetadataResponse(true, false,
leader.toString(),
+ null);
+ asyncContext.sendResponse(response);
+ }
+
MetadataType metadataType = queryMetadataRequest.getMetadataType();
if (metadataType == MetadataType.STORAGE_GROUP) {
readIndexForSG(asyncContext);