This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch DoubleWrite in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f425e7d5c4645d9efa4f48b4191c3436a7adc74c Merge: b8650ca 99822c7 Author: JackieTien97 <[email protected]> AuthorDate: Tue May 18 14:38:09 2021 +0800 fix conflicts .github/workflows/client-go.yml | 3 + .github/workflows/client.yml | 5 +- .github/workflows/e2e.yml | 3 + .github/workflows/main-unix.yml | 3 + .github/workflows/main-win.yml | 3 + .github/workflows/sonar-coveralls.yml | 3 + .../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 | 6 +- cli/pom.xml | 2 +- .../org/apache/iotdb/tool/AbstractCsvTool.java | 40 +-- .../main/java/org/apache/iotdb/tool/ExportCsv.java | 2 +- .../main/java/org/apache/iotdb/tool/ImportCsv.java | 56 ++-- client-cpp/src/main/Session.cpp | 11 +- client-cpp/src/main/Session.h | 16 +- cluster/pom.xml | 12 +- .../resources/conf/iotdb-cluster.properties | 60 ++-- .../apache/iotdb/cluster/config/ClusterConfig.java | 20 +- .../iotdb/cluster/config/ClusterDescriptor.java | 5 - .../iotdb/cluster/log/applier/BaseApplier.java | 36 ++- .../iotdb/cluster/log/catchup/CatchUpTask.java | 48 +-- .../iotdb/cluster/log/catchup/LogCatchUpTask.java | 4 +- .../cluster/log/manage/CommittedEntryManager.java | 12 +- .../serializable/SyncLogDequeSerializer.java | 4 +- .../query/last/ClusterLastQueryExecutor.java | 1 + .../iotdb/cluster/server/DataClusterServer.java | 6 +- .../iotdb/cluster/server/MetaClusterServer.java | 6 +- .../iotdb/cluster/server/member/RaftMember.java | 77 +++-- .../cluster/server/service/BaseAsyncService.java | 19 +- .../cluster/server/service/BaseSyncService.java | 23 +- .../cluster/client/async/AsyncClientPoolTest.java | 37 ++- .../cluster/client/async/AsyncDataClientTest.java | 18 ++ .../client/async/AsyncDataHeartbeatClientTest.java | 18 ++ .../cluster/client/async/AsyncMetaClientTest.java | 18 ++ .../client/async/AsyncMetaHeartbeatClientTest.java | 18 ++ .../iotdb/cluster/log/LogDispatcherTest.java | 10 +- .../cluster/log/applier/DataLogApplierTest.java | 35 +++ .../cluster/log/snapshot/DataSnapshotTest.java | 8 + .../cluster/partition/SlotPartitionTableTest.java | 2 +- .../query/ClusterDataQueryExecutorTest.java | 3 - .../cluster/server/member/DataGroupMemberTest.java | 5 +- .../cluster/server/member/RaftMemberTest.java | 9 +- code-coverage/pom.xml | 2 +- compile-tools/pom.xml | 8 +- .../Administration-Management/Administration.md | 7 +- docs/UserGuide/Cluster/Cluster-Setup-Example.md | 47 +++ docs/UserGuide/Cluster/Cluster-Setup.md | 11 +- .../Data-Concept/Data-Model-and-Terminology.md | 4 +- .../DDL-Data-Definition-Language.md | 2 +- .../Administration-Management/Administration.md | 3 +- docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md | 46 +++ docs/zh/UserGuide/Cluster/Cluster-Setup.md | 11 +- .../Data-Concept/Data-Model-and-Terminology.md | 4 +- .../DDL-Data-Definition-Language.md | 2 +- example/hadoop/pom.xml | 3 +- example/udf/pom.xml | 2 +- hadoop/pom.xml | 2 +- hive-connector/pom.xml | 5 +- .../iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java | 9 +- .../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 14 +- pom.xml | 3 +- .../resources/conf/iotdb-engine.properties | 279 ++++++++-------- .../org/apache/iotdb/db/auth/AuthorityChecker.java | 26 +- .../apache/iotdb/db/auth/entity/PrivilegeType.java | 1 + .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 39 ++- .../org/apache/iotdb/db/conf/IoTDBConfigCheck.java | 28 +- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 27 +- .../iotdb/db/doublewrite/DoubleWriteConsumer.java | 14 +- .../iotdb/db/doublewrite/DoubleWriteProducer.java | 2 +- .../iotdb/db/doublewrite/DoubleWriteType.java | 18 ++ .../org/apache/iotdb/db/engine/StorageEngine.java | 46 ++- .../db/engine/cache/TimeSeriesMetadataCache.java | 2 +- .../compaction/CompactionMergeTaskPoolManager.java | 38 ++- .../db/engine/compaction/TsFileManagement.java | 58 +++- .../level/LevelCompactionTsFileManagement.java | 349 ++++++++++++--------- .../no/NoCompactionTsFileManagement.java | 132 ++++---- .../engine/compaction/utils/CompactionUtils.java | 25 +- .../iotdb/db/engine/memtable/AbstractMemTable.java | 11 + .../apache/iotdb/db/engine/memtable/IMemTable.java | 4 + .../db/engine/merge/manage/MergeResource.java | 2 +- .../iotdb/db/engine/merge/task/MergeFileTask.java | 14 +- .../db/engine/storagegroup/StorageGroupInfo.java | 6 +- .../engine/storagegroup/StorageGroupProcessor.java | 118 ++++--- .../db/engine/storagegroup/TsFileProcessor.java | 50 +-- .../engine/storagegroup/TsFileProcessorInfo.java | 6 +- .../db/engine/storagegroup/TsFileResource.java | 11 +- .../storagegroup/timeindex/DeviceTimeIndex.java | 18 -- .../storagegroup/timeindex/FileTimeIndex.java | 5 - .../engine/storagegroup/timeindex/ITimeIndex.java | 8 - .../iotdb/db/engine/upgrade/UpgradeTask.java | 40 +++ .../org/apache/iotdb/db/metadata/MManager.java | 6 +- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 15 +- .../org/apache/iotdb/db/qp/logical/Operator.java | 10 - .../physical/BatchPlan.java} | 46 ++- .../db/qp/physical/crud/InsertMultiTabletPlan.java | 48 ++- .../physical/crud/InsertRowsOfOneDevicePlan.java | 42 ++- .../iotdb/db/qp/physical/crud/InsertRowsPlan.java | 49 ++- .../db/qp/physical/crud/InsertTabletPlan.java | 24 +- .../qp/physical/sys/CreateMultiTimeSeriesPlan.java | 39 ++- .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 71 ++++- .../qp/strategy/optimizer/ConcatPathOptimizer.java | 9 + .../iotdb/db/query/executor/LastQueryExecutor.java | 41 ++- .../iotdb/db/query/executor/QueryRouter.java | 6 +- .../row/ElasticSerializableRowRecordList.java | 13 +- .../org/apache/iotdb/db/rescon/SystemInfo.java | 230 +++++++------- .../org/apache/iotdb/db/service/TSServiceImpl.java | 18 +- .../apache/iotdb/db/tools/TsFileRewriteTool.java | 15 +- .../db/tools/upgrade/TsFileOnlineUpgradeTool.java | 16 + .../java/org/apache/iotdb/db/utils/AuthUtils.java | 2 - .../java/org/apache/iotdb/db/utils/MergeUtils.java | 4 +- .../org/apache/iotdb/db/utils/UpgradeUtils.java | 10 - .../apache/iotdb/db/auth/AuthorityCheckerTest.java | 23 +- .../auth/authorizer/LocalFileAuthorizerTest.java | 6 +- .../db/engine/compaction/CompactionChunkTest.java | 8 +- .../compaction/LevelCompactionCacheTest.java | 5 +- .../engine/compaction/LevelCompactionLogTest.java | 5 +- .../compaction/LevelCompactionMergeTest.java | 78 ++++- .../engine/compaction/LevelCompactionModsTest.java | 45 ++- .../compaction/LevelCompactionMoreDataTest.java | 5 +- .../LevelCompactionTsFileManagementTest.java | 69 ++++ .../NoCompactionTsFileManagementTest.java | 75 ++++- .../iotdb/db/engine/merge/MergeTaskTest.java | 75 +++++ .../engine/storagegroup/TsFileProcessorTest.java | 8 +- .../iotdb/db/integration/IOTDBGroupByIT.java | 19 ++ .../iotdb/db/integration/IoTDBGroupByFillIT.java | 22 ++ .../iotdb/db/integration/IoTDBGroupByMonthIT.java | 98 +++++- ...oTDBLoadExternalTsFileWithTimePartitionIT.java} | 44 ++- .../iotdb/db/integration/IoTDBSimpleQueryIT.java | 36 +++ .../integration/IoTDBUDTFAlignByTimeQueryIT.java | 16 + .../aggregation/IoTDBAggregationByLevelIT.java | 19 ++ .../db/integration/auth/IoTDBAuthorizationIT.java | 72 +++++ .../iotdb/db/metadata/MManagerBasicTest.java | 71 +++++ .../org/apache/iotdb/db/script/EnvScriptIT.java | 6 +- .../iotdb/db/utils/TsFileRewriteToolTest.java | 57 ++++ .../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java | 20 +- .../org/apache/iotdb/rpc/RpcTransportFactory.java | 10 +- .../main/java/org/apache/iotdb/rpc/RpcUtils.java | 4 +- .../rpc/TimeoutChangeableTFastFramedTransport.java | 8 +- .../TimeoutChangeableTSnappyFramedTransport.java | 20 +- .../java/org/apache/iotdb/session/SessionUT.java | 2 +- site/src/main/.vuepress/config.js | 12 +- spark-tsfile/pom.xml | 2 +- thrift-cluster/src/main/thrift/cluster.thrift | 8 +- .../iotdb/tsfile/read/TsFileSequenceReader.java | 21 +- .../tsfile/read/filter/GroupByMonthFilter.java | 17 +- .../tsfile/v2/file/metadata/TsFileMetadataV2.java | 9 +- .../tsfile/v2/read/TsFileSequenceReaderForV2.java | 16 +- 145 files changed, 2795 insertions(+), 1159 deletions(-) diff --cc server/src/assembly/resources/conf/iotdb-engine.properties index 4ae5d52,123db71..5d15a9b --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@@ -643,29 -654,21 +654,31 @@@ timestamp_precision=m # index_root_dir=data/index # Is index enable - enable_index=false + # enable_index=false # How many threads can concurrently build index. When <= 0, use CPU core number. - concurrent_index_build_thread=0 + # concurrent_index_build_thread=0 # the default size of sliding window used for the subsequence matching in index framework - default_index_window_range=10 + # default_index_window_range=10 # buffer parameter for index processor. - index_buffer_size=134217728 + # index_buffer_size=134217728 # whether enable data partition. If disabled, all data belongs to partition 0 - enable_partition=false + # enable_partition=false # time range for partitioning data inside each storage group, the unit is second - partition_interval=604800 + # partition_interval=604800 + + # concurrent_writing_time_partition=1 + +#################### +### Double Write Configuration +#################### + +# enable_double_write=false +# secondary_address=127.0.0.1 +# secondary_port=6668 +# secondary_user=root - # secondary_password=root ++# secondary_password=root diff --cc server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java index b5b3ace,0000000..638cb58 mode 100644,000000..100644 --- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java +++ b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java @@@ -1,181 -1,0 +1,173 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.doublewrite; + +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.rpc.*; +import org.apache.iotdb.service.rpc.thrift.*; +import org.apache.iotdb.tsfile.utils.Pair; + +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.ZoneId; +import java.util.concurrent.BlockingQueue; + +public class DoubleWriteConsumer implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(DoubleWriteConsumer.class); - private BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue; ++ private final BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue; + private TSIService.Iface doubleWriteClient; + private TTransport transport; + private long sessionId; - private long consumerCnt = 0; - private long consumerTime = 0; + + public DoubleWriteConsumer( + BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue) { + this.doubleWriteQueue = doubleWriteQueue; + init(); + } + + @Override + public void run() { + try { + while (true) { - long startTime = System.nanoTime(); + Pair<DoubleWriteType, TSInsertRecordsReq> head = doubleWriteQueue.take(); + if (head.left == DoubleWriteType.DOUBLE_WRITE_END) { + break; + } + switch (head.left) { + case TSInsertRecordsReq: + TSInsertRecordsReq tsInsertRecordsReq = head.right; + try { + RpcUtils.verifySuccessWithRedirection( + doubleWriteClient.insertRecords(tsInsertRecordsReq)); + } catch (TException e) { + if (reconnect()) { + try { + RpcUtils.verifySuccess(doubleWriteClient.insertRecords(tsInsertRecordsReq)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException( + "Fail to reconnect to server. Please check server status"); + } + } + break; ++ default: ++ throw new UnsupportedOperationException(String.valueOf(head.left)); + } - consumerCnt += 1; - long endTime = System.nanoTime(); - consumerTime += endTime - startTime; + } + + TSCloseSessionReq req = new TSCloseSessionReq(sessionId); + try { + doubleWriteClient.closeSession(req); + } catch (TException e) { + throw new IoTDBConnectionException( + "Error occurs when closing session at server. Maybe server is down.", e); + } finally { + if (transport != null) { + transport.close(); + } + } + } catch (RedirectException + | StatementExecutionException + | InterruptedException + | IoTDBConnectionException e) { + e.printStackTrace(); + } + } + - public double getEfficiency() { - return (double) consumerCnt / (double) consumerTime * 1000000000.0; - } - + private boolean reconnect() { + boolean flag = false; + for (int i = 1; i <= 3; i++) { + try { + if (transport != null) { + close(); + init(); + flag = true; + } + } catch (Exception e) { + try { + Thread.sleep(1000); + } catch (InterruptedException e1) { + LOGGER.error("reconnect is interrupted.", e1); + Thread.currentThread().interrupt(); + } + } + } + return flag; + } + + private void close() throws IoTDBConnectionException { + TSCloseSessionReq req = new TSCloseSessionReq(sessionId); + try { + doubleWriteClient.closeSession(req); + } catch (TException e) { + throw new IoTDBConnectionException( + "Error occurs when closing session at server. Maybe server is down.", e); + } finally { + if (transport != null) { + transport.close(); + } + } + } + + private void init() { + try { + IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + RpcTransportFactory.setDefaultBufferCapacity(config.getThriftDefaultBufferSize()); + EndPoint endPoint = new EndPoint(config.getSecondaryAddress(), config.getSecondaryPort()); + RpcTransportFactory.setThriftMaxFrameSize(config.getThriftMaxFrameSize()); + + transport = + RpcTransportFactory.INSTANCE.getTransport( + new TSocket(endPoint.getIp(), endPoint.getPort(), 0)); + try { + transport.open(); + } catch (TTransportException e) { + throw new IoTDBConnectionException(e); + } + + doubleWriteClient = new TSIService.Client(new TBinaryProtocol(transport)); + doubleWriteClient = RpcUtils.newSynchronizedClient(doubleWriteClient); + + TSOpenSessionReq openReq = new TSOpenSessionReq(); + openReq.setUsername(config.getSecondaryUser()); + openReq.setPassword(config.getSecondaryPassword()); + openReq.setZoneId(ZoneId.systemDefault().toString()); + + try { + TSOpenSessionResp openResp = doubleWriteClient.openSession(openReq); + RpcUtils.verifySuccess(openResp.getStatus()); + sessionId = openResp.getSessionId(); + } catch (Exception e) { + transport.close(); + throw new IoTDBConnectionException(e); + } + } catch (IoTDBConnectionException e) { + e.printStackTrace(); + } + } +} diff --cc server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java index 6c44398,0000000..0c4e31a mode 100644,000000..100644 --- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java +++ b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java @@@ -1,41 -1,0 +1,41 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.doublewrite; + +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; +import org.apache.iotdb.tsfile.utils.Pair; + +import java.util.concurrent.BlockingQueue; + +public class DoubleWriteProducer { - private BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue; ++ private final BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue; + + public DoubleWriteProducer( + BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue) { + this.doubleWriteQueue = doubleWriteQueue; + } + + public void put(Pair<DoubleWriteType, TSInsertRecordsReq> reqPair) { + try { + doubleWriteQueue.put(reqPair); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} diff --cc server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteType.java index e6dc12e,0000000..838fadc mode 100644,000000..100644 --- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteType.java +++ b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteType.java @@@ -1,9 -1,0 +1,27 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ +package org.apache.iotdb.db.doublewrite; + +public enum DoubleWriteType { + TSInsertRecordReq, + TSInsertRecordsReq, + TSInsertRecordsOfOneDeviceReq, + TSInsertStringRecordsReq, + DOUBLE_WRITE_END +} diff --cc server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 5a3a162,8b6f2ec..ae267c2 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@@ -190,8 -193,6 +190,7 @@@ public class TSServiceImpl implements T // Record the username for every rpc connection (session). private final Map<Long, String> sessionIdUsernameMap = new ConcurrentHashMap<>(); private final Map<Long, ZoneId> sessionIdZoneIdMap = new ConcurrentHashMap<>(); + private final Map<Long, DoubleWriteProducer> sessionIdProducerMap = new ConcurrentHashMap<>(); - private final Map<Long, DoubleWriteConsumer> sessionIdConsumerMap = new ConcurrentHashMap<>(); // The sessionId is unique in one IoTDB instance. private final AtomicLong sessionIdGenerator = new AtomicLong(); @@@ -281,17 -282,6 +280,16 @@@ IoTDBConstant.GLOBAL_DB_NAME, tsStatus.message, req.getUsername()); + + // if open double write + if (IoTDBDescriptor.getInstance().getConfig().isEnableDoubleWrite()) { + BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue = + new LinkedBlockingQueue<>(); + DoubleWriteProducer doubleWriteProducer = new DoubleWriteProducer(doubleWriteQueue); + DoubleWriteConsumer doubleWriteConsumer = new DoubleWriteConsumer(doubleWriteQueue); + new Thread(doubleWriteConsumer).start(); + sessionIdProducerMap.put(sessionId, doubleWriteProducer); - sessionIdConsumerMap.put(sessionId, doubleWriteConsumer); + } } else { tsStatus = RpcUtils.getStatus( @@@ -323,13 -313,6 +321,12 @@@ } } + // if open double write + if (IoTDBDescriptor.getInstance().getConfig().isEnableDoubleWrite()) { + sessionIdProducerMap.get(sessionId).put(new Pair<>(DoubleWriteType.DOUBLE_WRITE_END, null)); + sessionIdProducerMap.remove(sessionId); - sessionIdConsumerMap.remove(sessionId); + } + return new TSStatus( sessionIdUsernameMap.remove(sessionId) == null ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
