This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new cf5a19d  update
cf5a19d is described below

commit cf5a19d826ee648da7f94400e11c8cd605d52669
Author: mdf369 <[email protected]>
AuthorDate: Fri Mar 29 09:20:26 2019 +0800

    update
---
 .../rpc/processor/QueryMetadataAsyncProcessor.java | 75 ++++++++--------------
 1 file changed, 25 insertions(+), 50 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
index dd292ad..2320a2e 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
@@ -20,14 +20,9 @@ package org.apache.iotdb.cluster.rpc.processor;
 
 import com.alipay.remoting.AsyncContext;
 import com.alipay.remoting.BizContext;
-import com.alipay.remoting.exception.CodecException;
-import com.alipay.remoting.serialization.SerializerManager;
 import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.closure.ReadIndexClosure;
-import com.alipay.sofa.jraft.entity.Task;
 import com.alipay.sofa.jraft.util.Bits;
-import java.nio.ByteBuffer;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
@@ -54,51 +49,7 @@ public class QueryMetadataAsyncProcessor extends 
BasicAsyncUserProcessor<QueryMe
       QueryMetadataRequest queryMetadataRequest) {
     MetadataType metadataType = queryMetadataRequest.getMetadataType();
     if (metadataType == MetadataType.STORAGE_GROUP) {
-      MetadataRaftHolder metadataHolder = (MetadataRaftHolder) 
server.getMetadataHolder();
-      /** Verify if it's the leader of metadata **/
-      if (metadataHolder.getFsm().isLeader()) {
-        try {
-          final Task task = new Task();
-          Set<String> storageGroupSet = 
metadataHolder.getFsm().getAllStorageGroups();
-          task.setDone(status -> {
-            if (!status.isOk()) {
-              asyncContext.sendResponse(
-                  new QueryMetadataResponse(false, false, null, 
status.getErrorMsg()));
-            } else {
-              asyncContext.sendResponse(new QueryMetadataResponse(false, true, 
storageGroupSet));
-            }
-          });
-          task.setData(ByteBuffer
-              .wrap(SerializerManager.getSerializer(SerializerManager.Hessian2)
-                  .serialize(queryMetadataRequest)));
-          RaftService service = (RaftService) metadataHolder.getService();
-          service.getNode().apply(task);
-        } catch (final CodecException | PathErrorException e) {
-          asyncContext.sendResponse(new QueryMetadataResponse(false, false, 
null, e.toString()));
-        }
-      } else {
-        // TODO readIndex
-        final byte[] reqContext = new byte[4];
-        Bits.putInt(reqContext, 0, requestId.incrementAndGet());
-        ((RaftService) metadataHolder.getService()).getNode()
-            .readIndex(reqContext, new ReadIndexClosure() {
-
-              @Override
-              public void run(Status status, long index, byte[] reqCtx) {
-                if (status.isOk()) {
-                  try {
-                    asyncContext.sendResponse(new QueryMetadataResponse(false, 
true,
-                        metadataHolder.getFsm().getAllStorageGroups()));
-                  } catch (final PathErrorException e) {
-                    asyncContext
-                        .sendResponse(new QueryMetadataResponse(false, false, 
null, e.toString()));
-                  }
-                } else {
-                  asyncContext.sendResponse(new QueryMetadataResponse(false, 
false, null, null));
-                }
-              }
-            });
-      }
+      readIndexForSG(asyncContext);
     } else {
       //TODO deal with query time series
       QueryMetadataResponse response = new QueryMetadataResponse(false, false, 
null, null);
@@ -106,6 +57,30 @@ public class QueryMetadataAsyncProcessor extends 
BasicAsyncUserProcessor<QueryMe
     }
   }
 
+  private void readIndexForSG(AsyncContext asyncContext) {
+    final byte[] reqContext = new byte[4];
+    Bits.putInt(reqContext, 0, requestId.incrementAndGet());
+    MetadataRaftHolder metadataHolder = (MetadataRaftHolder) 
server.getMetadataHolder();
+    ((RaftService) metadataHolder.getService()).getNode()
+        .readIndex(reqContext, new ReadIndexClosure() {
+
+          @Override
+          public void run(Status status, long index, byte[] reqCtx) {
+            if (status.isOk()) {
+              try {
+                asyncContext.sendResponse(new QueryMetadataResponse(false, 
true,
+                    metadataHolder.getFsm().getAllStorageGroups()));
+              } catch (final PathErrorException e) {
+                asyncContext
+                    .sendResponse(new QueryMetadataResponse(false, false, 
null, e.toString()));
+              }
+            } else {
+              asyncContext.sendResponse(new QueryMetadataResponse(false, 
false, null, null));
+            }
+          }
+        });
+  }
+
   @Override
   public String interest() {
     return QueryMetadataRequest.class.getName();

Reply via email to