This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch cluster_metadata_query
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_metadata_query by this push:
     new 34aa70d  update
34aa70d is described below

commit 34aa70d0ec518fc3e883e52bd5ee6fcaba9dcca5
Author: mdf369 <[email protected]>
AuthorDate: Fri Apr 12 15:36:16 2019 +0800

    update
---
 .../cluster/qp/executor/QueryMetadataExecutor.java | 319 +++------------------
 .../QueryMetadataInStringAsyncProcessor.java       |   5 +-
 .../org/apache/iotdb/db/metadata/Metadata.java     |   3 +-
 3 files changed, 50 insertions(+), 277 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 2187bea..1dfbc7e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -117,7 +117,7 @@ public class QueryMetadataExecutor extends 
ClusterQPExecutor {
     }
     return paths;
   }
-
+  
   /**
    * Handle query timeseries in one data group
    *
@@ -130,17 +130,18 @@ public class QueryMetadataExecutor extends 
ClusterQPExecutor {
     SingleQPTask task = new SingleQPTask(false, request);
 
     LOGGER.debug("Execute show timeseries {} statement for group {}.", 
pathList, groupId);
+    PeerId holder;
     /** Check if the plan can be executed locally. **/
     if (canHandleQueryByGroupId(groupId)) {
-      LOGGER.debug("Execute show timeseries {} statement locally for group 
{}.", pathList, groupId);
-      res.addAll(queryTimeSeriesLocally(pathList, groupId, task));
+      LOGGER.debug("Execute show timeseries {} statement locally for group {} 
by sending request to local node.", pathList, groupId);
+      holder = this.server.getServerId();
     } else {
-      try {
-        PeerId holder = RaftUtils.getRandomPeerID(groupId);
-        res.addAll(queryTimeSeries(task, holder));
-      } catch (RaftConnectionException e) {
-        throw new ProcessorException("Raft connection occurs error.", e);
-      }
+      holder = RaftUtils.getRandomPeerID(groupId);
+    }
+    try {
+      res.addAll(queryTimeSeries(task, holder));
+    } catch (RaftConnectionException e) {
+      throw new ProcessorException("Raft connection occurs error.", e);
     }
   }
 
@@ -157,17 +158,18 @@ public class QueryMetadataExecutor extends 
ClusterQPExecutor {
       taskList.add(task);
 
       LOGGER.debug("Execute show metadata in string statement for group {}.", 
groupId);
+      PeerId holder;
       /** Check if the plan can be executed locally. **/
       if (canHandleQueryByGroupId(groupId)) {
-        LOGGER.debug("Execute show metadata in string statement locally for 
group {}.", groupId);
-        asyncQueryMetadataInStringLocally(groupId, task);
+        LOGGER.debug("Execute show metadata in string statement locally for 
group {} by sending request to local node.", groupId);
+        holder = this.server.getServerId();
       } else {
-        try {
-          PeerId holder = RaftUtils.getRandomPeerID(groupId);
-          asyncSendNonQueryTask(task, holder, 0);
-        } catch (RaftConnectionException e) {
-          throw new ProcessorException("Raft connection occurs error.", e);
-        }
+        holder = RaftUtils.getRandomPeerID(groupId);
+      }
+      try {
+        asyncSendNonQueryTask(task, holder, 0);
+      } catch (RaftConnectionException e) {
+        throw new ProcessorException("Raft connection occurs error.", e);
       }
     }
     for (int i = 0; i < taskList.size(); i++) {
@@ -195,17 +197,18 @@ public class QueryMetadataExecutor extends 
ClusterQPExecutor {
       taskList.add(task);
 
       LOGGER.debug("Execute query metadata statement for group {}.", groupId);
+      PeerId holder;
       /** Check if the plan can be executed locally. **/
       if (canHandleQueryByGroupId(groupId)) {
-        LOGGER.debug("Execute query metadata statement locally for group {}.", 
groupId);
-        asyncQueryMetadataLocally(groupId, task);
+        LOGGER.debug("Execute query metadata statement locally for group {} by 
sending request to local node.", groupId);
+        holder = this.server.getServerId();
       } else {
-        try {
-          PeerId holder = RaftUtils.getRandomPeerID(groupId);
-          asyncSendNonQueryTask(task, holder, 0);
-        } catch (RaftConnectionException e) {
-          throw new ProcessorException("Raft connection occurs error.", e);
-        }
+        holder = RaftUtils.getRandomPeerID(groupId);
+      }
+      try {
+        asyncSendNonQueryTask(task, holder, 0);
+      } catch (RaftConnectionException e) {
+        throw new ProcessorException("Raft connection occurs error.", e);
       }
     }
     for (int i = 0; i < taskList.size(); i++) {
@@ -237,17 +240,18 @@ public class QueryMetadataExecutor extends 
ClusterQPExecutor {
       SingleQPTask task = new SingleQPTask(false, request);
 
       LOGGER.debug("Execute get series type for {} statement for group {}.", 
path, groupId);
+      PeerId holder;
       /** Check if the plan can be executed locally. **/
       if (canHandleQueryByGroupId(groupId)) {
-        LOGGER.debug("Execute get series type for {} statement locally for 
group {}.", path, groupId);
-        dataType = querySeriesTypeLocally(path, groupId, task);
+        LOGGER.debug("Execute get series type for {} statement locally for 
group {} by sending request to local node.", path, groupId);
+        holder = this.server.getServerId();
       } else {
-        try {
-          PeerId holder = RaftUtils.getRandomPeerID(groupId);
-          dataType = querySeriesType(task, holder);
-        } catch (RaftConnectionException e) {
-          throw new ProcessorException("Raft connection occurs error.", e);
-        }
+        holder = RaftUtils.getRandomPeerID(groupId);
+      }
+      try {
+        dataType = querySeriesType(task, holder);
+      } catch (RaftConnectionException e) {
+        throw new ProcessorException("Raft connection occurs error.", e);
       }
     }
     return dataType;
@@ -285,74 +289,19 @@ public class QueryMetadataExecutor extends 
ClusterQPExecutor {
     SingleQPTask task = new SingleQPTask(false, request);
 
     LOGGER.debug("Execute get paths for {} statement for group {}.", pathList, 
groupId);
+    PeerId holder;
     /** Check if the plan can be executed locally. **/
     if (canHandleQueryByGroupId(groupId)) {
-      LOGGER.debug("Execute get paths for {} statement locally for group {}.", 
pathList, groupId);
-      res.addAll(queryPathsLocally(pathList, groupId, task));
+      LOGGER.debug("Execute get paths for {} statement locally for group {} by 
sending request to local node.", pathList, groupId);
+      holder = this.server.getServerId();
     } else {
-      try {
-        PeerId holder = RaftUtils.getRandomPeerID(groupId);
-        res.addAll(queryPaths(task, holder));
-      } catch (RaftConnectionException e) {
-        throw new ProcessorException("Raft connection occurs error.", e);
-      }
+      holder = RaftUtils.getRandomPeerID(groupId);
     }
-  }
-
-  /**
-   * Handle "show timeseries <path>" statement
-   *
-   * @param pathList column path
-   */
-  private List<List<String>> queryTimeSeriesLocally(List<String> pathList, 
String groupId,
-      SingleQPTask task)
-      throws InterruptedException, ProcessorException {
-    final byte[] reqContext = RaftUtils.createRaftRequestContext();
-    DataPartitionRaftHolder dataPartitionHolder = 
RaftUtils.getDataPartitonRaftHolder(groupId);
-
-    /** Check consistency level**/
-    if (readMetadataConsistencyLevel == 
ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
-      QueryTimeSeriesResponse response = QueryTimeSeriesResponse
-          .createEmptyResponse(groupId);
-      try {
-        for (String path : pathList) {
-          response.addTimeSeries(mManager.getShowTimeseriesPath(path));
-        }
-      } catch (final PathErrorException e) {
-        response = QueryTimeSeriesResponse.createErrorResponse(groupId, 
e.getMessage());
-      }
-      task.run(response);
-    } else {
-      ((RaftService) dataPartitionHolder.getService()).getNode()
-          .readIndex(reqContext, new ReadIndexClosure() {
-
-            @Override
-            public void run(Status status, long index, byte[] reqCtx) {
-              QueryTimeSeriesResponse response = QueryTimeSeriesResponse
-                  .createEmptyResponse(groupId);
-              if (status.isOk()) {
-                try {
-                  LOGGER.debug("start to read");
-                  for (String path : pathList) {
-                    
response.addTimeSeries(mManager.getShowTimeseriesPath(path));
-                  }
-                } catch (final PathErrorException e) {
-                  response = 
QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
-                }
-              } else {
-                response = QueryTimeSeriesResponse
-                    .createErrorResponse(groupId, status.getErrorMsg());
-              }
-              task.run(response);
-            }
-          });
+    try {
+      res.addAll(queryPaths(task, holder));
+    } catch (RaftConnectionException e) {
+      throw new ProcessorException("Raft connection occurs error.", e);
     }
-    task.await();
-    QueryTimeSeriesResponse response = (QueryTimeSeriesResponse) 
task.getResponse();
-    if (response == null || !response.isSuccess()) {
-      throw new ProcessorException("Execute show timeseries " + pathList + " 
statement false.");
-    }
-    return response.getTimeSeries();
   }
 
   private List<List<String>> queryTimeSeries(SingleQPTask task, PeerId leader)
@@ -362,50 +311,6 @@ public class QueryMetadataExecutor extends 
ClusterQPExecutor {
         : ((QueryTimeSeriesResponse) response).getTimeSeries();
   }
 
-  private TSDataType querySeriesTypeLocally(String path, String groupId,
-      SingleQPTask task)
-      throws InterruptedException, ProcessorException {
-    final byte[] reqContext = RaftUtils.createRaftRequestContext();
-    DataPartitionRaftHolder dataPartitionHolder = 
RaftUtils.getDataPartitonRaftHolder(groupId);
-
-    /** Check consistency level**/
-    if (readMetadataConsistencyLevel == 
ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
-      QuerySeriesTypeResponse response;
-      try {
-        response = QuerySeriesTypeResponse.createSuccessResponse(groupId, 
mManager.getSeriesType(path));
-      } catch (final PathErrorException e) {
-        response = QuerySeriesTypeResponse.createErrorResponse(groupId, 
e.getMessage());
-      }
-      task.run(response);
-    } else {
-      ((RaftService) dataPartitionHolder.getService()).getNode()
-          .readIndex(reqContext, new ReadIndexClosure() {
-
-            @Override
-            public void run(Status status, long index, byte[] reqCtx) {
-              QuerySeriesTypeResponse response;
-              if (status.isOk()) {
-                try {
-                  LOGGER.debug("start to read");
-                  response = 
QuerySeriesTypeResponse.createSuccessResponse(groupId, 
mManager.getSeriesType(path));
-                } catch (final PathErrorException e) {
-                  response = 
QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage());
-                }
-              } else {
-                response = 
QuerySeriesTypeResponse.createErrorResponse(groupId, status.getErrorMsg());
-              }
-              task.run(response);
-            }
-          });
-    }
-    task.await();
-    QuerySeriesTypeResponse response = (QuerySeriesTypeResponse) 
task.getResponse();
-    if (response == null || !response.isSuccess()) {
-      throw new ProcessorException("Execute get series type for " + path + " 
statement false.");
-    }
-    return response.getDataType();
-  }
-
   private TSDataType querySeriesType(SingleQPTask task, PeerId leader)
       throws InterruptedException, RaftConnectionException {
     BasicResponse response = asyncHandleNonQueryTaskGetRes(task, leader, 0);
@@ -458,140 +363,6 @@ public class QueryMetadataExecutor extends 
ClusterQPExecutor {
     return ((QueryStorageGroupResponse) task.getResponse()).getStorageGroups();
   }
 
-  /**
-   * Handle "show timeseries" statement
-   */
-  private void asyncQueryMetadataInStringLocally(String groupId, SingleQPTask 
task) {
-    final byte[] reqContext = RaftUtils.createRaftRequestContext();
-    DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder) 
server
-        .getDataPartitionHolder(groupId);
-    if (readMetadataConsistencyLevel == 
ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
-      QueryMetadataInStringResponse response = QueryMetadataInStringResponse
-          .createSuccessResponse(groupId, mManager.getMetadataInString());
-      response.addResult(true);
-      task.run(response);
-    } else {
-      ((RaftService) dataPartitionHolder.getService()).getNode()
-          .readIndex(reqContext, new ReadIndexClosure() {
-
-            @Override
-            public void run(Status status, long index, byte[] reqCtx) {
-              QueryMetadataInStringResponse response;
-              if (status.isOk()) {
-                LOGGER.debug("start to read");
-                response = QueryMetadataInStringResponse
-                    .createSuccessResponse(groupId, 
mManager.getMetadataInString());
-                response.addResult(true);
-              } else {
-                response = QueryMetadataInStringResponse
-                    .createErrorResponse(groupId, status.getErrorMsg());
-                response.addResult(false);
-              }
-              task.run(response);
-            }
-          });
-    }
-  }
-
-  /**
-   * Handle "show timeseries" statement
-   */
-  private void asyncQueryMetadataLocally(String groupId, SingleQPTask task)
-      throws PathErrorException {
-    final byte[] reqContext = RaftUtils.createRaftRequestContext();
-    DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder) 
server
-        .getDataPartitionHolder(groupId);
-    if (readMetadataConsistencyLevel == 
ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
-      QueryMetadataResponse response = QueryMetadataResponse
-          .createSuccessResponse(groupId, mManager.getMetadata());
-      response.addResult(true);
-      task.run(response);
-    } else {
-      ((RaftService) dataPartitionHolder.getService()).getNode()
-          .readIndex(reqContext, new ReadIndexClosure() {
-
-            @Override
-            public void run(Status status, long index, byte[] reqCtx) {
-              QueryMetadataResponse response;
-              if (status.isOk()) {
-                LOGGER.debug("start to read");
-                try {
-                  response = QueryMetadataResponse
-                      .createSuccessResponse(groupId, mManager.getMetadata());
-                  response.addResult(true);
-                } catch (PathErrorException e) {
-                  response = QueryMetadataResponse
-                      .createErrorResponse(groupId, e.getMessage());
-                  response.addResult(false);
-                }
-              } else {
-                response = QueryMetadataResponse
-                    .createErrorResponse(groupId, status.getErrorMsg());
-                response.addResult(false);
-              }
-              task.run(response);
-            }
-          });
-    }
-  }
-
-  /**
-   * Handle "show timeseries <path>" statement
-   *
-   * @param pathList column path
-   */
-  private List<String> queryPathsLocally(List<String> pathList, String groupId,
-      SingleQPTask task)
-      throws InterruptedException, ProcessorException {
-    final byte[] reqContext = RaftUtils.createRaftRequestContext();
-    DataPartitionRaftHolder dataPartitionHolder = 
RaftUtils.getDataPartitonRaftHolder(groupId);
-
-    /** Check consistency level**/
-    if (readMetadataConsistencyLevel == 
ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
-      QueryPathsResponse response = QueryPathsResponse
-          .createEmptyResponse(groupId);
-      try {
-        for (String path : pathList) {
-          response.addPaths(mManager.getPaths(path));
-        }
-      } catch (final PathErrorException e) {
-        response = QueryPathsResponse.createErrorResponse(groupId, 
e.getMessage());
-      }
-      task.run(response);
-    } else {
-      ((RaftService) dataPartitionHolder.getService()).getNode()
-          .readIndex(reqContext, new ReadIndexClosure() {
-
-            @Override
-            public void run(Status status, long index, byte[] reqCtx) {
-              QueryPathsResponse response = QueryPathsResponse
-                  .createEmptyResponse(groupId);
-              if (status.isOk()) {
-                try {
-                  LOGGER.debug("start to read");
-                  for (String path : pathList) {
-                    response.addPaths(mManager.getPaths(path));
-                  }
-                } catch (final PathErrorException e) {
-                  response = QueryPathsResponse.createErrorResponse(groupId, 
e.getMessage());
-                }
-              } else {
-                response = QueryPathsResponse
-                    .createErrorResponse(groupId, status.getErrorMsg());
-              }
-              task.run(response);
-            }
-          });
-    }
-    task.await();
-    QueryPathsResponse response = (QueryPathsResponse) task.getResponse();
-    if (response == null || !response.isSuccess()) {
-      LOGGER.error("Execute get paths for {} statement false.", pathList);
-      throw new ProcessorException();
-    }
-    return response.getPaths();
-  }
-
   private List<String> queryPaths(SingleQPTask task, PeerId leader)
       throws InterruptedException, RaftConnectionException {
     BasicResponse response = asyncHandleNonQueryTaskGetRes(task, leader, 0);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
index 5c81756..b80f4ae 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
@@ -39,8 +39,6 @@ public class QueryMetadataInStringAsyncProcessor extends
   public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
       QueryMetadataInStringRequest request) {
     String groupId = request.getGroupID();
-    final byte[] reqContext = RaftUtils.createRaftRequestContext();
-    DataPartitionRaftHolder dataPartitionHolder = 
RaftUtils.getDataPartitonRaftHolder(groupId);
 
     if (request.getReadConsistencyLevel() == 
ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
       QueryMetadataInStringResponse response = QueryMetadataInStringResponse
@@ -48,6 +46,9 @@ public class QueryMetadataInStringAsyncProcessor extends
       response.addResult(true);
       asyncContext.sendResponse(response);
     } else {
+      final byte[] reqContext = RaftUtils.createRaftRequestContext();
+      DataPartitionRaftHolder dataPartitionHolder = 
RaftUtils.getDataPartitonRaftHolder(groupId);
+
       ((RaftService) dataPartitionHolder.getService()).getNode()
           .readIndex(reqContext, new ReadIndexClosure() {
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
index c83a3dc..5c56fe1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.metadata;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -31,7 +32,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 /**
  * This class stores all the metadata info for every deviceId and every 
timeseries.
  */
-public class Metadata {
+public class Metadata implements Serializable {
 
   private Map<String, List<MeasurementSchema>> seriesMap;
   private Map<String, List<String>> deviceIdMap;

Reply via email to