This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch cluster_show_sg
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit f804ba323b0554f1de0c7ec41965392ea2348ae7
Author: mdf369 <[email protected]>
AuthorDate: Thu Mar 28 20:22:02 2019 +0800

    implement show storage group
---
 .../cluster/entity/raft/MetadataStateManchine.java |  5 ++
 .../apache/iotdb/cluster/qp/ClusterQPExecutor.java | 22 +++++-
 .../cluster/qp/executor/NonQueryExecutor.java      | 25 +------
 .../cluster/qp/executor/QueryMetadataExecutor.java | 87 ++++++++++++++++++++++
 .../BasicResponse.java => MetadataType.java}       | 31 +-------
 .../rpc/processor/QueryMetadataAsyncProcessor.java | 87 ++++++++++++++++++++++
 .../QueryMetadataRequest.java}                     | 31 +++-----
 .../iotdb/cluster/rpc/response/BasicResponse.java  |  2 +-
 ...sicResponse.java => QueryMetadataResponse.java} | 29 +++-----
 .../cluster/rpc/service/TSServiceClusterImpl.java  | 62 ++++++++++++++-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  4 +-
 11 files changed, 292 insertions(+), 93 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
index 7bfff18..ff3614d 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
@@ -26,6 +26,7 @@ import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.core.StateMachineAdapter;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.cluster.rpc.request.ChangeMetadataRequest;
 import org.apache.iotdb.db.auth.AuthException;
@@ -111,6 +112,10 @@ public class MetadataStateManchine extends 
StateMachineAdapter {
     // TODO implement this method
   }
 
+  /**
+   * Return every storage group reported by the metadata manager of this state machine.
+   *
+   * @return the set produced by {@code mManager.getAllStorageGroup()}
+   * @throws PathErrorException propagated unchanged from the metadata manager
+   */
+  public Set<String> getAllStorageGroups() throws PathErrorException {
+    final Set<String> storageGroups = mManager.getAllStorageGroup();
+    return storageGroups;
+  }
+
   public void addUser(String username, String password) throws AuthException {
     authorizer.createUser(username, password);
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index 367bf79..cd144d4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -27,13 +27,29 @@ import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
 import org.apache.iotdb.cluster.utils.hash.Router;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
 
 public abstract class ClusterQPExecutor {
 
-  private static final ClusterConfig CLUSTER_CONFIG = 
ClusterDescriptor.getInstance().getConfig();
+  protected static final ClusterConfig CLUSTER_CONFIG = 
ClusterDescriptor.getInstance().getConfig();
   private Router router = Router.getInstance();
   private PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(), 
CLUSTER_CONFIG.getPort());
 
+  protected OverflowQPExecutor qpExecutor = new OverflowQPExecutor();
+  protected MManager mManager = MManager.getInstance();
+  /**
+   * Rpc Service Client
+   */
+  protected BoltCliClientService cliClientService;
+  /**
+   * Count limit to redo a single task
+   */
+  protected static final int TASK_MAX_RETRY = 
CLUSTER_CONFIG.getTaskRedoCount();
+  /**
+   * Number of subtask in task segmentation
+   */
+  protected static int SUB_TASK_NUM = 1;
+
   /**
    * Get Storage Group Name by device name
    */
@@ -68,4 +84,8 @@ public abstract class ClusterQPExecutor {
     }
     return false;
   }
