This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9601a7eedbc2295fd270502a21d7dafdfc0af7ce
Merge: 0792dea 3bf4114
Author: lta <[email protected]>
AuthorDate: Fri May 14 12:09:21 2021 +0800

    merge master

 Jenkinsfile                                        |   2 +-
 LICENSE-binary                                     |   4 +-
 README.md                                          |   3 +-
 README_ZH.md                                       |   3 +-
 .../org/apache/iotdb/tool/AbstractCsvTool.java     |  39 ++--
 .../main/java/org/apache/iotdb/tool/ExportCsv.java |   2 +-
 .../main/java/org/apache/iotdb/tool/ImportCsv.java |  56 ++---
 client-py/README.md                                |   5 +-
 .../resources/conf/iotdb-cluster.properties        |   3 +
 .../java/org/apache/iotdb/cluster/ClusterMain.java |  14 ++
 .../apache/iotdb/cluster/config/ClusterConfig.java |   9 +
 .../iotdb/cluster/config/ClusterDescriptor.java    |   9 +-
 .../iotdb/cluster/log/applier/BaseApplier.java     |  37 +++-
 .../apache/iotdb/cluster/metadata/CMManager.java   |  12 +-
 .../cluster/query/ClusterDataQueryExecutor.java    |   5 +-
 .../query/last/ClusterLastQueryExecutor.java       |   1 +
 .../cluster/query/reader/ClusterReaderFactory.java |  12 +-
 .../query/reader/mult/RemoteMultSeriesReader.java  |   7 +-
 .../iotdb/cluster/server/MetaClusterServer.java    |  27 ++-
 .../server/clusterinfo/ClusterInfoServer.java      |  94 ++++++++
 .../server/clusterinfo/ClusterInfoServerMBean.java |  23 +-
 .../server/clusterinfo/ClusterInfoServiceImpl.java |  71 ++++++
 .../ClusterInfoServiceThriftHandler.java           |  55 +++++
 .../cluster/utils/nodetool/ClusterMonitor.java     |  10 +
 .../utils/nodetool/ClusterMonitorMBean.java        |   2 +-
 .../cluster/log/applier/DataLogApplierTest.java    |  35 +++
 .../query/groupby/MergeGroupByExecutorTest.java    |  23 +-
 .../query/groupby/RemoteGroupByExecutorTest.java   |  23 +-
 .../server/clusterinfo/ClusterInfoServerTest.java  |  73 +++++++
 .../clusterinfo/ClusterInfoServiceImplTest.java    |  98 +++++++++
 .../cluster/server/member/DataGroupMemberTest.java |  28 ++-
 .../cluster/server/member/MetaGroupMemberTest.java |  20 +-
 .../resources/node1conf/iotdb-cluster.properties   |   1 +
 .../resources/node2conf/iotdb-cluster.properties   |   1 +
 .../resources/node3conf/iotdb-cluster.properties   |   1 +
 docs/UserGuide/API/Programming-Java-Native-API.md  |  98 +++++++++
 .../Administration-Management/Administration.md    |   7 +-
 docs/UserGuide/Cluster/Cluster-Setup.md            |   9 +
 .../Data-Concept/Data-Model-and-Terminology.md     |   4 +-
 .../DDL-Data-Definition-Language.md                |   2 +-
 .../UserGuide/API/Programming-Java-Native-API.md   |  95 ++++++++
 .../Administration-Management/Administration.md    |   5 +-
 docs/zh/UserGuide/Cluster/Cluster-Setup.md         |   9 +
 .../Data-Concept/Data-Model-and-Terminology.md     |   4 +-
 .../DDL-Data-Definition-Language.md                |   2 +-
 .../apache/iotdb/tsfile/TsFileSequenceRead.java    |   5 +
 .../resources/conf/iotdb-engine.properties         |  11 +
 .../apache/iotdb/db/auth/entity/PrivilegeType.java |   1 +
 .../org/apache/iotdb/db/concurrent/ThreadName.java |   3 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  32 ++-
 .../org/apache/iotdb/db/conf/IoTDBConfigCheck.java |  28 ++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  19 ++
 .../db/engine/cache/TimeSeriesMetadataCache.java   | 121 +++++++++++
 .../db/engine/compaction/TsFileManagement.java     |  21 +-
 .../level/LevelCompactionTsFileManagement.java     |  51 +++--
 .../engine/compaction/utils/CompactionUtils.java   |  81 +++++--
 .../engine/storagegroup/StorageGroupProcessor.java |  42 +++-
 .../db/engine/storagegroup/TsFileResource.java     |  15 +-
 .../org/apache/iotdb/db/metadata/MManager.java     |  64 ++++--
 .../org/apache/iotdb/db/metadata/PartialPath.java  |  15 ++
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   7 +-
 .../org/apache/iotdb/db/qp/physical/BatchPlan.java |  54 +++--
 .../db/qp/physical/crud/InsertMultiTabletPlan.java |  39 +++-
 .../physical/crud/InsertRowsOfOneDevicePlan.java   |  33 ++-
 .../iotdb/db/qp/physical/crud/InsertRowsPlan.java  |  39 +++-
 .../qp/physical/sys/CreateMultiTimeSeriesPlan.java |  39 +++-
 .../apache/iotdb/db/qp/physical/sys/ShowPlan.java  |   1 -
 .../apache/iotdb/db/query/dataset/ShowDataSet.java |  12 +-
 .../db/query/executor/fill/LastPointReader.java    |  51 ++---
 .../chunk/metadata/DiskChunkMetadataLoader.java    |   2 +-
 .../query/reader/series/SeriesAggregateReader.java |   2 +-
 .../reader/series/SeriesRawDataBatchReader.java    |   4 +-
 .../iotdb/db/query/reader/series/SeriesReader.java |  13 +-
 .../reader/series/SeriesReaderByTimestamp.java     |   2 +-
 .../query/reader/series/SeriesReaderFactory.java   |  94 --------
 .../db/query/reader/series/VectorSeriesReader.java | 144 ------------
 .../org/apache/iotdb/db/service/ServiceType.java   |   5 +-
 .../java/org/apache/iotdb/db/utils/AuthUtils.java  |   2 -
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java |  94 +++++++-
 .../iotdb/db/utils/datastructure/VectorTVList.java |   6 +-
 .../apache/iotdb/db/auth/AuthorityCheckerTest.java |   2 -
 .../auth/authorizer/LocalFileAuthorizerTest.java   |   6 +-
 .../db/engine/compaction/CompactionChunkTest.java  |   4 +-
 .../compaction/LevelCompactionCacheTest.java       |   3 +-
 .../engine/compaction/LevelCompactionLogTest.java  |   3 +-
 .../compaction/LevelCompactionMergeTest.java       |  83 ++++++-
 .../compaction/LevelCompactionMoreDataTest.java    |   3 +-
 .../NoCompactionTsFileManagementTest.java          |   4 +-
 .../iotdb/db/integration/IoTDBSimpleQueryIT.java   |  36 +++
 .../iotdb/db/metadata/MManagerBasicTest.java       | 242 +++++++++++++++++++++
 .../org/apache/iotdb/db/script/EnvScriptIT.java    |   6 +-
 .../apache/iotdb/db/sink/LocalIoTDBSinkTest.java   |   3 +-
 .../test/java/org/apache/iotdb/db/sql/Cases.java   | 149 +++++++++++++
 .../java/org/apache/iotdb/db/sql/ClusterIT.java    | 131 +++++------
 .../java/org/apache/iotdb/db/sql/SingleNodeIT.java |  69 ++----
 .../iotdb/db/sql/node1/OneNodeClusterIT.java       |  54 +++++
 .../db/sql/nodes3/AbstractThreeNodeClusterIT.java  |  61 ++++++
 .../iotdb/db/sql/nodes3/ThreeNodeCluster1IT.java   |  24 +-
 .../iotdb/db/sql/nodes3/ThreeNodeCluster2IT.java   |  27 +--
 .../db/sql/nodes5/AbstractFiveNodeClusterIT.java   |  85 ++++++++
 .../iotdb/db/sql/nodes5/FiveNodeCluster1IT.java    |  24 +-
 .../iotdb/db/sql/nodes5/FiveNodeCluster2IT.java    |  27 +--
 .../iotdb/db/sql/nodes5/FiveNodeCluster4IT.java    |  27 +--
 .../test/resources/1node/iotdb-cluster.properties  |   2 +-
 thrift-cluster/src/main/thrift/cluster.thrift      |  50 ++++-
 thrift-sync/pom.xml                                |   5 -
 .../tsfile/common/constant/TsFileConstant.java     |   3 +
 .../iotdb/tsfile/file/metadata/ChunkMetadata.java  |  16 ++
 .../iotdb/tsfile/file/metadata/IChunkMetadata.java |   6 +
 .../tsfile/file/metadata/ITimeSeriesMetadata.java  |   3 +
 .../file/metadata/MetadataIndexConstructor.java    |  54 ++++-
 .../tsfile/file/metadata/MetadataIndexNode.java    |   2 +-
 .../tsfile/file/metadata/TimeseriesMetadata.java   |  20 +-
 .../tsfile/file/metadata/VectorChunkMetadata.java  |  24 ++
 .../file/metadata/VectorTimeSeriesMetadata.java    |  16 ++
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |  80 +++++--
 .../tsfile/v2/file/metadata/TsFileMetadataV2.java  |   9 +-
 .../tsfile/v2/read/TsFileSequenceReaderForV2.java  |  16 +-
 .../iotdb/tsfile/write/chunk/TimeChunkWriter.java  |   3 +-
 .../iotdb/tsfile/write/chunk/ValueChunkWriter.java |   3 +-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  | 121 ++++++++---
 121 files changed, 2786 insertions(+), 915 deletions(-)

