This is an automated email from the ASF dual-hosted git repository. east pushed a commit to branch cluster_metadata_query in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 8836850439791cbefc3b41483613d6fa8b552534 Author: mdf369 <[email protected]> AuthorDate: Thu Apr 11 10:52:51 2019 +0800 implement DELTA_OBEJECT, COLUMN, ALL_COLUMNS --- .../org/apache/iotdb/cluster/entity/Server.java | 6 + .../cluster/qp/executor/QueryMetadataExecutor.java | 283 +++++++++++++++++++++ .../processor/QueryMetadataAsyncProcessor.java | 86 +++++++ .../raft/processor/QueryPathsAsyncProcessor.java | 92 +++++++ .../processor/QuerySeriesTypeAsyncProcessor.java | 80 ++++++ .../rpc/raft/request/QueryMetadataRequest.java | 28 ++ .../rpc/raft/request/QueryPathsRequest.java | 36 +++ .../rpc/raft/request/QuerySeriesTypeRequest.java | 35 +++ .../rpc/raft/response/QueryMetadataResponse.java | 47 ++++ .../rpc/raft/response/QueryPathsResponse.java | 50 ++++ .../rpc/raft/response/QuerySeriesTypeResponse.java | 50 ++++ .../cluster/rpc/service/TSServiceClusterImpl.java | 20 ++ .../org/apache/iotdb/db/metadata/Metadata.java | 54 ++++ .../org/apache/iotdb/db/service/TSServiceImpl.java | 25 +- 14 files changed, 886 insertions(+), 6 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java index be4a74a..ad1b4d6 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java @@ -32,7 +32,10 @@ import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder; import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager; import org.apache.iotdb.cluster.rpc.raft.processor.DataGroupNonQueryAsyncProcessor; import org.apache.iotdb.cluster.rpc.raft.processor.MetaGroupNonQueryAsyncProcessor; +import org.apache.iotdb.cluster.rpc.raft.processor.QueryMetadataAsyncProcessor; import org.apache.iotdb.cluster.rpc.raft.processor.QueryMetadataInStringAsyncProcessor; +import org.apache.iotdb.cluster.rpc.raft.processor.QueryPathsAsyncProcessor; +import org.apache.iotdb.cluster.rpc.raft.processor.QuerySeriesTypeAsyncProcessor; import org.apache.iotdb.cluster.rpc.raft.processor.QueryTimeSeriesAsyncProcessor; import org.apache.iotdb.cluster.utils.RaftUtils; import org.apache.iotdb.cluster.utils.hash.PhysicalNode; @@ -93,6 +96,9 @@ public class Server { rpcServer.registerUserProcessor(new MetaGroupNonQueryAsyncProcessor()); rpcServer.registerUserProcessor(new QueryTimeSeriesAsyncProcessor()); rpcServer.registerUserProcessor(new QueryMetadataInStringAsyncProcessor()); + rpcServer.registerUserProcessor(new QueryMetadataAsyncProcessor()); + rpcServer.registerUserProcessor(new QuerySeriesTypeAsyncProcessor()); + rpcServer.registerUserProcessor(new QueryPathsAsyncProcessor()); metadataHolder = new MetadataRaftHolder(peerIds, serverId, rpcServer, true); metadataHolder.init(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java index 8dc20b3..3be4e81 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java @@ -35,16 +35,24 @@ import org.apache.iotdb.cluster.entity.raft.RaftService; import org.apache.iotdb.cluster.exception.RaftConnectionException; import org.apache.iotdb.cluster.qp.ClusterQPExecutor; import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataInStringRequest; +import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataRequest; +import org.apache.iotdb.cluster.rpc.raft.request.QueryPathsRequest; +import org.apache.iotdb.cluster.rpc.raft.request.QuerySeriesTypeRequest; import org.apache.iotdb.cluster.rpc.raft.request.QueryStorageGroupRequest; import org.apache.iotdb.cluster.rpc.raft.request.QueryTimeSeriesRequest; import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse; import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataInStringResponse; +import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataResponse; +import org.apache.iotdb.cluster.rpc.raft.response.QueryPathsResponse; +import org.apache.iotdb.cluster.rpc.raft.response.QuerySeriesTypeResponse; import org.apache.iotdb.cluster.rpc.raft.response.QueryStorageGroupResponse; import org.apache.iotdb.cluster.rpc.raft.response.QueryTimeSeriesResponse; import org.apache.iotdb.cluster.utils.RaftUtils; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.ProcessorException; import org.apache.iotdb.db.metadata.MManager; +import org.apache.iotdb.db.metadata.Metadata; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -177,6 +185,123 @@ public class QueryMetadataExecutor extends ClusterQPExecutor { return combineMetadataInStringList(metadataList); } + public Metadata processMetadataQuery() + throws InterruptedException, ProcessorException, PathErrorException { + Set<String> groupIdSet = router.getAllGroupId(); + + Metadata[] metadatas = new Metadata[groupIdSet.size()]; + List<SingleQPTask> taskList = new ArrayList<>(); + for (String groupId : groupIdSet) { + QueryMetadataRequest request = new QueryMetadataRequest(groupId, + readMetadataConsistencyLevel); + SingleQPTask task = new SingleQPTask(false, request); + taskList.add(task); + + LOGGER.debug("Execute query metadata statement for group {}.", groupId); + /** Check if the plan can be executed locally. **/ + if (canHandleQueryByGroupId(groupId)) { + LOGGER.debug("Execute query metadata statement locally for group {}.", groupId); + asyncQueryMetadataLocally(groupId, task); + } else { + try { + PeerId holder = RaftUtils.getRandomPeerID(groupId); + asyncSendNonQueryTask(task, holder, 0); + } catch (RaftConnectionException e) { + LOGGER.error(e.getMessage()); + throw new ProcessorException("Raft connection occurs error.", e); + } + } + } + for (int i = 0; i < taskList.size(); i++) { + SingleQPTask task = taskList.get(i); + task.await(); + BasicResponse response = task.getResponse(); + if (response == null || !response.isSuccess()) { + LOGGER.error("Execute show timeseries statement false."); + throw new ProcessorException(); + } + metadatas[i] = ((QueryMetadataResponse)response).getMetadata(); + } + return Metadata.combineMetadatas(metadatas); + } + + public TSDataType processSeriesTypeQuery(String path) + throws InterruptedException, ProcessorException, PathErrorException { + TSDataType dataType = null; + List<String> storageGroupList = mManager.getAllFileNamesByPath(path); + if (storageGroupList.size() != 1) { + throw new PathErrorException("path " + path + " is not valid."); + } else { + String groupId = getGroupIdBySG(storageGroupList.get(0)); + QuerySeriesTypeRequest request = new QuerySeriesTypeRequest(groupId, + readMetadataConsistencyLevel, path); + SingleQPTask task = new SingleQPTask(false, request); + + LOGGER.debug("Execute get series type for {} statement for group {}.", path, groupId); + /** Check if the plan can be executed locally. **/ + if (canHandleQueryByGroupId(groupId)) { + LOGGER.debug("Execute get series type for {} statement locally for group {}.", path, groupId); + dataType = querySeriesTypeLocally(path, groupId, task); + } else { + try { + PeerId holder = RaftUtils.getRandomPeerID(groupId); + dataType = querySeriesType(task, holder); + } catch (RaftConnectionException e) { + LOGGER.error(e.getMessage()); + throw new ProcessorException("Raft connection occurs error.", e); + } + } + } + return dataType; + } + + /** + * Handle show timeseries <path> statement + */ + public List<String> processPathsQuery(String path) + throws InterruptedException, PathErrorException, ProcessorException { + List<String> res = new ArrayList<>(); + List<String> storageGroupList = mManager.getAllFileNamesByPath(path); + if (storageGroupList.isEmpty()) { + return new ArrayList<>(); + } else { + Map<String, Set<String>> groupIdSGMap = classifySGByGroupId(storageGroupList); + for (Entry<String, Set<String>> entry : groupIdSGMap.entrySet()) { + List<String> paths = getSubQueryPaths(entry.getValue(), path); + String groupId = entry.getKey(); + handlePathsQuery(groupId, paths, res); + } + } + return res; + } + + /** + * Handle query timeseries in one data group + * + * @param groupId data group id + */ + private void handlePathsQuery(String groupId, List<String> pathList, List<String> res) + throws ProcessorException, InterruptedException { + QueryPathsRequest request = new QueryPathsRequest(groupId, + readMetadataConsistencyLevel, pathList); + SingleQPTask task = new SingleQPTask(false, request); + + LOGGER.debug("Execute get paths for {} statement for group {}.", pathList, groupId); + /** Check if the plan can be executed locally. **/ + if (canHandleQueryByGroupId(groupId)) { + LOGGER.debug("Execute get paths for {} statement locally for group {}.", pathList, groupId); + res.addAll(queryPathsLocally(pathList, groupId, task)); + } else { + try { + PeerId holder = RaftUtils.getRandomPeerID(groupId); + res.addAll(queryPaths(task, holder)); + } catch (RaftConnectionException e) { + LOGGER.error(e.getMessage()); + throw new ProcessorException("Raft connection occurs error.", e); + } + } + } + /** * Handle "show timeseries <path>" statement * @@ -241,6 +366,58 @@ public class QueryMetadataExecutor extends ClusterQPExecutor { : ((QueryTimeSeriesResponse) response).getTimeSeries(); } + private TSDataType querySeriesTypeLocally(String path, String groupId, + SingleQPTask task) + throws InterruptedException, ProcessorException { + final byte[] reqContext = RaftUtils.createRaftRequestContext(); + DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId); + + /** Check consistency level**/ + if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) { + QuerySeriesTypeResponse response; + try { + response = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(path)); + } catch (final PathErrorException e) { + response = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage()); + } + task.run(response); + } else { + ((RaftService) dataPartitionHolder.getService()).getNode() + .readIndex(reqContext, new ReadIndexClosure() { + + @Override + public void run(Status status, long index, byte[] reqCtx) { + QuerySeriesTypeResponse response; + if (status.isOk()) { + try { + LOGGER.debug("start to read"); + response = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(path)); + } catch (final PathErrorException e) { + response = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage()); + } + } else { + response = QuerySeriesTypeResponse.createErrorResponse(groupId, status.getErrorMsg()); + } + task.run(response); + } + }); + } + task.await(); + QuerySeriesTypeResponse response = (QuerySeriesTypeResponse) task.getResponse(); + if (response == null || !response.isSuccess()) { + LOGGER.error("Execute get series type for {} statement false.", path); + throw new ProcessorException(); + } + return response.getDataType(); + } + + private TSDataType querySeriesType(SingleQPTask task, PeerId leader) + throws InterruptedException, RaftConnectionException { + BasicResponse response = asyncHandleNonQueryTaskGetRes(task, leader, 0); + return response == null ? null + : ((QuerySeriesTypeResponse) response).getDataType(); + } + /** * Handle "show storage group" statement locally * @@ -322,6 +499,112 @@ public class QueryMetadataExecutor extends ClusterQPExecutor { } /** + * Handle "show timeseries" statement + */ + private void asyncQueryMetadataLocally(String groupId, SingleQPTask task) + throws PathErrorException { + final byte[] reqContext = RaftUtils.createRaftRequestContext(); + DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder) server + .getDataPartitionHolder(groupId); + if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) { + QueryMetadataResponse response = QueryMetadataResponse + .createSuccessResponse(groupId, mManager.getMetadata()); + response.addResult(true); + task.run(response); + } else { + ((RaftService) dataPartitionHolder.getService()).getNode() + .readIndex(reqContext, new ReadIndexClosure() { + + @Override + public void run(Status status, long index, byte[] reqCtx) { + QueryMetadataResponse response; + if (status.isOk()) { + LOGGER.debug("start to read"); + try { + response = QueryMetadataResponse + .createSuccessResponse(groupId, mManager.getMetadata()); + response.addResult(true); + } catch (PathErrorException e) { + response = QueryMetadataResponse + .createErrorResponse(groupId, e.getMessage()); + response.addResult(false); + } + } else { + response = QueryMetadataResponse + .createErrorResponse(groupId, status.getErrorMsg()); + response.addResult(false); + } + task.run(response); + } + }); + } + } + + /** + * Handle "show timeseries <path>" statement + * + * @param pathList column path + */ + private List<String> queryPathsLocally(List<String> pathList, String groupId, + SingleQPTask task) + throws InterruptedException, ProcessorException { + final byte[] reqContext = RaftUtils.createRaftRequestContext(); + DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId); + + /** Check consistency level**/ + if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) { + QueryPathsResponse response = QueryPathsResponse + .createEmptyResponse(groupId); + try { + for (String path : pathList) { + response.addPaths(mManager.getPaths(path)); + } + } catch (final PathErrorException e) { + response = QueryPathsResponse.createErrorResponse(groupId, e.getMessage()); + } + task.run(response); + } else { + ((RaftService) dataPartitionHolder.getService()).getNode() + .readIndex(reqContext, new ReadIndexClosure() { + + @Override + public void run(Status status, long index, byte[] reqCtx) { + QueryPathsResponse response = QueryPathsResponse + .createEmptyResponse(groupId); + if (status.isOk()) { + try { + LOGGER.debug("start to read"); + for (String path : pathList) { + response.addPaths(mManager.getPaths(path)); + } + } catch (final PathErrorException e) { + response = QueryPathsResponse.createErrorResponse(groupId, e.getMessage()); + } + } else { + response = QueryPathsResponse + .createErrorResponse(groupId, status.getErrorMsg()); + } + task.run(response); + } + }); + } + task.await(); + QueryPathsResponse response = (QueryPathsResponse) task.getResponse(); + if (response == null || !response.isSuccess()) { + LOGGER.error("Execute get paths for {} statement false.", pathList); + throw new ProcessorException(); + } + return response.getPaths(); + } + + private List<String> queryPaths(SingleQPTask task, PeerId leader) + throws InterruptedException, RaftConnectionException { + BasicResponse response = asyncHandleNonQueryTaskGetRes(task, leader, 0); + return response == null ? new ArrayList<>() + : ((QueryPathsResponse) response).getPaths(); + } + + /** * Combine multiple metadata in String format into single String * * @return single String of all metadata diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java new file mode 100644 index 0000000..7af4adc --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.rpc.raft.processor; + +import com.alipay.remoting.AsyncContext; +import com.alipay.remoting.BizContext; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.closure.ReadIndexClosure; +import org.apache.iotdb.cluster.config.ClusterConstant; +import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder; +import org.apache.iotdb.cluster.entity.raft.RaftService; +import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataRequest; +import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataResponse; +import org.apache.iotdb.cluster.utils.RaftUtils; +import org.apache.iotdb.db.exception.PathErrorException; +import org.apache.iotdb.db.metadata.MManager; + +public class QueryMetadataAsyncProcessor extends + BasicAsyncUserProcessor<QueryMetadataRequest> { + + private MManager mManager = MManager.getInstance(); + + @Override + public void handleRequest(BizContext bizContext, AsyncContext asyncContext, + QueryMetadataRequest request) { + String groupId = request.getGroupID(); + final byte[] reqContext = RaftUtils.createRaftRequestContext(); + DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId); + + if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) { + QueryMetadataResponse response = null; + try { + response = QueryMetadataResponse + .createSuccessResponse(groupId, mManager.getMetadata()); + } catch (PathErrorException e) { + response = QueryMetadataResponse.createErrorResponse(groupId, e.getMessage()); + } + response.addResult(true); + asyncContext.sendResponse(response); + } else { + ((RaftService) dataPartitionHolder.getService()).getNode() + .readIndex(reqContext, new ReadIndexClosure() { + + @Override + public void run(Status status, long index, byte[] reqCtx) { + QueryMetadataResponse response; + if (status.isOk()) { + try { + response = QueryMetadataResponse + .createSuccessResponse(groupId, mManager.getMetadata()); + } catch (PathErrorException e) { + response = QueryMetadataResponse.createErrorResponse(groupId, e.getMessage()); + } + response.addResult(true); + } else { + response = QueryMetadataResponse + .createErrorResponse(groupId, status.getErrorMsg()); + response.addResult(false); + } + asyncContext.sendResponse(response); + } + }); + } + } + + @Override + public String interest() { + return QueryMetadataRequest.class.getName(); + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java new file mode 100644 index 0000000..b234a83 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.rpc.raft.processor; + +import com.alipay.remoting.AsyncContext; +import com.alipay.remoting.BizContext; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.closure.ReadIndexClosure; +import org.apache.iotdb.cluster.config.ClusterConstant; +import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder; +import org.apache.iotdb.cluster.entity.raft.RaftService; +import org.apache.iotdb.cluster.rpc.raft.request.QueryPathsRequest; +import org.apache.iotdb.cluster.rpc.raft.response.QueryPathsResponse; +import org.apache.iotdb.cluster.utils.RaftUtils; +import org.apache.iotdb.db.exception.PathErrorException; +import org.apache.iotdb.db.metadata.MManager; + +public class QueryPathsAsyncProcessor extends BasicAsyncUserProcessor<QueryPathsRequest> { + + private MManager mManager = MManager.getInstance(); + + @Override + public void handleRequest(BizContext bizContext, AsyncContext asyncContext, + QueryPathsRequest request) { + String groupId = request.getGroupID(); + final byte[] reqContext = RaftUtils.createRaftRequestContext(); + DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId); + + if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) { + QueryPathsResponse response = QueryPathsResponse + .createEmptyResponse(groupId); + try { + queryPaths(request, response); + } catch (final PathErrorException e) { + response = QueryPathsResponse.createErrorResponse(groupId, e.getMessage()); + } + asyncContext.sendResponse(response); + } else { + ((RaftService) dataPartitionHolder.getService()).getNode() + .readIndex(reqContext, new ReadIndexClosure() { + + @Override + public void run(Status status, long index, byte[] reqCtx) { + QueryPathsResponse response = QueryPathsResponse + .createEmptyResponse(groupId); + if (status.isOk()) { + try { + queryPaths(request, response); + } catch (final PathErrorException e) { + response = QueryPathsResponse.createErrorResponse(groupId, e.getMessage()); + } + } else { + response = QueryPathsResponse + .createErrorResponse(groupId, status.getErrorMsg()); + } + asyncContext.sendResponse(response); + } + }); + } + } + + /** + * Query paths + */ + private void queryPaths(QueryPathsRequest request, + QueryPathsResponse response) throws PathErrorException { + for (String path : request.getPath()) { + response.addPaths(mManager.getPaths(path)); + } + } + + @Override + public String interest() { + return QueryPathsRequest.class.getName(); + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java new file mode 100644 index 0000000..b72f0a2 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.rpc.raft.processor; + +import com.alipay.remoting.AsyncContext; +import com.alipay.remoting.BizContext; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.closure.ReadIndexClosure; +import org.apache.iotdb.cluster.config.ClusterConstant; +import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder; +import org.apache.iotdb.cluster.entity.raft.RaftService; +import org.apache.iotdb.cluster.rpc.raft.request.QuerySeriesTypeRequest; +import org.apache.iotdb.cluster.rpc.raft.response.QuerySeriesTypeResponse; +import org.apache.iotdb.cluster.utils.RaftUtils; +import org.apache.iotdb.db.exception.PathErrorException; +import org.apache.iotdb.db.metadata.MManager; + +public class QuerySeriesTypeAsyncProcessor extends BasicAsyncUserProcessor<QuerySeriesTypeRequest> { + + private MManager mManager = MManager.getInstance(); + + @Override + public void handleRequest(BizContext bizContext, AsyncContext asyncContext, + QuerySeriesTypeRequest request) { + String groupId = request.getGroupID(); + final byte[] reqContext = RaftUtils.createRaftRequestContext(); + DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId); + + if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) { + QuerySeriesTypeResponse response; + try { + response = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(request.getPath())); + } catch (final PathErrorException e) { + response = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage()); + } + asyncContext.sendResponse(response); + } else { + ((RaftService) dataPartitionHolder.getService()).getNode() + .readIndex(reqContext, new ReadIndexClosure() { + + @Override + public void run(Status status, long index, byte[] reqCtx) { + QuerySeriesTypeResponse response; + if (status.isOk()) { + try { + response = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(request.getPath())); + } catch (final PathErrorException e) { + response = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage()); + } + } else { + response = QuerySeriesTypeResponse + .createErrorResponse(groupId, status.getErrorMsg()); + } + asyncContext.sendResponse(response); + } + }); + } + } + + @Override + public String interest() { + return QuerySeriesTypeRequest.class.getName(); + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java new file mode 100644 index 0000000..2628fb6 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.rpc.raft.request; + +import java.io.Serializable; + +public class QueryMetadataRequest extends BasicQueryRequest implements Serializable { + + public QueryMetadataRequest(String groupID, int readConsistencyLevel) { + super(groupID, readConsistencyLevel); + } +} \ No newline at end of file diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java new file mode 100644 index 0000000..2c600f4 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.rpc.raft.request; + +import java.io.Serializable; +import java.util.List; + +public class QueryPathsRequest extends BasicQueryRequest implements Serializable { + + private List<String> path; + + public QueryPathsRequest(String groupID, int readConsistencyLevel, List<String> path) { + super(groupID, readConsistencyLevel); + this.path = path; + } + + public List<String> getPath() { + return path; + } +} \ No newline at end of file diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java new file mode 100644 index 0000000..c486576 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.rpc.raft.request; + +import java.io.Serializable; + +public class QuerySeriesTypeRequest extends BasicQueryRequest implements Serializable { + + private String path; + + public QuerySeriesTypeRequest(String groupID, int readConsistencyLevel, String path) { + super(groupID, readConsistencyLevel); + this.path = path; + } + + public String getPath() { + return path; + } +} \ No newline at end of file diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java new file mode 100644 index 0000000..6c21798 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.rpc.raft.response; + +import org.apache.iotdb.db.metadata.Metadata; + +public class QueryMetadataResponse extends BasicResponse { + + private Metadata metadata; + + private QueryMetadataResponse(String groupId, boolean redirected, String leaderStr, + String errorMsg) { + super(groupId, redirected, leaderStr, errorMsg); + } + + public static QueryMetadataResponse createSuccessResponse(String groupId, + Metadata metadata) { + QueryMetadataResponse response = new QueryMetadataResponse(groupId, false, null, + null); + response.metadata = metadata; + return response; + } + + public static QueryMetadataResponse createErrorResponse(String groupId, String errorMsg) { + return new QueryMetadataResponse(groupId, false, null, errorMsg); + } + + public Metadata getMetadata() { + return metadata; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java new file mode 100644 index 0000000..29d659a --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.rpc.raft.response; + +import java.util.ArrayList; +import java.util.List; + +public class QueryPathsResponse extends BasicResponse { + + private List<String> paths; + + private QueryPathsResponse(String groupId, boolean redirected, boolean success, String leaderStr, String errorMsg) { + super(groupId, redirected, leaderStr, errorMsg); + this.addResult(success); + paths = new ArrayList<>(); + } + + public static QueryPathsResponse createEmptyResponse(String groupId){ + return new QueryPathsResponse(groupId, false, true, null, null); + } + + public static QueryPathsResponse createErrorResponse(String groupId, String errorMsg) { + return new QueryPathsResponse(groupId, false, false, null, errorMsg); + } + + public List<String> getPaths() { + return paths; + } + + public void addPaths(List<String> paths){ + this.paths.addAll(paths); + } + +} \ No newline at end of file diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java new file mode 100644 index 0000000..e86e108 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.cluster.rpc.raft.response; + +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +public class QuerySeriesTypeResponse extends BasicResponse { + + private TSDataType dataType; + + private QuerySeriesTypeResponse(String groupId, boolean redirected, String leaderStr, + String errorMsg) { + super(groupId, redirected, leaderStr, errorMsg); + } + + public static QuerySeriesTypeResponse createSuccessResponse(String groupId, TSDataType dataType) { + QuerySeriesTypeResponse response = new QuerySeriesTypeResponse(groupId, false, null, + null); + response.dataType = dataType; + return response; + } + + public static QuerySeriesTypeResponse createErrorResponse(String groupId, String errorMsg) { + return new QuerySeriesTypeResponse(groupId, false, null, errorMsg); + } + + public TSDataType getDataType() { + return dataType; + } + + public void setDataType(TSDataType dataType) { + this.dataType = dataType; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java index ba4e59d..320398b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java @@ -34,11 +34,14 @@ import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.ProcessorException; +import org.apache.iotdb.db.metadata.MManager; +import org.apache.iotdb.db.metadata.Metadata; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.service.TSServiceImpl; import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq; import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp; import org.apache.iotdb.service.rpc.thrift.TS_StatusCode; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -244,6 +247,23 @@ public class TSServiceClusterImpl extends TSServiceImpl { return queryMetadataExecutor.get().processMetadataInStringQuery(); } + @Override + protected Metadata getMetadata() + throws InterruptedException, ProcessorException, PathErrorException { + return queryMetadataExecutor.get().processMetadataQuery(); + } + + @Override + protected TSDataType getSeriesType(String path) throws PathErrorException, InterruptedException, ProcessorException { + return queryMetadataExecutor.get().processSeriesTypeQuery(path); + } + + @Override + protected List<String> getPaths(String path) + throws PathErrorException, InterruptedException, ProcessorException { + return queryMetadataExecutor.get().processPathsQuery(path); + } + @OnlyForTest public NonQueryExecutor getNonQueryExecutor() { return nonQueryExecutor.get(); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java index 40fd0e4..15b6011 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java @@ -18,8 +18,11 @@ */ package org.apache.iotdb.db.metadata; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -67,6 +70,57 @@ public class Metadata { return deviceIdMap; } + /** + * combine multiple metadatas + */ + public static Metadata combineMetadatas(Metadata[] metadatas) { + Map<String, List<MeasurementSchema>> seriesMap = new HashMap<>(); + Map<String, List<String>> deviceIdMap = new HashMap<>(); + Map<String, Map<String, MeasurementSchema>> typeSchemaMap = new HashMap<>(); + + if (metadatas == null || metadatas.length == 0) { + return new Metadata(seriesMap, deviceIdMap); + } + + for (int i = 0; i < metadatas.length; i++) { + Map<String, List<MeasurementSchema>> subSeriesMap = metadatas[i].seriesMap; + for (Entry<String, List<MeasurementSchema>> entry : subSeriesMap.entrySet()) { + Map<String, MeasurementSchema> map; + if (typeSchemaMap.containsKey(entry.getKey())) { + map = typeSchemaMap.get(entry.getKey()); + } else { + map = new HashMap<>(); + } + entry.getValue().forEach(schema -> map.put(schema.getMeasurementId(), schema)); + if (!typeSchemaMap.containsKey(entry.getKey())) { + typeSchemaMap.put(entry.getKey(), map); + } + } + + Map<String, List<String>> subDeviceIdMap = metadatas[i].deviceIdMap; + for (Entry<String, List<String>> entry : subDeviceIdMap.entrySet()) { + List<String> list; + if (deviceIdMap.containsKey(entry.getKey())) { + list = deviceIdMap.get(entry.getKey()); + } else { + list = new ArrayList<>(); + } + list.addAll(entry.getValue()); + if (!deviceIdMap.containsKey(entry.getKey())) { + deviceIdMap.put(entry.getKey(), list); + } + } + } + + for (Entry<String, Map<String, MeasurementSchema>> entry : typeSchemaMap.entrySet()) { + List<MeasurementSchema> list = new ArrayList<>(); + list.addAll(entry.getValue().values()); + seriesMap.put(entry.getKey(), list); + } + + return new Metadata(seriesMap, deviceIdMap); + } + @Override public String toString() { return seriesMap.toString() + "\n" + deviceIdMap.toString(); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 79dbd53..299133c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -81,6 +81,7 @@ import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle; import org.apache.iotdb.service.rpc.thrift.TS_Status; import org.apache.iotdb.service.rpc.thrift.TS_StatusCode; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.thrift.TException; @@ -308,14 +309,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { Metadata metadata; try { String column = req.getColumnPath(); - metadata = MManager.getInstance().getMetadata(); + metadata = getMetadata(); Map<String, List<String>> deviceMap = metadata.getDeviceMap(); if (deviceMap == null || !deviceMap.containsKey(column)) { resp.setColumnsList(new ArrayList<>()); } else { resp.setColumnsList(deviceMap.get(column)); } - } catch (PathErrorException e) { + } catch (PathErrorException | InterruptedException | ProcessorException e) { LOGGER.error("cannot get delta object map", e); status = getErrorStatus(String.format("Failed to fetch delta object map because: %s", e)); resp.setStatus(status); @@ -330,8 +331,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { break; case "COLUMN": try { - resp.setDataType(MManager.getInstance().getSeriesType(req.getColumnPath()).toString()); - } catch (PathErrorException e) { + resp.setDataType(getSeriesType(req.getColumnPath()).toString()); + } catch (PathErrorException | InterruptedException | ProcessorException e) { // TODO aggregate seriesPath e.g. last(root.ln.wf01.wt01.status) // status = new TS_Status(TS_StatusCode.ERROR_STATUS); // status.setErrorMessage(String.format("Failed to fetch %s's data type because: %s", @@ -343,8 +344,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { break; case "ALL_COLUMNS": try { - resp.setColumnsList(MManager.getInstance().getPaths(req.getColumnPath())); - } catch (PathErrorException e) { + resp.setColumnsList(getPaths(req.getColumnPath())); + } catch (PathErrorException | InterruptedException | ProcessorException e) { status = getErrorStatus(String .format("Failed to fetch %s's all columns because: %s", req.getColumnPath(), e)); resp.setStatus(status); @@ -382,6 +383,18 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return MManager.getInstance().getMetadataInString(); } + protected Metadata getMetadata() throws PathErrorException, InterruptedException, ProcessorException { + return MManager.getInstance().getMetadata(); + } + + protected TSDataType getSeriesType(String path) throws PathErrorException, InterruptedException, ProcessorException { + return MManager.getInstance().getSeriesType(path); + } + + protected List<String> getPaths(String path) throws PathErrorException, InterruptedException, ProcessorException { + return MManager.getInstance().getPaths(path); + } + /** * Judge whether the statement is ADMIN COMMAND and if true, executeWithGlobalTimeFilter it. *
