This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 4d1423d update
4d1423d is described below
commit 4d1423dcbeec6bb713f96be51714a83c73f3454b
Author: mdf369 <[email protected]>
AuthorDate: Fri Mar 29 14:14:31 2019 +0800
update
---
.../cluster/qp/executor/QueryMetadataExecutor.java | 56 +++++++++++++++-------
.../rpc/processor/QueryMetadataAsyncProcessor.java | 8 ----
2 files changed, 40 insertions(+), 24 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 44790e0..e206b65 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -18,10 +18,13 @@
*/
package org.apache.iotdb.cluster.qp.executor;
+import com.alipay.remoting.InvokeCallback;
+import com.alipay.remoting.exception.RemotingException;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
import java.util.Set;
+import java.util.concurrent.Executor;
import org.apache.iotdb.cluster.callback.SingleTask;
import org.apache.iotdb.cluster.callback.Task;
import org.apache.iotdb.cluster.callback.Task.TaskState;
@@ -29,17 +32,19 @@ import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.qp.ClusterQPExecutor;
import org.apache.iotdb.cluster.rpc.MetadataType;
-import org.apache.iotdb.cluster.rpc.NodeAsClient;
-import org.apache.iotdb.cluster.rpc.impl.RaftNodeAsClient;
import org.apache.iotdb.cluster.rpc.request.QueryMetadataRequest;
+import org.apache.iotdb.cluster.rpc.response.BasicResponse;
import org.apache.iotdb.cluster.rpc.response.QueryMetadataResponse;
-import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Handle show all storage group logic
*/
public class QueryMetadataExecutor extends ClusterQPExecutor {
+ private static final Logger LOGGER = LoggerFactory.getLogger(QueryMetadataExecutor.class);
+
public QueryMetadataExecutor() {
}
@@ -54,23 +59,42 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
throws RaftConnectionException, InterruptedException {
QueryMetadataRequest request = new QueryMetadataRequest(
ClusterConfig.METADATA_GROUP_ID, type);
- PeerId leader = RaftUtils.getLeader(ClusterConfig.METADATA_GROUP_ID);
SingleTask task = new SingleTask(false, request);
- return asyncHandleTask(task, leader, 0);
+ return asyncHandleTaskLocally(task);
}
- /**
- * Async handle task by task and leader id.
- *
- * @param task request task
- * @param leader leader of the target raft group
- * @param taskRetryNum Number of task retries due to timeout and redirected.
- * @return request result
- */
- private Set<String> asyncHandleTask(Task task, PeerId leader, int taskRetryNum)
+ private Set<String> asyncHandleTaskLocally(SingleTask task)
throws RaftConnectionException, InterruptedException {
- QueryMetadataResponse response = (QueryMetadataResponse) asyncHandleTaskGetRes(task, leader, taskRetryNum);
- return response.getMetadataSet();
+ try {
+ cliClientService.getRpcClient()
+ .invokeWithCallback(localNode.toString(), task.getRequest(),
+ new InvokeCallback() {
+
+ @Override
+ public void onResponse(Object result) {
+ BasicResponse response = (BasicResponse) result;
+ task.run(response);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ LOGGER.error("Bolt rpc client occurs errors when handling Request", e);
+ task.setTaskState(TaskState.EXCEPTION);
+ task.run(null);
+
+ }
+
+ @Override
+ public Executor getExecutor() {
+ return null;
+ }
+ }, CLUSTER_CONFIG.getTaskTimeoutMs());
+ } catch (RemotingException | InterruptedException e) {
+ throw new RaftConnectionException(e);
+ }
+
+ task.await();
+ return ((QueryMetadataResponse) task.getResponse()).getMetadataSet();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
index 49f4821..69d89a2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
@@ -49,14 +49,6 @@ public class QueryMetadataAsyncProcessor extends BasicAsyncUserProcessor<QueryMe
@Override
public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
QueryMetadataRequest queryMetadataRequest) {
- String groupId = queryMetadataRequest.getGroupID();
- if (this.server.getServerId().equals(RaftUtils.getLeader(groupId))) {
- PeerId leader = RaftUtils.getLeader(groupId);
- QueryMetadataResponse response = new QueryMetadataResponse(true, false, leader.toString(),
- null);
- asyncContext.sendResponse(response);
- }
-
MetadataType metadataType = queryMetadataRequest.getMetadataType();
if (metadataType == MetadataType.STORAGE_GROUP) {
readIndexForSG(asyncContext);