diff --cc cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index 7b2ae4c,a904838..adc4661
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@@ -123,10 -125,9 +128,14 @@@ public class ClusterMain 
          preStartCustomize();
          metaServer.start();
          metaServer.joinCluster();
+         // Currently, we do not register ClusterInfoService as a JMX Bean,
+         // so we use startService() rather than start()
+         ClusterInfoServer.getInstance().startService();
++
 +        logger.info(
 +            "Adding this node {} to cluster costs {} ms",
 +            metaServer.getMember().getThisNode(),
 +            (System.currentTimeMillis() - startTime));
        } catch (TTransportException
            | StartupException
            | QueryProcessException
diff --cc 
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index f8b73e9,eb47005..cd9384c
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@@ -340,9 -315,8 +339,9 @@@ public class ClusterReaderFactory 
                  valueFilter,
                  context,
                  dataGroupMember,
 -                ascending);
 +                ascending,
 +                null);
-         partialPathPointReaderMap.put(partialPath.getFullPath(), 
seriesPointReader);
+         
partialPathPointReaderMap.put(PartialPath.getExactFullPath(partialPath), 
seriesPointReader);
        }
  
        if (logger.isDebugEnabled()) {
@@@ -542,19 -507,14 +541,19 @@@
        Filter valueFilter,
        QueryContext context,
        Node header,
 -      boolean ascending)
 +      int raftId,
 +      boolean ascending,
 +      Set<Integer> requiredSlots)
        throws StorageEngineException, QueryProcessException {
      ClusterQueryUtils.checkPathExistence(path);
 -    List<Integer> nodeSlots =
 -        ((SlotPartitionTable) 
metaGroupMember.getPartitionTable()).getNodeSlots(header);
 +    if (requiredSlots == null) {
 +      List<Integer> nodeSlots =
 +          ((SlotPartitionTable) 
metaGroupMember.getPartitionTable()).getNodeSlots(header, raftId);
 +      requiredSlots = new HashSet<>(nodeSlots);
 +    }
      QueryDataSource queryDataSource =
          QueryResourceManager.getInstance().getQueryDataSource(path, context, 
timeFilter);
-     return SeriesReaderFactory.createSeriesReader(
+     return new SeriesReader(
          path,
          allSensors,
          dataType,
@@@ -1136,11 -999,10 +1132,11 @@@
                timeFilter,
                valueFilter,
                context,
 -              dataGroupMember.getHeader(),
 -              ascending);
 -      partialPathBatchReaderMap.put(
 -          PartialPath.getExactFullPath(partialPath), new 
SeriesRawDataBatchReader(seriesReader));
 +              dataGroupMember,
 +              ascending,
 +              requiredSlots,
 +              false);
-       partialPathBatchReaderMap.put(partialPath.getFullPath(), batchReader);
++      
partialPathBatchReaderMap.put(PartialPath.getExactFullPath(partialPath), 
batchReader);
      }
      return new MultBatchReader(partialPathBatchReaderMap);
    }
