This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch cluster_metadata_query
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_metadata_query by this push:
new 34aa70d update
34aa70d is described below
commit 34aa70d0ec518fc3e883e52bd5ee6fcaba9dcca5
Author: mdf369 <[email protected]>
AuthorDate: Fri Apr 12 15:36:16 2019 +0800
update
---
.../cluster/qp/executor/QueryMetadataExecutor.java | 319 +++------------------
.../QueryMetadataInStringAsyncProcessor.java | 5 +-
.../org/apache/iotdb/db/metadata/Metadata.java | 3 +-
3 files changed, 50 insertions(+), 277 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 2187bea..1dfbc7e 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -117,7 +117,7 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
}
return paths;
}
-
+
/**
* Handle query timeseries in one data group
*
@@ -130,17 +130,18 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
SingleQPTask task = new SingleQPTask(false, request);
LOGGER.debug("Execute show timeseries {} statement for group {}.",
pathList, groupId);
+ PeerId holder;
/** Check if the plan can be executed locally. **/
if (canHandleQueryByGroupId(groupId)) {
- LOGGER.debug("Execute show timeseries {} statement locally for group
{}.", pathList, groupId);
- res.addAll(queryTimeSeriesLocally(pathList, groupId, task));
+ LOGGER.debug("Execute show timeseries {} statement locally for group {}
by sending request to local node.", pathList, groupId);
+ holder = this.server.getServerId();
} else {
- try {
- PeerId holder = RaftUtils.getRandomPeerID(groupId);
- res.addAll(queryTimeSeries(task, holder));
- } catch (RaftConnectionException e) {
- throw new ProcessorException("Raft connection occurs error.", e);
- }
+ holder = RaftUtils.getRandomPeerID(groupId);
+ }
+ try {
+ res.addAll(queryTimeSeries(task, holder));
+ } catch (RaftConnectionException e) {
+ throw new ProcessorException("Raft connection occurs error.", e);
}
}
@@ -157,17 +158,18 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
taskList.add(task);
LOGGER.debug("Execute show metadata in string statement for group {}.",
groupId);
+ PeerId holder;
/** Check if the plan can be executed locally. **/
if (canHandleQueryByGroupId(groupId)) {
- LOGGER.debug("Execute show metadata in string statement locally for
group {}.", groupId);
- asyncQueryMetadataInStringLocally(groupId, task);
+ LOGGER.debug("Execute show metadata in string statement locally for
group {} by sending request to local node.", groupId);
+ holder = this.server.getServerId();
} else {
- try {
- PeerId holder = RaftUtils.getRandomPeerID(groupId);
- asyncSendNonQueryTask(task, holder, 0);
- } catch (RaftConnectionException e) {
- throw new ProcessorException("Raft connection occurs error.", e);
- }
+ holder = RaftUtils.getRandomPeerID(groupId);
+ }
+ try {
+ asyncSendNonQueryTask(task, holder, 0);
+ } catch (RaftConnectionException e) {
+ throw new ProcessorException("Raft connection occurs error.", e);
}
}
for (int i = 0; i < taskList.size(); i++) {
@@ -195,17 +197,18 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
taskList.add(task);
LOGGER.debug("Execute query metadata statement for group {}.", groupId);
+ PeerId holder;
/** Check if the plan can be executed locally. **/
if (canHandleQueryByGroupId(groupId)) {
- LOGGER.debug("Execute query metadata statement locally for group {}.",
groupId);
- asyncQueryMetadataLocally(groupId, task);
+ LOGGER.debug("Execute query metadata statement locally for group {} by
sending request to local node.", groupId);
+ holder = this.server.getServerId();
} else {
- try {
- PeerId holder = RaftUtils.getRandomPeerID(groupId);
- asyncSendNonQueryTask(task, holder, 0);
- } catch (RaftConnectionException e) {
- throw new ProcessorException("Raft connection occurs error.", e);
- }
+ holder = RaftUtils.getRandomPeerID(groupId);
+ }
+ try {
+ asyncSendNonQueryTask(task, holder, 0);
+ } catch (RaftConnectionException e) {
+ throw new ProcessorException("Raft connection occurs error.", e);
}
}
for (int i = 0; i < taskList.size(); i++) {
@@ -237,17 +240,18 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
SingleQPTask task = new SingleQPTask(false, request);
LOGGER.debug("Execute get series type for {} statement for group {}.",
path, groupId);
+ PeerId holder;
/** Check if the plan can be executed locally. **/
if (canHandleQueryByGroupId(groupId)) {
- LOGGER.debug("Execute get series type for {} statement locally for
group {}.", path, groupId);
- dataType = querySeriesTypeLocally(path, groupId, task);
+ LOGGER.debug("Execute get series type for {} statement locally for
group {} by sending request to local node.", path, groupId);
+ holder = this.server.getServerId();
} else {
- try {
- PeerId holder = RaftUtils.getRandomPeerID(groupId);
- dataType = querySeriesType(task, holder);
- } catch (RaftConnectionException e) {
- throw new ProcessorException("Raft connection occurs error.", e);
- }
+ holder = RaftUtils.getRandomPeerID(groupId);
+ }
+ try {
+ dataType = querySeriesType(task, holder);
+ } catch (RaftConnectionException e) {
+ throw new ProcessorException("Raft connection occurs error.", e);
}
}
return dataType;
@@ -285,74 +289,19 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
SingleQPTask task = new SingleQPTask(false, request);
LOGGER.debug("Execute get paths for {} statement for group {}.", pathList,
groupId);
+ PeerId holder;
/** Check if the plan can be executed locally. **/
if (canHandleQueryByGroupId(groupId)) {
- LOGGER.debug("Execute get paths for {} statement locally for group {}.",
pathList, groupId);
- res.addAll(queryPathsLocally(pathList, groupId, task));
+ LOGGER.debug("Execute get paths for {} statement locally for group {} by
sending request to local node.", pathList, groupId);
+ holder = this.server.getServerId();
} else {
- try {
- PeerId holder = RaftUtils.getRandomPeerID(groupId);
- res.addAll(queryPaths(task, holder));
- } catch (RaftConnectionException e) {
- throw new ProcessorException("Raft connection occurs error.", e);
- }
+ holder = RaftUtils.getRandomPeerID(groupId);
}
- }
-
- /**
- * Handle "show timeseries <path>" statement
- *
- * @param pathList column path
- */
- private List<List<String>> queryTimeSeriesLocally(List<String> pathList,
String groupId,
- SingleQPTask task)
- throws InterruptedException, ProcessorException {
- final byte[] reqContext = RaftUtils.createRaftRequestContext();
- DataPartitionRaftHolder dataPartitionHolder =
RaftUtils.getDataPartitonRaftHolder(groupId);
-
- /** Check consistency level**/
- if (readMetadataConsistencyLevel ==
ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
- QueryTimeSeriesResponse response = QueryTimeSeriesResponse
- .createEmptyResponse(groupId);
- try {
- for (String path : pathList) {
- response.addTimeSeries(mManager.getShowTimeseriesPath(path));
- }
- } catch (final PathErrorException e) {
- response = QueryTimeSeriesResponse.createErrorResponse(groupId,
e.getMessage());
- }
- task.run(response);
- } else {
- ((RaftService) dataPartitionHolder.getService()).getNode()
- .readIndex(reqContext, new ReadIndexClosure() {
-
- @Override
- public void run(Status status, long index, byte[] reqCtx) {
- QueryTimeSeriesResponse response = QueryTimeSeriesResponse
- .createEmptyResponse(groupId);
- if (status.isOk()) {
- try {
- LOGGER.debug("start to read");
- for (String path : pathList) {
-
response.addTimeSeries(mManager.getShowTimeseriesPath(path));
- }
- } catch (final PathErrorException e) {
- response =
QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
- }
- } else {
- response = QueryTimeSeriesResponse
- .createErrorResponse(groupId, status.getErrorMsg());
- }
- task.run(response);
- }
- });
+ try {
+ res.addAll(queryPaths(task, holder));
+ } catch (RaftConnectionException e) {
+ throw new ProcessorException("Raft connection occurs error.", e);
}
- task.await();
- QueryTimeSeriesResponse response = (QueryTimeSeriesResponse)
task.getResponse();
- if (response == null || !response.isSuccess()) {
- throw new ProcessorException("Execute show timeseries " + pathList + "
statement false.");
- }
- return response.getTimeSeries();
}
private List<List<String>> queryTimeSeries(SingleQPTask task, PeerId leader)
@@ -362,50 +311,6 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
: ((QueryTimeSeriesResponse) response).getTimeSeries();
}
- private TSDataType querySeriesTypeLocally(String path, String groupId,
- SingleQPTask task)
- throws InterruptedException, ProcessorException {
- final byte[] reqContext = RaftUtils.createRaftRequestContext();
- DataPartitionRaftHolder dataPartitionHolder =
RaftUtils.getDataPartitonRaftHolder(groupId);
-
- /** Check consistency level**/
- if (readMetadataConsistencyLevel ==
ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
- QuerySeriesTypeResponse response;
- try {
- response = QuerySeriesTypeResponse.createSuccessResponse(groupId,
mManager.getSeriesType(path));
- } catch (final PathErrorException e) {
- response = QuerySeriesTypeResponse.createErrorResponse(groupId,
e.getMessage());
- }
- task.run(response);
- } else {
- ((RaftService) dataPartitionHolder.getService()).getNode()
- .readIndex(reqContext, new ReadIndexClosure() {
-
- @Override
- public void run(Status status, long index, byte[] reqCtx) {
- QuerySeriesTypeResponse response;
- if (status.isOk()) {
- try {
- LOGGER.debug("start to read");
- response =
QuerySeriesTypeResponse.createSuccessResponse(groupId,
mManager.getSeriesType(path));
- } catch (final PathErrorException e) {
- response =
QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage());
- }
- } else {
- response =
QuerySeriesTypeResponse.createErrorResponse(groupId, status.getErrorMsg());
- }
- task.run(response);
- }
- });
- }
- task.await();
- QuerySeriesTypeResponse response = (QuerySeriesTypeResponse)
task.getResponse();
- if (response == null || !response.isSuccess()) {
- throw new ProcessorException("Execute get series type for " + path + "
statement false.");
- }
- return response.getDataType();
- }
-
private TSDataType querySeriesType(SingleQPTask task, PeerId leader)
throws InterruptedException, RaftConnectionException {
BasicResponse response = asyncHandleNonQueryTaskGetRes(task, leader, 0);
@@ -458,140 +363,6 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
return ((QueryStorageGroupResponse) task.getResponse()).getStorageGroups();
}
- /**
- * Handle "show timeseries" statement
- */
- private void asyncQueryMetadataInStringLocally(String groupId, SingleQPTask
task) {
- final byte[] reqContext = RaftUtils.createRaftRequestContext();
- DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder)
server
- .getDataPartitionHolder(groupId);
- if (readMetadataConsistencyLevel ==
ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
- QueryMetadataInStringResponse response = QueryMetadataInStringResponse
- .createSuccessResponse(groupId, mManager.getMetadataInString());
- response.addResult(true);
- task.run(response);
- } else {
- ((RaftService) dataPartitionHolder.getService()).getNode()
- .readIndex(reqContext, new ReadIndexClosure() {
-
- @Override
- public void run(Status status, long index, byte[] reqCtx) {
- QueryMetadataInStringResponse response;
- if (status.isOk()) {
- LOGGER.debug("start to read");
- response = QueryMetadataInStringResponse
- .createSuccessResponse(groupId,
mManager.getMetadataInString());
- response.addResult(true);
- } else {
- response = QueryMetadataInStringResponse
- .createErrorResponse(groupId, status.getErrorMsg());
- response.addResult(false);
- }
- task.run(response);
- }
- });
- }
- }
-
- /**
- * Handle "show timeseries" statement
- */
- private void asyncQueryMetadataLocally(String groupId, SingleQPTask task)
- throws PathErrorException {
- final byte[] reqContext = RaftUtils.createRaftRequestContext();
- DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder)
server
- .getDataPartitionHolder(groupId);
- if (readMetadataConsistencyLevel ==
ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
- QueryMetadataResponse response = QueryMetadataResponse
- .createSuccessResponse(groupId, mManager.getMetadata());
- response.addResult(true);
- task.run(response);
- } else {
- ((RaftService) dataPartitionHolder.getService()).getNode()
- .readIndex(reqContext, new ReadIndexClosure() {
-
- @Override
- public void run(Status status, long index, byte[] reqCtx) {
- QueryMetadataResponse response;
- if (status.isOk()) {
- LOGGER.debug("start to read");
- try {
- response = QueryMetadataResponse
- .createSuccessResponse(groupId, mManager.getMetadata());
- response.addResult(true);
- } catch (PathErrorException e) {
- response = QueryMetadataResponse
- .createErrorResponse(groupId, e.getMessage());
- response.addResult(false);
- }
- } else {
- response = QueryMetadataResponse
- .createErrorResponse(groupId, status.getErrorMsg());
- response.addResult(false);
- }
- task.run(response);
- }
- });
- }
- }
-
- /**
- * Handle "show timeseries <path>" statement
- *
- * @param pathList column path
- */
- private List<String> queryPathsLocally(List<String> pathList, String groupId,
- SingleQPTask task)
- throws InterruptedException, ProcessorException {
- final byte[] reqContext = RaftUtils.createRaftRequestContext();
- DataPartitionRaftHolder dataPartitionHolder =
RaftUtils.getDataPartitonRaftHolder(groupId);
-
- /** Check consistency level**/
- if (readMetadataConsistencyLevel ==
ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
- QueryPathsResponse response = QueryPathsResponse
- .createEmptyResponse(groupId);
- try {
- for (String path : pathList) {
- response.addPaths(mManager.getPaths(path));
- }
- } catch (final PathErrorException e) {
- response = QueryPathsResponse.createErrorResponse(groupId,
e.getMessage());
- }
- task.run(response);
- } else {
- ((RaftService) dataPartitionHolder.getService()).getNode()
- .readIndex(reqContext, new ReadIndexClosure() {
-
- @Override
- public void run(Status status, long index, byte[] reqCtx) {
- QueryPathsResponse response = QueryPathsResponse
- .createEmptyResponse(groupId);
- if (status.isOk()) {
- try {
- LOGGER.debug("start to read");
- for (String path : pathList) {
- response.addPaths(mManager.getPaths(path));
- }
- } catch (final PathErrorException e) {
- response = QueryPathsResponse.createErrorResponse(groupId,
e.getMessage());
- }
- } else {
- response = QueryPathsResponse
- .createErrorResponse(groupId, status.getErrorMsg());
- }
- task.run(response);
- }
- });
- }
- task.await();
- QueryPathsResponse response = (QueryPathsResponse) task.getResponse();
- if (response == null || !response.isSuccess()) {
- LOGGER.error("Execute get paths for {} statement false.", pathList);
- throw new ProcessorException();
- }
- return response.getPaths();
- }
-
private List<String> queryPaths(SingleQPTask task, PeerId leader)
throws InterruptedException, RaftConnectionException {
BasicResponse response = asyncHandleNonQueryTaskGetRes(task, leader, 0);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
index 5c81756..b80f4ae 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
@@ -39,8 +39,6 @@ public class QueryMetadataInStringAsyncProcessor extends
public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
QueryMetadataInStringRequest request) {
String groupId = request.getGroupID();
- final byte[] reqContext = RaftUtils.createRaftRequestContext();
- DataPartitionRaftHolder dataPartitionHolder =
RaftUtils.getDataPartitonRaftHolder(groupId);
if (request.getReadConsistencyLevel() ==
ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
QueryMetadataInStringResponse response = QueryMetadataInStringResponse
@@ -48,6 +46,9 @@ public class QueryMetadataInStringAsyncProcessor extends
response.addResult(true);
asyncContext.sendResponse(response);
} else {
+ final byte[] reqContext = RaftUtils.createRaftRequestContext();
+ DataPartitionRaftHolder dataPartitionHolder =
RaftUtils.getDataPartitonRaftHolder(groupId);
+
((RaftService) dataPartitionHolder.getService()).getNode()
.readIndex(reqContext, new ReadIndexClosure() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
index c83a3dc..5c56fe1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.metadata;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -31,7 +32,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
/**
* This class stores all the metadata info for every deviceId and every
timeseries.
*/
-public class Metadata {
+public class Metadata implements Serializable {
private Map<String, List<MeasurementSchema>> seriesMap;
private Map<String, List<String>> deviceIdMap;