This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster_scalability in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9601a7eedbc2295fd270502a21d7dafdfc0af7ce Merge: 0792dea 3bf4114 Author: lta <[email protected]> AuthorDate: Fri May 14 12:09:21 2021 +0800 merge master Jenkinsfile | 2 +- LICENSE-binary | 4 +- README.md | 3 +- README_ZH.md | 3 +- .../org/apache/iotdb/tool/AbstractCsvTool.java | 39 ++-- .../main/java/org/apache/iotdb/tool/ExportCsv.java | 2 +- .../main/java/org/apache/iotdb/tool/ImportCsv.java | 56 ++--- client-py/README.md | 5 +- .../resources/conf/iotdb-cluster.properties | 3 + .../java/org/apache/iotdb/cluster/ClusterMain.java | 14 ++ .../apache/iotdb/cluster/config/ClusterConfig.java | 9 + .../iotdb/cluster/config/ClusterDescriptor.java | 9 +- .../iotdb/cluster/log/applier/BaseApplier.java | 37 +++- .../apache/iotdb/cluster/metadata/CMManager.java | 12 +- .../cluster/query/ClusterDataQueryExecutor.java | 5 +- .../query/last/ClusterLastQueryExecutor.java | 1 + .../cluster/query/reader/ClusterReaderFactory.java | 12 +- .../query/reader/mult/RemoteMultSeriesReader.java | 7 +- .../iotdb/cluster/server/MetaClusterServer.java | 27 ++- .../server/clusterinfo/ClusterInfoServer.java | 94 ++++++++ .../server/clusterinfo/ClusterInfoServerMBean.java | 23 +- .../server/clusterinfo/ClusterInfoServiceImpl.java | 71 ++++++ .../ClusterInfoServiceThriftHandler.java | 55 +++++ .../cluster/utils/nodetool/ClusterMonitor.java | 10 + .../utils/nodetool/ClusterMonitorMBean.java | 2 +- .../cluster/log/applier/DataLogApplierTest.java | 35 +++ .../query/groupby/MergeGroupByExecutorTest.java | 23 +- .../query/groupby/RemoteGroupByExecutorTest.java | 23 +- .../server/clusterinfo/ClusterInfoServerTest.java | 73 +++++++ .../clusterinfo/ClusterInfoServiceImplTest.java | 98 +++++++++ .../cluster/server/member/DataGroupMemberTest.java | 28 ++- .../cluster/server/member/MetaGroupMemberTest.java | 20 +- .../resources/node1conf/iotdb-cluster.properties | 1 + .../resources/node2conf/iotdb-cluster.properties | 1 + .../resources/node3conf/iotdb-cluster.properties | 1 + docs/UserGuide/API/Programming-Java-Native-API.md | 98 +++++++++ .../Administration-Management/Administration.md | 7 +- docs/UserGuide/Cluster/Cluster-Setup.md | 9 + .../Data-Concept/Data-Model-and-Terminology.md | 4 +- .../DDL-Data-Definition-Language.md | 2 +- .../UserGuide/API/Programming-Java-Native-API.md | 95 ++++++++ .../Administration-Management/Administration.md | 5 +- docs/zh/UserGuide/Cluster/Cluster-Setup.md | 9 + .../Data-Concept/Data-Model-and-Terminology.md | 4 +- .../DDL-Data-Definition-Language.md | 2 +- .../apache/iotdb/tsfile/TsFileSequenceRead.java | 5 + .../resources/conf/iotdb-engine.properties | 11 + .../apache/iotdb/db/auth/entity/PrivilegeType.java | 1 + .../org/apache/iotdb/db/concurrent/ThreadName.java | 3 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 32 ++- .../org/apache/iotdb/db/conf/IoTDBConfigCheck.java | 28 ++- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 19 ++ .../db/engine/cache/TimeSeriesMetadataCache.java | 121 +++++++++++ .../db/engine/compaction/TsFileManagement.java | 21 +- .../level/LevelCompactionTsFileManagement.java | 51 +++-- .../engine/compaction/utils/CompactionUtils.java | 81 +++++-- .../engine/storagegroup/StorageGroupProcessor.java | 42 +++- .../db/engine/storagegroup/TsFileResource.java | 15 +- .../org/apache/iotdb/db/metadata/MManager.java | 64 ++++-- .../org/apache/iotdb/db/metadata/PartialPath.java | 15 ++ .../apache/iotdb/db/qp/executor/PlanExecutor.java | 7 +- .../org/apache/iotdb/db/qp/physical/BatchPlan.java | 54 +++-- .../db/qp/physical/crud/InsertMultiTabletPlan.java | 39 +++- .../physical/crud/InsertRowsOfOneDevicePlan.java | 33 ++- .../iotdb/db/qp/physical/crud/InsertRowsPlan.java | 39 +++- .../qp/physical/sys/CreateMultiTimeSeriesPlan.java | 39 +++- .../apache/iotdb/db/qp/physical/sys/ShowPlan.java | 1 - .../apache/iotdb/db/query/dataset/ShowDataSet.java | 12 +- .../db/query/executor/fill/LastPointReader.java | 51 ++--- .../chunk/metadata/DiskChunkMetadataLoader.java | 2 +- .../query/reader/series/SeriesAggregateReader.java | 2 +- .../reader/series/SeriesRawDataBatchReader.java | 4 +- .../iotdb/db/query/reader/series/SeriesReader.java | 13 +- .../reader/series/SeriesReaderByTimestamp.java | 2 +- .../query/reader/series/SeriesReaderFactory.java | 94 -------- .../db/query/reader/series/VectorSeriesReader.java | 144 ------------ .../org/apache/iotdb/db/service/ServiceType.java | 5 +- .../java/org/apache/iotdb/db/utils/AuthUtils.java | 2 - .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 94 +++++++- .../iotdb/db/utils/datastructure/VectorTVList.java | 6 +- .../apache/iotdb/db/auth/AuthorityCheckerTest.java | 2 - .../auth/authorizer/LocalFileAuthorizerTest.java | 6 +- .../db/engine/compaction/CompactionChunkTest.java | 4 +- .../compaction/LevelCompactionCacheTest.java | 3 +- .../engine/compaction/LevelCompactionLogTest.java | 3 +- .../compaction/LevelCompactionMergeTest.java | 83 ++++++- .../compaction/LevelCompactionMoreDataTest.java | 3 +- .../NoCompactionTsFileManagementTest.java | 4 +- .../iotdb/db/integration/IoTDBSimpleQueryIT.java | 36 +++ .../iotdb/db/metadata/MManagerBasicTest.java | 242 +++++++++++++++++++++ .../org/apache/iotdb/db/script/EnvScriptIT.java | 6 +- .../apache/iotdb/db/sink/LocalIoTDBSinkTest.java | 3 +- .../test/java/org/apache/iotdb/db/sql/Cases.java | 149 +++++++++++++ .../java/org/apache/iotdb/db/sql/ClusterIT.java | 131 +++++------ .../java/org/apache/iotdb/db/sql/SingleNodeIT.java | 69 ++---- .../iotdb/db/sql/node1/OneNodeClusterIT.java | 54 +++++ .../db/sql/nodes3/AbstractThreeNodeClusterIT.java | 61 ++++++ .../iotdb/db/sql/nodes3/ThreeNodeCluster1IT.java | 24 +- .../iotdb/db/sql/nodes3/ThreeNodeCluster2IT.java | 27 +-- .../db/sql/nodes5/AbstractFiveNodeClusterIT.java | 85 ++++++++ .../iotdb/db/sql/nodes5/FiveNodeCluster1IT.java | 24 +- .../iotdb/db/sql/nodes5/FiveNodeCluster2IT.java | 27 +-- .../iotdb/db/sql/nodes5/FiveNodeCluster4IT.java | 27 +-- .../test/resources/1node/iotdb-cluster.properties | 2 +- thrift-cluster/src/main/thrift/cluster.thrift | 50 ++++- thrift-sync/pom.xml | 5 - .../tsfile/common/constant/TsFileConstant.java | 3 + .../iotdb/tsfile/file/metadata/ChunkMetadata.java | 16 ++ .../iotdb/tsfile/file/metadata/IChunkMetadata.java | 6 + .../tsfile/file/metadata/ITimeSeriesMetadata.java | 3 + .../file/metadata/MetadataIndexConstructor.java | 54 ++++- .../tsfile/file/metadata/MetadataIndexNode.java | 2 +- .../tsfile/file/metadata/TimeseriesMetadata.java | 20 +- .../tsfile/file/metadata/VectorChunkMetadata.java | 24 ++ .../file/metadata/VectorTimeSeriesMetadata.java | 16 ++ .../iotdb/tsfile/read/TsFileSequenceReader.java | 80 +++++-- .../tsfile/v2/file/metadata/TsFileMetadataV2.java | 9 +- .../tsfile/v2/read/TsFileSequenceReaderForV2.java | 16 +- .../iotdb/tsfile/write/chunk/TimeChunkWriter.java | 3 +- .../iotdb/tsfile/write/chunk/ValueChunkWriter.java | 3 +- .../iotdb/tsfile/write/writer/TsFileIOWriter.java | 121 ++++++++--- 121 files changed, 2786 insertions(+), 915 deletions(-) diff --cc cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java index 7b2ae4c,a904838..adc4661 --- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java @@@ -123,10 -125,9 +128,14 @@@ public class ClusterMain preStartCustomize(); metaServer.start(); metaServer.joinCluster(); + // Currently, we do not register ClusterInfoService as a JMX Bean, + // so we use startService() rather than start() + ClusterInfoServer.getInstance().startService(); ++ + logger.info( + "Adding this node {} to cluster costs {} ms", + metaServer.getMember().getThisNode(), + (System.currentTimeMillis() - startTime)); } catch (TTransportException | StartupException | QueryProcessException diff --cc cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java index f8b73e9,eb47005..cd9384c --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java @@@ -340,9 -315,8 +339,9 @@@ public class ClusterReaderFactory valueFilter, context, dataGroupMember, - ascending); + ascending, + null); - partialPathPointReaderMap.put(partialPath.getFullPath(), seriesPointReader); + partialPathPointReaderMap.put(PartialPath.getExactFullPath(partialPath), seriesPointReader); } if (logger.isDebugEnabled()) { @@@ -542,19 -507,14 +541,19 @@@ Filter valueFilter, QueryContext context, Node header, - boolean ascending) + int raftId, + boolean ascending, + Set<Integer> requiredSlots) throws StorageEngineException, QueryProcessException { ClusterQueryUtils.checkPathExistence(path); - List<Integer> nodeSlots = - ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header); + if (requiredSlots == null) { + List<Integer> nodeSlots = + ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header, raftId); + requiredSlots = new HashSet<>(nodeSlots); + } QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter); - return SeriesReaderFactory.createSeriesReader( + return new SeriesReader( path, allSensors, dataType, @@@ -1136,11 -999,10 +1132,11 @@@ timeFilter, valueFilter, context, - dataGroupMember.getHeader(), - ascending); - partialPathBatchReaderMap.put( - PartialPath.getExactFullPath(partialPath), new SeriesRawDataBatchReader(seriesReader)); + dataGroupMember, + ascending, + requiredSlots, + false); - partialPathBatchReaderMap.put(partialPath.getFullPath(), batchReader); ++ partialPathBatchReaderMap.put(PartialPath.getExactFullPath(partialPath), batchReader); } return new MultBatchReader(partialPathBatchReaderMap); } diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImpl.java index 0000000,82cb1b0..6891a6e mode 000000,100644..100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImpl.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImpl.java @@@ -1,0 -1,71 +1,71 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + package org.apache.iotdb.cluster.server.clusterinfo; + + import org.apache.iotdb.cluster.partition.PartitionGroup; + import org.apache.iotdb.cluster.rpc.thrift.ClusterInfoService; + import org.apache.iotdb.cluster.rpc.thrift.DataPartitionEntry; + import org.apache.iotdb.cluster.rpc.thrift.Node; + import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor; + + import org.apache.commons.collections4.map.MultiKeyMap; + import org.apache.thrift.TException; + + import java.util.ArrayList; + import java.util.List; + import java.util.Map; + + public class ClusterInfoServiceImpl implements ClusterInfoService.Iface { + + @Override + public List<Node> getRing() throws TException { + return ClusterMonitor.INSTANCE.getRing(); + } + + @Override + public List<DataPartitionEntry> getDataPartition(String path, long startTime, long endTime) { + MultiKeyMap<Long, PartitionGroup> partitions = + ClusterMonitor.INSTANCE.getDataPartition(path, startTime, endTime); + List<DataPartitionEntry> result = new ArrayList<>(partitions.size()); + partitions.forEach( + (multikey, nodes) -> + result.add(new DataPartitionEntry(multikey.getKey(0), multikey.getKey(1), nodes))); + return result; + } + + @Override + public List<Node> getMetaPartition(String path) throws TException { + return ClusterMonitor.INSTANCE.getMetaPartition(path); + } + + @Override - public Map<Node, Boolean> getAllNodeStatus() throws TException { ++ public Map<Node, Integer> getAllNodeStatus() throws TException { + return ClusterMonitor.INSTANCE.getAllNodeStatus(); + } + + @Override + public String getInstrumentingInfo() throws TException { + return ClusterMonitor.INSTANCE.getInstrumentingInfo(); + } + + public void handleClientExit() { + // do something when a client connection exits. + } + } diff --cc cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java index 6ab551f,42d28a4..6d0b75e --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java @@@ -75,59 -66,9 +77,67 @@@ public class ClusterMonitor implements } @Override + public List<Pair<Node, NodeCharacter>> getMetaGroup() { + MetaGroupMember metaMember = getMetaGroupMember(); + if (metaMember.getPartitionTable() == null) { + return null; + } + List<Pair<Node, NodeCharacter>> res = new ArrayList<>(); + for (Node node : metaMember.getPartitionTable().getAllNodes()) { + if (node.equals(metaMember.getLeader())) { + res.add(new Pair<>(node, NodeCharacter.LEADER)); + } else { + res.add(new Pair<>(node, NodeCharacter.FOLLOWER)); + } + } + return res; + } + + public List<Node> getRing() { - PartitionTable partitionTable = getPartitionTable(); - return partitionTable != null ? partitionTable.getAllNodes() : null; ++ MetaGroupMember metaMember = getMetaGroupMember(); ++ if (metaMember.getPartitionTable() == null) { ++ return null; ++ } ++ return metaMember.getPartitionTable().getAllNodes(); ++ } ++ + @Override + public List<Pair<Node, NodeCharacter>> getDataGroup(int raftId) throws Exception { + MetaGroupMember metaMember = getMetaGroupMember(); + if (metaMember.getPartitionTable() == null) { + return null; + } + RaftNode raftNode = new RaftNode(metaMember.getThisNode(), raftId); + DataGroupMember dataMember = + metaMember.getDataClusterServer().getHeaderGroupMap().getOrDefault(raftNode, null); + if (dataMember == null) { + throw new Exception(String.format("Partition whose header is %s doesn't exist.", raftNode)); + } + List<Pair<Node, NodeCharacter>> res = new ArrayList<>(); + for (Node node : dataMember.getAllNodes()) { + if (node.equals(metaMember.getThisNode())) { + res.add(new Pair<>(node, NodeCharacter.LEADER)); + } else { + res.add(new Pair<>(node, NodeCharacter.FOLLOWER)); + } + } + return res; + } + + @Override + public Map<PartitionGroup, Integer> getSlotNumInDataMigration() throws Exception { + MetaGroupMember member = getMetaGroupMember(); + if (member.getPartitionTable() == null) { + throw new Exception(BUILDING_CLUSTER_INFO); + } + if (member.getCharacter() != NodeCharacter.LEADER) { + if (member.getLeader() == null || member.getLeader().equals(ClusterConstant.EMPTY_NODE)) { + throw new Exception(META_LEADER_UNKNOWN_INFO); + } else { + throw new Exception(NodeToolCmd.redirectToQueryMetaLeader(member.getLeader())); + } + } + return member.collectAllPartitionMigrationStatus(); } @Override diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java index 07fadcd,475bef2..39451dd --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java @@@ -126,19 -118,22 +126,20 @@@ import java.nio.ByteBuffer import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; + import java.util.HashSet; import java.util.List; import java.util.Map; + import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static org.apache.iotdb.cluster.server.NodeCharacter.ELECTOR; import static org.apache.iotdb.cluster.server.NodeCharacter.FOLLOWER; import static org.apache.iotdb.cluster.server.NodeCharacter.LEADER; -import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; - import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@@ -925,11 -893,10 +928,11 @@@ public class MetaGroupMemberTest extend IReaderByTimestamp readerByTimestamp = readerFactory.getReaderByTimestamp( new PartialPath(TestUtils.getTestSeries(i, 0)), - Collections.singleton(TestUtils.getTestMeasurement(0)), + deviceMeasurements, TSDataType.DOUBLE, context, - true); + true, + null); Object[] values = readerByTimestamp.getValuesInTimestamps(times, 10); for (int j = 0; j < 10; j++) { diff --cc thrift-cluster/src/main/thrift/cluster.thrift index fa7ceb9,2bcf9bf..66f5e5d --- a/thrift-cluster/src/main/thrift/cluster.thrift +++ b/thrift-cluster/src/main/thrift/cluster.thrift @@@ -535,5 -508,53 +535,53 @@@ service TSMetaService extends RaftServi * cannot know when another node resumes, and handshakes are mainly used to update node status * on coordinator side. **/ - void handshake(Node sender) + void handshake(1:Node sender); } + + + struct DataPartitionEntry{ + 1: required long startTime, + 2: required long endTime, + 3: required list<Node> nodes + } + + /** + * for cluster maintainer. + * The interface will replace the JMX based NodeTool APIs. + **/ + service ClusterInfoService { + + /** + * Get physical hash ring + */ + list<Node> getRing(); + + /** + * Get data partition information of input path and time range. + * @param path input path + * @return data partition information + */ + list<DataPartitionEntry> getDataPartition(1:string path, 2:long startTime, 3:long endTime); + + /** + * Get metadata partition information of input path + * + * @param path input path + * @return metadata partition information + */ + list<Node> getMetaPartition(1:string path); + + /** + * Get status of all nodes + * - * @return key: node, value: live or not ++ * @return key: node, value: 0(live), 1(offline), 2(joining), 3(leaving) + */ - map<Node, bool> getAllNodeStatus(); ++ map<Node, int> getAllNodeStatus(); + + /** + * @return A multi-line string with each line representing the total time consumption, invocation + * number, and average time consumption. + */ + string getInstrumentingInfo(); + + }