diff --cc 
cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImpl.java
index 0000000,82cb1b0..6891a6e
mode 000000,100644..100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImpl.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImpl.java
@@@ -1,0 -1,71 +1,71 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ package org.apache.iotdb.cluster.server.clusterinfo;
+ 
+ import org.apache.iotdb.cluster.partition.PartitionGroup;
+ import org.apache.iotdb.cluster.rpc.thrift.ClusterInfoService;
+ import org.apache.iotdb.cluster.rpc.thrift.DataPartitionEntry;
+ import org.apache.iotdb.cluster.rpc.thrift.Node;
+ import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
+ 
+ import org.apache.commons.collections4.map.MultiKeyMap;
+ import org.apache.thrift.TException;
+ 
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.Map;
+ 
+ public class ClusterInfoServiceImpl implements ClusterInfoService.Iface {
+ 
+   @Override
+   public List<Node> getRing() throws TException {
+     return ClusterMonitor.INSTANCE.getRing();
+   }
+ 
+   @Override
+   public List<DataPartitionEntry> getDataPartition(String path, long 
startTime, long endTime) {
+     MultiKeyMap<Long, PartitionGroup> partitions =
+         ClusterMonitor.INSTANCE.getDataPartition(path, startTime, endTime);
+     List<DataPartitionEntry> result = new ArrayList<>(partitions.size());
+     partitions.forEach(
+         (multikey, nodes) ->
+             result.add(new DataPartitionEntry(multikey.getKey(0), 
multikey.getKey(1), nodes)));
+     return result;
+   }
+ 
+   @Override
+   public List<Node> getMetaPartition(String path) throws TException {
+     return ClusterMonitor.INSTANCE.getMetaPartition(path);
+   }
+ 
+   @Override
 -  public Map<Node, Boolean> getAllNodeStatus() throws TException {
++  public Map<Node, Integer> getAllNodeStatus() throws TException {
+     return ClusterMonitor.INSTANCE.getAllNodeStatus();
+   }
+ 
+   @Override
+   public String getInstrumentingInfo() throws TException {
+     return ClusterMonitor.INSTANCE.getInstrumentingInfo();
+   }
+ 
+   public void handleClientExit() {
+     // do something when a client connection exits.
+   }
+ }
diff --cc 
cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index 6ab551f,42d28a4..6d0b75e
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@@ -75,59 -66,9 +77,67 @@@ public class ClusterMonitor implements 
    }
  
    @Override
 +  public List<Pair<Node, NodeCharacter>> getMetaGroup() {
 +    MetaGroupMember metaMember = getMetaGroupMember();
 +    if (metaMember.getPartitionTable() == null) {
 +      return null;
 +    }
 +    List<Pair<Node, NodeCharacter>> res = new ArrayList<>();
 +    for (Node node : metaMember.getPartitionTable().getAllNodes()) {
 +      if (node.equals(metaMember.getLeader())) {
 +        res.add(new Pair<>(node, NodeCharacter.LEADER));
 +      } else {
 +        res.add(new Pair<>(node, NodeCharacter.FOLLOWER));
 +      }
 +    }
 +    return res;
 +  }
 +