+
+  /**
+   * Shut down the RPC client service held by this executor.
+   *
+   * <p>{@code cliClientService} is not assigned at construction time; the subclasses create it
+   * in their {@code init()} methods. When this method is called on an executor that was never
+   * initialized there is nothing to release, so the call is a no-op instead of failing with a
+   * {@link NullPointerException}.
+   */
+  public void shutdown() {
+    if (cliClientService != null) {
+      cliClientService.shutdown();
+    }
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 5510a5a..52630bc 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -56,21 +56,6 @@ import org.slf4j.LoggerFactory;
 public class NonQueryExecutor extends ClusterQPExecutor {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(NonQueryExecutor.class);
-  private OverflowQPExecutor qpExecutor = new OverflowQPExecutor();
-  private MManager mManager = MManager.getInstance();
-  /**
-   * Rpc Service Client
-   */
-  private BoltCliClientService cliClientService;
-  private static final ClusterConfig CLUSTER_CONFIG = 
ClusterDescriptor.getInstance().getConfig();
-  /**
-   * Count limit to redo a single task
-   */
-  private static final int TASK_MAX_RETRY = CLUSTER_CONFIG.getTaskRedoCount();
-  /**
-   * Number of subtask in task segmentation
-   */
-  private static final int SUB_TASK_NUM = 1;
 
   public NonQueryExecutor() {
 
@@ -79,6 +64,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
   public void init(){
+    // Create the Bolt CLI client service and initialize it with default CliOptions.
     this.cliClientService = new BoltCliClientService();
     this.cliClientService.init(new CliOptions());
+    // SUB_TASK_NUM is a static field of ClusterQPExecutor, so this assignment is seen by
+    // every executor instance, not only this one.
+    SUB_TASK_NUM = 1;
   }
 
   public boolean processNonQuery(PhysicalPlan plan) throws ProcessorException {
@@ -180,9 +166,9 @@ public class NonQueryExecutor extends ClusterQPExecutor {
               String.format("File level %s already exists.", 
path.getFullPath()));
         } else {
           ChangeMetadataRequest request = new ChangeMetadataRequest(
-              CLUSTER_CONFIG.METADATA_GROUP_ID,
+              ClusterConfig.METADATA_GROUP_ID,
               metadataPlan);
-          PeerId leader = 
RaftUtils.getLeader(CLUSTER_CONFIG.METADATA_GROUP_ID, cliClientService);
+          PeerId leader = RaftUtils.getLeader(ClusterConfig.METADATA_GROUP_ID, 
cliClientService);
 
           SingleTask task = new SingleTask(false, request);
           return asyncHandleTask(task, leader, 0);
@@ -239,9 +225,4 @@ public class NonQueryExecutor extends ClusterQPExecutor {
     }
     return task.getResponse().isSuccess();
   }
-
-  public void shutdown(){
-    cliClientService.shutdown();
-  }
-
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
new file mode 100644
index 0000000..a5cd02d
--- /dev/null
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.qp.executor;
+
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.option.CliOptions;
+import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
+import java.util.Set;
+import org.apache.iotdb.cluster.callback.SingleTask;
+import org.apache.iotdb.cluster.callback.Task;
+import org.apache.iotdb.cluster.callback.Task.TaskState;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.qp.ClusterQPExecutor;
+import org.apache.iotdb.cluster.rpc.MetadataType;
+import org.apache.iotdb.cluster.rpc.NodeAsClient;
+import org.apache.iotdb.cluster.rpc.impl.RaftNodeAsClient;
+import org.apache.iotdb.cluster.rpc.request.QueryMetadataRequest;
+import org.apache.iotdb.cluster.rpc.response.QueryMetadataResponse;
+import org.apache.iotdb.cluster.utils.RaftUtils;
+
+/**
+ * Handle show all storage group logic: send a {@link QueryMetadataRequest} to the leader of the
+ * metadata raft group and hand back the metadata set carried by the response.
+ */
+public class QueryMetadataExecutor extends ClusterQPExecutor {
+
+  public QueryMetadataExecutor() {
+    // The RPC client is created in init(), not in the constructor.
+  }
+
+  /**
+   * Create and initialize the Bolt CLI client service used to reach the raft group, and set
+   * the (static) sub-task count to 1.
+   */
+  public void init() {
+    this.cliClientService = new BoltCliClientService();
+    this.cliClientService.init(new CliOptions());
+    SUB_TASK_NUM = 1;
+  }
+
+  /**
+   * Query metadata of the given type from the leader of the metadata raft group.
+   *
+   * @param type kind of metadata to fetch
+   * @return the metadata set contained in the leader's successful response
+   * @throws RaftConnectionException if the retry limit is reached or the response reports a
+   *     failure
+   * @throws InterruptedException if the thread is interrupted while waiting for the response
+   */
+  public Set<String> processMetadataQuery(MetadataType type)
+      throws RaftConnectionException, InterruptedException {
+    QueryMetadataRequest request = new QueryMetadataRequest(
+        ClusterConfig.METADATA_GROUP_ID, type);
+    PeerId leader = RaftUtils.getLeader(ClusterConfig.METADATA_GROUP_ID, cliClientService);
+
+    SingleTask task = new SingleTask(false, request);
+    return asyncHandleTask(task, leader, 0);
+  }
+
+  /**
+   * Async handle task by task and leader id, retrying until the task finishes or the retry
+   * limit is reached.
+   *
+   * @param task request task
+   * @param leader leader of the target raft group
+   * @param taskRetryNum Number of task retries already spent due to timeout and redirected.
+   * @return request result
+   * @throws RaftConnectionException if {@code TASK_MAX_RETRY} attempts are used up, or the
+   *     finished task does not carry a successful {@link QueryMetadataResponse}
+   * @throws InterruptedException if the thread is interrupted while awaiting the task
+   */
+  private Set<String> asyncHandleTask(Task task, PeerId leader, int taskRetryNum)
+      throws RaftConnectionException, InterruptedException {
+    // Iterative form of the retry: same attempt order as a recursive call per retry, without
+    // growing the call stack.
+    for (int attempt = taskRetryNum; attempt < TASK_MAX_RETRY; attempt++) {
+      NodeAsClient client = new RaftNodeAsClient();
+      /** Call async method **/
+      client.asyncHandleRequest(cliClientService, task.getRequest(), leader, task);
+      task.await();
+      if (task.getTaskState() == TaskState.FINISH) {
+        return extractMetadataSet(task);
+      }
+      task.setTaskNum(SUB_TASK_NUM);
+    }
+    throw new RaftConnectionException(
+        String.format("Task retries reach the upper bound %s", TASK_MAX_RETRY));
+  }
+
+  /**
+   * Pull the metadata set out of a finished task. A response built without a metadata set
+   * (the failure / redirected replies) would otherwise be returned as {@code null} and look
+   * like a successful empty answer to the caller, so it is reported as an error instead.
+   */
+  private Set<String> extractMetadataSet(Task task) throws RaftConnectionException {
+    Object rawResponse = task.getResponse();
+    if (!(rawResponse instanceof QueryMetadataResponse)
+        || !((QueryMetadataResponse) rawResponse).isSuccess()) {
+      throw new RaftConnectionException("Metadata query did not return a successful response");
+    }
+    return ((QueryMetadataResponse) rawResponse).getMetadataSet();
+  }
+}
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/BasicResponse.java
 b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/MetadataType.java
similarity index 57%
copy from 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/BasicResponse.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/rpc/MetadataType.java
index 43ee88d..9205241 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/BasicResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/MetadataType.java
@@ -16,33 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.response;
+package org.apache.iotdb.cluster.rpc;
 
-import java.io.Serializable;
-
-public abstract class BasicResponse implements Serializable {
-
-  private boolean redirected;
-  private boolean success;
-
-  public BasicResponse(boolean redirected, boolean success) {
-    this.redirected = redirected;
-    this.success = success;
-  }
-
-  public boolean isRedirected() {
-    return redirected;
-  }
-
-  public void setRedirected(boolean redirected) {
-    this.redirected = redirected;
-  }
-
-  public boolean isSuccess() {
-    return success;
-  }
-
-  public void setSuccess(boolean success) {
-    this.success = success;
-  }
+/**
+ * Kind of metadata requested by a QueryMetadataRequest.
+ */
+public enum MetadataType {
+  STORAGE_GROUP, TIME_SERIES
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
new file mode 100644
index 0000000..dc7f877
--- /dev/null
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.processor;
+
+import com.alipay.remoting.AsyncContext;
+import com.alipay.remoting.BizContext;
+import com.alipay.remoting.exception.CodecException;
+import com.alipay.remoting.serialization.SerializerManager;
+import com.alipay.sofa.jraft.entity.Task;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iotdb.cluster.entity.Server;
+import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
+import org.apache.iotdb.cluster.entity.raft.RaftService;
+import org.apache.iotdb.cluster.rpc.MetadataType;
+import org.apache.iotdb.cluster.rpc.request.QueryMetadataRequest;
+import org.apache.iotdb.cluster.rpc.response.QueryMetadataResponse;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Async RPC processor answering {@link QueryMetadataRequest}s. Only
+ * {@link MetadataType#STORAGE_GROUP} is served; every other type gets a failure response.
+ */
+public class QueryMetadataAsyncProcessor extends BasicAsyncUserProcessor<QueryMetadataRequest> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(QueryMetadataAsyncProcessor.class);
+  private final Server server;
+
+  public QueryMetadataAsyncProcessor(Server server) {
+    this.server = server;
+  }
+
+  /**
+   * Handle one metadata query. Exactly one response is sent on {@code asyncContext} for each
+   * outcome handled here:
+   * <ul>
+   *   <li>unsupported metadata type: {@code (redirected=false, success=false)}</li>
+   *   <li>this node is not the metadata leader: {@code (redirected=true, success=false)}</li>
+   *   <li>serialization or path error: {@code (redirected=false, success=false)}</li>
+   *   <li>raft task done: success with the storage group set if the status is ok, failure
+   *       otherwise</li>
+   * </ul>
+   */
+  @Override
+  public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
+      QueryMetadataRequest queryMetadataRequest) {
+    if (queryMetadataRequest.getMetadataType() != MetadataType.STORAGE_GROUP) {
+      //TODO deal with query time series
+      asyncContext.sendResponse(new QueryMetadataResponse(false, false));
+      return;
+    }
+
+    MetadataRaftHolder metadataHolder = (MetadataRaftHolder) server.getMetadataHolder();
+    /** Verify if it's the leader of metadata **/
+    if (!metadataHolder.getFsm().isLeader()) {
+      asyncContext.sendResponse(new QueryMetadataResponse(true, false));
+      return;
+    }
+
+    try {
+      final Task task = new Task();
+      Set<String> storageGroupSet = metadataHolder.getFsm().getAllStorageGroups();
+      task.setDone(status -> {
+        // Send exactly one response: previously a failed status sent the failure response
+        // and then fell through to send the success response as well.
+        if (status.isOk()) {
+          asyncContext.sendResponse(new QueryMetadataResponse(false, true, storageGroupSet));
+        } else {
+          asyncContext.sendResponse(new QueryMetadataResponse(false, false));
+        }
+      });
+      task.setData(ByteBuffer
+          .wrap(SerializerManager.getSerializer(SerializerManager.Hessian2)
+              .serialize(queryMetadataRequest)));
+      RaftService service = (RaftService) metadataHolder.getService();
+      service.getNode().apply(task);
+    } catch (final CodecException | PathErrorException e) {
+      LOGGER.error("Failed to handle storage group metadata query", e);
+      asyncContext.sendResponse(new QueryMetadataResponse(false, false));
+    }
+  }
+
+  @Override
+  public String interest() {
+    return QueryMetadataRequest.class.getName();
+  }
+}
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/BasicResponse.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/QueryMetadataRequest.java
similarity index 60%
copy from 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/BasicResponse.java
copy to 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/QueryMetadataRequest.java
index 43ee88d..c1a6d03 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/BasicResponse.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/QueryMetadataRequest.java
@@ -16,33 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.rpc.response;
+package org.apache.iotdb.cluster.rpc.request;
 
 import java.io.Serializable;
+import org.apache.iotdb.cluster.rpc.MetadataType;
 
-public abstract class BasicResponse implements Serializable {
+/**
+ * Request asking for metadata of a given {@link MetadataType}; the group id is passed on to
+ * {@code BasicRequest}.
+ */
+public class QueryMetadataRequest extends BasicRequest implements Serializable 
{
 
-  private boolean redirected;
-  private boolean success;
+  // Kind of metadata being requested.
+  private MetadataType metadataType;
 
-  public BasicResponse(boolean redirected, boolean success) {
-    this.redirected = redirected;
-    this.success = success;
+  /**
+   * @param groupID group id handed to the {@code BasicRequest} constructor
+   * @param metadataType kind of metadata being requested
+   */
+  public QueryMetadataRequest(String groupID, MetadataType metadataType) {
+    super(groupID);
+    this.metadataType = metadataType;
   }
 
-  public boolean isRedirected() {
-    return redirected;
+  public MetadataType getMetadataType() {
+    return metadataType;
   }
 
-  public void setRedirected(boolean redirected) {
-    this.redirected = redirected;
-  }
-
-  public boolean isSuccess() {
-    return success;
-  }
-
-  public void setSuccess(boolean success) {
-    this.success = success;
+  public void setMetadataType(
+      MetadataType metadataType) {
+    this.metadataType = metadataType;
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/BasicResponse.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/BasicResponse.java
index 43ee88d..aedb92c 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/BasicResponse.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/BasicResponse.java
@@ -25,7 +25,7 @@ public abstract class BasicResponse implements Serializable {
   private boolean redirected;
   private boolean success;
 
+  /**
+   * @param redirected initial value of the {@code redirected} flag
+   * @param success initial value of the {@code success} flag
+   */
-  public BasicResponse(boolean redirected, boolean success) {
+  public  BasicResponse(boolean redirected, boolean success) {
     this.redirected = redirected;
     this.success = success;
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/BasicResponse.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryMetadataResponse.java
similarity index 62%
copy from 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/BasicResponse.java
copy to 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryMetadataResponse.java
index 43ee88d..d07c711 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/BasicResponse.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryMetadataResponse.java
@@ -18,31 +18,26 @@
  */
 package org.apache.iotdb.cluster.rpc.response;
 
-import java.io.Serializable;
+import java.util.Set;
 
-public abstract class BasicResponse implements Serializable {
+/**
+ * Response to a metadata query: the {@code BasicResponse} flags plus an optional set of
+ * metadata strings.
+ */
+public class QueryMetadataResponse extends BasicResponse {
 
-  private boolean redirected;
-  private boolean success;
+  // Remains null unless the three-argument constructor or setMetadataSet() assigns it.
+  private Set<String> metadataSet;
 
-  public BasicResponse(boolean redirected, boolean success) {
-    this.redirected = redirected;
-    this.success = success;
+  /**
+   * Build a response that carries no metadata set ({@code metadataSet} is left null).
+   */
+  public QueryMetadataResponse(boolean redirected, boolean success) {
+    super(redirected, success);
   }
 
-  public boolean isRedirected() {
-    return redirected;
+  /**
+   * Build a response that carries the given metadata set.
+   */
+  public QueryMetadataResponse(boolean redirected, boolean success, 
Set<String> metadataSet) {
+    super(redirected, success);
+    this.metadataSet = metadataSet;
   }
 
-  public void setRedirected(boolean redirected) {
-    this.redirected = redirected;
+  public Set<String> getMetadataSet() {
+    return metadataSet;
   }
 
-  public boolean isSuccess() {
-    return success;
-  }
-
-  public void setSuccess(boolean success) {
-    this.success = success;
+  public void setMetadataSet(Set<String> metadataSet) {
+    this.metadataSet = metadataSet;
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
index da418d1..311fbcd 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
@@ -18,14 +18,22 @@
  */
 package org.apache.iotdb.cluster.rpc.service;
 
-import com.alipay.sofa.jraft.option.CliOptions;
-import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
 import java.io.IOException;
-import java.time.ZoneId;
+import java.util.Set;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
+import org.apache.iotdb.cluster.qp.executor.QueryMetadataExecutor;
+import org.apache.iotdb.cluster.rpc.MetadataType;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.service.TSServiceImpl;
+import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
+import org.apache.iotdb.service.rpc.thrift.TS_Status;
+import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +46,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TSServiceClusterImpl.class);
 
   private ThreadLocal<NonQueryExecutor> nonQueryExecutor = new ThreadLocal<>();
+  private ThreadLocal<QueryMetadataExecutor> queryMetadataExecutor = new 
ThreadLocal<>();
 
   public TSServiceClusterImpl() throws IOException {
     super();
@@ -47,6 +56,8 @@ public class TSServiceClusterImpl extends TSServiceImpl {
   public void initClusterService(){
+    // Both executors are stored in ThreadLocal fields, so the instances created here are
+    // only visible to the thread that runs this method.
     nonQueryExecutor.set(new NonQueryExecutor());
     nonQueryExecutor.get().init();
+    queryMetadataExecutor.set(new QueryMetadataExecutor());
+    queryMetadataExecutor.get().init();
   }
 
 //  //TODO
@@ -90,7 +101,52 @@ public class TSServiceClusterImpl extends TSServiceImpl {
   /**
    * Close cluster service
    */
+  @Override
   public void closeClusterService() {
+    // NOTE(review): ThreadLocal.get() returns null on a thread that never set a value, so
+    // these calls presumably rely on running on the thread that called
+    // initClusterService() - confirm against the caller.
     nonQueryExecutor.get().shutdown();
+    queryMetadataExecutor.get().shutdown();
   }
+
+  /**
+   * Fetch metadata for a client request. Only the {@code "SHOW_STORAGE_GROUP"} request type
+   * is handled; it is answered through {@link #processMetadataQuery(MetadataType)}. Any other
+   * type yields an error status.
+   *
+   * @param req fetch-metadata request; its type string selects the operation
+   * @return response carrying the storage groups and a success status, or an error status
+   *     when the client is not logged in, the query fails, or the type is not supported
+   */
+  @Override
+  public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) throws TException {
+    TS_Status status;
+    if (!checkLogin()) {
+      LOGGER.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
+      status = getErrorStatus(ERROR_NOT_LOGIN);
+      return new TSFetchMetadataResp(status);
+    }
+    TSFetchMetadataResp resp = new TSFetchMetadataResp();
+    switch (req.getType()) {
+      case "SHOW_STORAGE_GROUP":
+        try {
+          Set<String> storageGroups = processMetadataQuery(MetadataType.STORAGE_GROUP);
+          resp.setShowStorageGroups(storageGroups);
+        } catch (InterruptedException e) {
+          // Catching InterruptedException clears the interrupt flag; restore it so code
+          // further up the stack can still observe the interruption.
+          Thread.currentThread().interrupt();
+          status = getErrorStatus(
+              String.format("Failed to fetch storage groups' metadata because: %s", e));
+          resp.setStatus(status);
+          return resp;
+        } catch (RaftConnectionException e) {
+          status = getErrorStatus(
+              String.format("Failed to fetch storage groups' metadata because: %s", e));
+          resp.setStatus(status);
+          return resp;
+        } catch (OutOfMemoryError outOfMemoryError) { // TODO OOME
+          LOGGER.error("Failed to fetch storage groups' metadata", outOfMemoryError);
+          status = getErrorStatus(
+              String.format("Failed to fetch storage groups' metadata because: %s",
+                  outOfMemoryError));
+          resp.setStatus(status);
+          return resp;
+        }
+        status = new TS_Status(TS_StatusCode.SUCCESS_STATUS);
+        break;
+      default:
+        status = new TS_Status(TS_StatusCode.ERROR_STATUS);
+        status
+            .setErrorMessage(String.format("Unsuport fetch metadata operation %s", req.getType()));
+        break;
+    }
+    resp.setStatus(status);
+    return resp;
+  }
+
+  /**
+   * Run a metadata query of the given type through the {@code QueryMetadataExecutor} bound to
+   * the current thread.
+   *
+   * @param type kind of metadata to fetch
+   * @return whatever the executor's {@code processMetadataQuery} returns
+   * @throws RaftConnectionException propagated from the executor
+   * @throws InterruptedException propagated from the executor
+   */
+  public Set<String> processMetadataQuery(MetadataType type)
+      throws RaftConnectionException, InterruptedException {
+    final QueryMetadataExecutor executor = queryMetadataExecutor.get();
+    return executor.processMetadataQuery(type);
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java 
b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 4af6121..34941d5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -93,8 +93,8 @@ import org.slf4j.LoggerFactory;
 public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TSServiceImpl.class);
-  private static final String INFO_NOT_LOGIN = "{}: Not login.";
-  private static final String ERROR_NOT_LOGIN = "Not login";
+  protected static final String INFO_NOT_LOGIN = "{}: Not login.";
+  protected static final String ERROR_NOT_LOGIN = "Not login";
 
   private QueryProcessor processor = new QueryProcessor(new 
OverflowQPExecutor());
   // Record the username for every rpc connection. Username.get() is null if

Reply via email to