This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new 15c53c3  update
15c53c3 is described below

commit 15c53c38c9dea4cff0ca1beb60cded795d4f3ef3
Author: mdf369 <[email protected]>
AuthorDate: Fri Mar 29 14:46:19 2019 +0800

    update
---
 .../cluster/qp/executor/QueryMetadataExecutor.java | 76 ++++++++++++----------
 .../rpc/processor/QueryMetadataAsyncProcessor.java | 26 +-------
 .../cluster/rpc/service/TSServiceClusterImpl.java  |  4 +-
 3 files changed, 43 insertions(+), 63 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 3ca9348..b55bfab 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -18,22 +18,25 @@
  */
 package org.apache.iotdb.cluster.qp.executor;
 
-import com.alipay.remoting.InvokeCallback;
-import com.alipay.remoting.exception.RemotingException;
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import com.alipay.sofa.jraft.entity.PeerId;
 import com.alipay.sofa.jraft.option.CliOptions;
 import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
+import com.alipay.sofa.jraft.util.Bits;
 import java.util.Set;
-import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iotdb.cluster.callback.SingleTask;
-import org.apache.iotdb.cluster.callback.Task;
 import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.entity.Server;
+import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
+import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.ClusterQPExecutor;
 import org.apache.iotdb.cluster.rpc.MetadataType;
 import org.apache.iotdb.cluster.rpc.request.QueryMetadataRequest;
-import org.apache.iotdb.cluster.rpc.response.BasicResponse;
 import org.apache.iotdb.cluster.rpc.response.QueryMetadataResponse;
+import org.apache.iotdb.db.exception.PathErrorException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,55 +47,56 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(QueryMetadataExecutor.class);
 
+  private final AtomicInteger requestId = new AtomicInteger(0);
+  private final Server server = Server.getInstance();
+
   public QueryMetadataExecutor() {
 
   }
 
-  public void init(){
+  public void init() {
     this.cliClientService = new BoltCliClientService();
     this.cliClientService.init(new CliOptions());
     SUB_TASK_NUM = 1;
   }
 
   public Set<String> processMetadataQuery(MetadataType type)
-      throws RaftConnectionException, InterruptedException {
+      throws InterruptedException {
     QueryMetadataRequest request = new QueryMetadataRequest(
         ClusterConfig.METADATA_GROUP_ID, type);
     SingleTask task = new SingleTask(false, request);
     return asyncHandleTaskLocally(task);
   }
 
-  private Set<String> asyncHandleTaskLocally(SingleTask task)
-      throws RaftConnectionException, InterruptedException {
-    try {
-      cliClientService.getRpcClient()
-          .invokeWithCallback(localNode.toString(), task.getRequest(),
-              new InvokeCallback() {
-
-                @Override
-                public void onResponse(Object result) {
-                  BasicResponse response = (BasicResponse) result;
-                  task.run(response);
-                }
-
-                @Override
-                public void onException(Throwable e) {
-                  LOGGER.error("Bolt rpc client occurs errors when handling Request", e);
-                  task.setTaskState(TaskState.EXCEPTION);
-                  task.run(null);
-
-                }
-
-                @Override
-                public Executor getExecutor() {
-                  return null;
-                }
-              }, CLUSTER_CONFIG.getTaskTimeoutMs());
-    } catch (RemotingException | InterruptedException e) {
-      throw new RaftConnectionException(e);
-    }
+  private Set<String> asyncHandleTaskLocally(SingleTask task) throws InterruptedException {
+    readIndexForSG(task);
 
     task.await();
     return ((QueryMetadataResponse) task.getResponse()).getMetadataSet();
   }
+
+  private void readIndexForSG(SingleTask task) {
+    final byte[] reqContext = new byte[4];
+    Bits.putInt(reqContext, 0, requestId.incrementAndGet());
+    MetadataRaftHolder metadataHolder = (MetadataRaftHolder) server.getMetadataHolder();
+    ((RaftService) metadataHolder.getService()).getNode()
+        .readIndex(reqContext, new ReadIndexClosure() {
+
+          @Override
+          public void run(Status status, long index, byte[] reqCtx) {
+            QueryMetadataResponse response = null;
+            if (status.isOk()) {
+              try {
+                response = new QueryMetadataResponse(false, true,
+                    metadataHolder.getFsm().getAllStorageGroups());
+              } catch (final PathErrorException e) {
+                response = new QueryMetadataResponse(false, false, null, e.toString());
+              }
+            } else {
+              response = new QueryMetadataResponse(false, false, null, null);
+            }
+            task.run(response);
+          }
+        });
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
index 69d89a2..bf8b159 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
@@ -51,7 +51,7 @@ public class QueryMetadataAsyncProcessor extends BasicAsyncUserProcessor<QueryMe
       QueryMetadataRequest queryMetadataRequest) {
     MetadataType metadataType = queryMetadataRequest.getMetadataType();
     if (metadataType == MetadataType.STORAGE_GROUP) {
-      readIndexForSG(asyncContext);
+      // TODO
     } else {
       //TODO deal with query time series
       QueryMetadataResponse response = new QueryMetadataResponse(false, false, null, null);
@@ -59,30 +59,6 @@ public class QueryMetadataAsyncProcessor extends BasicAsyncUserProcessor<QueryMe
     }
   }
 
-  private void readIndexForSG(AsyncContext asyncContext) {
-    final byte[] reqContext = new byte[4];
-    Bits.putInt(reqContext, 0, requestId.incrementAndGet());
-    MetadataRaftHolder metadataHolder = (MetadataRaftHolder) server.getMetadataHolder();
-    ((RaftService) metadataHolder.getService()).getNode()
-        .readIndex(reqContext, new ReadIndexClosure() {
-
-          @Override
-          public void run(Status status, long index, byte[] reqCtx) {
-            if (status.isOk()) {
-              try {
-                asyncContext.sendResponse(new QueryMetadataResponse(false, true,
-                    metadataHolder.getFsm().getAllStorageGroups()));
-              } catch (final PathErrorException e) {
-                asyncContext
-                    .sendResponse(new QueryMetadataResponse(false, false, null, e.toString()));
-              }
-            } else {
-              asyncContext.sendResponse(new QueryMetadataResponse(false, false, null, null));
-            }
-          }
-        });
-  }
-
   @Override
   public String interest() {
     return QueryMetadataRequest.class.getName();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
index 50b685a..e7f25e2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
@@ -119,7 +119,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
         try {
           Set<String> storageGroups = processMetadataQuery(MetadataType.STORAGE_GROUP);
           resp.setShowStorageGroups(storageGroups);
-        } catch (RaftConnectionException | InterruptedException e) {
+        } catch (InterruptedException e) {
           status = getErrorStatus(
               String.format("Failed to fetch storage groups' metadata because: %s", e));
           resp.setStatus(status);
@@ -145,7 +145,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
   }
 
   public Set<String> processMetadataQuery(MetadataType type)
-      throws RaftConnectionException, InterruptedException {
+      throws InterruptedException {
     return queryMetadataExecutor.get().processMetadataQuery(type);
   }
 }

Reply via email to