+   public List<Node> getRing() {
 -    PartitionTable partitionTable = getPartitionTable();
 -    return partitionTable != null ? partitionTable.getAllNodes() : null;
++    MetaGroupMember metaMember = getMetaGroupMember();
++    if (metaMember.getPartitionTable() == null) {
++      return null;
++    }
++    return metaMember.getPartitionTable().getAllNodes();
++  }
++
 +  @Override
 +  public List<Pair<Node, NodeCharacter>> getDataGroup(int raftId) throws 
Exception {
 +    MetaGroupMember metaMember = getMetaGroupMember();
 +    if (metaMember.getPartitionTable() == null) {
 +      return null;
 +    }
 +    RaftNode raftNode = new RaftNode(metaMember.getThisNode(), raftId);
 +    DataGroupMember dataMember =
 +        
metaMember.getDataClusterServer().getHeaderGroupMap().getOrDefault(raftNode, 
null);
 +    if (dataMember == null) {
 +      throw new Exception(String.format("Partition whose header is %s doesn't 
exist.", raftNode));
 +    }
 +    List<Pair<Node, NodeCharacter>> res = new ArrayList<>();
 +    for (Node node : dataMember.getAllNodes()) {
 +      if (node.equals(metaMember.getThisNode())) {
 +        res.add(new Pair<>(node, NodeCharacter.LEADER));
 +      } else {
 +        res.add(new Pair<>(node, NodeCharacter.FOLLOWER));
 +      }
 +    }
 +    return res;
 +  }
 +
 +  @Override
 +  public Map<PartitionGroup, Integer> getSlotNumInDataMigration() throws 
Exception {
 +    MetaGroupMember member = getMetaGroupMember();
 +    if (member.getPartitionTable() == null) {
 +      throw new Exception(BUILDING_CLUSTER_INFO);
 +    }
 +    if (member.getCharacter() != NodeCharacter.LEADER) {
 +      if (member.getLeader() == null || 
member.getLeader().equals(ClusterConstant.EMPTY_NODE)) {
 +        throw new Exception(META_LEADER_UNKNOWN_INFO);
 +      } else {
 +        throw new 
Exception(NodeToolCmd.redirectToQueryMetaLeader(member.getLeader()));
 +      }
 +    }
 +    return member.collectAllPartitionMigrationStatus();
    }
  
    @Override
diff --cc 
cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 07fadcd,475bef2..39451dd
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@@ -126,19 -118,22 +126,20 @@@ import java.nio.ByteBuffer
  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.HashMap;
+ import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
+ import java.util.Set;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
 -import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicReference;
  
  import static org.apache.iotdb.cluster.server.NodeCharacter.ELECTOR;
  import static org.apache.iotdb.cluster.server.NodeCharacter.FOLLOWER;
  import static org.apache.iotdb.cluster.server.NodeCharacter.LEADER;
 -import static org.awaitility.Awaitility.await;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertFalse;
- import static org.junit.Assert.assertNotEquals;
  import static org.junit.Assert.assertNotNull;
  import static org.junit.Assert.assertNull;
  import static org.junit.Assert.assertTrue;
@@@ -925,11 -893,10 +928,11 @@@ public class MetaGroupMemberTest extend
          IReaderByTimestamp readerByTimestamp =
              readerFactory.getReaderByTimestamp(
                  new PartialPath(TestUtils.getTestSeries(i, 0)),
-                 Collections.singleton(TestUtils.getTestMeasurement(0)),
+                 deviceMeasurements,
                  TSDataType.DOUBLE,
                  context,
 -                true);
 +                true,
 +                null);
  
          Object[] values = readerByTimestamp.getValuesInTimestamps(times, 10);
          for (int j = 0; j < 10; j++) {
diff --cc thrift-cluster/src/main/thrift/cluster.thrift
index fa7ceb9,2bcf9bf..66f5e5d
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@@ -535,5 -508,53 +535,53 @@@ service TSMetaService extends RaftServi
    * cannot know when another node resumes, and handshakes are mainly used to 
update node status
    * on coordinator side.
    **/
-   void handshake(Node sender)
+   void handshake(1:Node sender);
  }
+ 
+ 
+ struct DataPartitionEntry{
+   1: required long startTime,
+   2: required long endTime,
+   3: required list<Node> nodes
+ }
+ 
+ /**
+ * for cluster maintainer.
+ * The interface will replace the JMX based NodeTool APIs.
+ **/
+ service ClusterInfoService {
+ 
+     /**
+      * Get physical hash ring
+      */
+     list<Node> getRing();
+ 
+     /**
+      * Get data partition information of input path and time range.
+      * @param path input path
+      * @return data partition information
+      */
+     list<DataPartitionEntry> getDataPartition(1:string path, 2:long 
startTime, 3:long endTime);
+ 
+     /**
+      * Get metadata partition information of input path
+      *
+      * @param path input path
+      * @return metadata partition information
+      */
+     list<Node> getMetaPartition(1:string path);
+ 
+     /**
+      * Get status of all nodes
+      *
 -     * @return key: node, value: live or not
++     * @return key: node, value: 0(live), 1(offline), 2(joining), 3(leaving)
+      */
 -    map<Node, bool> getAllNodeStatus();
++    map<Node, int> getAllNodeStatus();
+ 
+     /**
+      * @return A multi-line string with each line representing the total time 
consumption, invocation
+      *     number, and average time consumption.
+      */
+     string getInstrumentingInfo();
+ 
+ }

Reply via email to