This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch DoubleWrite
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f425e7d5c4645d9efa4f48b4191c3436a7adc74c
Merge: b8650ca 99822c7
Author: JackieTien97 <[email protected]>
AuthorDate: Tue May 18 14:38:09 2021 +0800

    fix conflicts

 .github/workflows/client-go.yml                    |   3 +
 .github/workflows/client.yml                       |   5 +-
 .github/workflows/e2e.yml                          |   3 +
 .github/workflows/main-unix.yml                    |   3 +
 .github/workflows/main-win.yml                     |   3 +
 .github/workflows/sonar-coveralls.yml              |   3 +
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4   |   6 +-
 cli/pom.xml                                        |   2 +-
 .../org/apache/iotdb/tool/AbstractCsvTool.java     |  40 +--
 .../main/java/org/apache/iotdb/tool/ExportCsv.java |   2 +-
 .../main/java/org/apache/iotdb/tool/ImportCsv.java |  56 ++--
 client-cpp/src/main/Session.cpp                    |  11 +-
 client-cpp/src/main/Session.h                      |  16 +-
 cluster/pom.xml                                    |  12 +-
 .../resources/conf/iotdb-cluster.properties        |  60 ++--
 .../apache/iotdb/cluster/config/ClusterConfig.java |  20 +-
 .../iotdb/cluster/config/ClusterDescriptor.java    |   5 -
 .../iotdb/cluster/log/applier/BaseApplier.java     |  36 ++-
 .../iotdb/cluster/log/catchup/CatchUpTask.java     |  48 +--
 .../iotdb/cluster/log/catchup/LogCatchUpTask.java  |   4 +-
 .../cluster/log/manage/CommittedEntryManager.java  |  12 +-
 .../serializable/SyncLogDequeSerializer.java       |   4 +-
 .../query/last/ClusterLastQueryExecutor.java       |   1 +
 .../iotdb/cluster/server/DataClusterServer.java    |   6 +-
 .../iotdb/cluster/server/MetaClusterServer.java    |   6 +-
 .../iotdb/cluster/server/member/RaftMember.java    |  77 +++--
 .../cluster/server/service/BaseAsyncService.java   |  19 +-
 .../cluster/server/service/BaseSyncService.java    |  23 +-
 .../cluster/client/async/AsyncClientPoolTest.java  |  37 ++-
 .../cluster/client/async/AsyncDataClientTest.java  |  18 ++
 .../client/async/AsyncDataHeartbeatClientTest.java |  18 ++
 .../cluster/client/async/AsyncMetaClientTest.java  |  18 ++
 .../client/async/AsyncMetaHeartbeatClientTest.java |  18 ++
 .../iotdb/cluster/log/LogDispatcherTest.java       |  10 +-
 .../cluster/log/applier/DataLogApplierTest.java    |  35 +++
 .../cluster/log/snapshot/DataSnapshotTest.java     |   8 +
 .../cluster/partition/SlotPartitionTableTest.java  |   2 +-
 .../query/ClusterDataQueryExecutorTest.java        |   3 -
 .../cluster/server/member/DataGroupMemberTest.java |   5 +-
 .../cluster/server/member/RaftMemberTest.java      |   9 +-
 code-coverage/pom.xml                              |   2 +-
 compile-tools/pom.xml                              |   8 +-
 .../Administration-Management/Administration.md    |   7 +-
 docs/UserGuide/Cluster/Cluster-Setup-Example.md    |  47 +++
 docs/UserGuide/Cluster/Cluster-Setup.md            |  11 +-
 .../Data-Concept/Data-Model-and-Terminology.md     |   4 +-
 .../DDL-Data-Definition-Language.md                |   2 +-
 .../Administration-Management/Administration.md    |   3 +-
 docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md |  46 +++
 docs/zh/UserGuide/Cluster/Cluster-Setup.md         |  11 +-
 .../Data-Concept/Data-Model-and-Terminology.md     |   4 +-
 .../DDL-Data-Definition-Language.md                |   2 +-
 example/hadoop/pom.xml                             |   3 +-
 example/udf/pom.xml                                |   2 +-
 hadoop/pom.xml                                     |   2 +-
 hive-connector/pom.xml                             |   5 +-
 .../iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java     |   9 +-
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java |  14 +-
 pom.xml                                            |   3 +-
 .../resources/conf/iotdb-engine.properties         | 279 ++++++++--------
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |  26 +-
 .../apache/iotdb/db/auth/entity/PrivilegeType.java |   1 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  39 ++-
 .../org/apache/iotdb/db/conf/IoTDBConfigCheck.java |  28 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  27 +-
 .../iotdb/db/doublewrite/DoubleWriteConsumer.java  |  14 +-
 .../iotdb/db/doublewrite/DoubleWriteProducer.java  |   2 +-
 .../iotdb/db/doublewrite/DoubleWriteType.java      |  18 ++
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  46 ++-
 .../db/engine/cache/TimeSeriesMetadataCache.java   |   2 +-
 .../compaction/CompactionMergeTaskPoolManager.java |  38 ++-
 .../db/engine/compaction/TsFileManagement.java     |  58 +++-
 .../level/LevelCompactionTsFileManagement.java     | 349 ++++++++++++---------
 .../no/NoCompactionTsFileManagement.java           | 132 ++++----
 .../engine/compaction/utils/CompactionUtils.java   |  25 +-
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  11 +
 .../apache/iotdb/db/engine/memtable/IMemTable.java |   4 +
 .../db/engine/merge/manage/MergeResource.java      |   2 +-
 .../iotdb/db/engine/merge/task/MergeFileTask.java  |  14 +-
 .../db/engine/storagegroup/StorageGroupInfo.java   |   6 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 118 ++++---
 .../db/engine/storagegroup/TsFileProcessor.java    |  50 +--
 .../engine/storagegroup/TsFileProcessorInfo.java   |   6 +-
 .../db/engine/storagegroup/TsFileResource.java     |  11 +-
 .../storagegroup/timeindex/DeviceTimeIndex.java    |  18 --
 .../storagegroup/timeindex/FileTimeIndex.java      |   5 -
 .../engine/storagegroup/timeindex/ITimeIndex.java  |   8 -
 .../iotdb/db/engine/upgrade/UpgradeTask.java       |  40 +++
 .../org/apache/iotdb/db/metadata/MManager.java     |   6 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  15 +-
 .../org/apache/iotdb/db/qp/logical/Operator.java   |  10 -
 .../physical/BatchPlan.java}                       |  46 ++-
 .../db/qp/physical/crud/InsertMultiTabletPlan.java |  48 ++-
 .../physical/crud/InsertRowsOfOneDevicePlan.java   |  42 ++-
 .../iotdb/db/qp/physical/crud/InsertRowsPlan.java  |  49 ++-
 .../db/qp/physical/crud/InsertTabletPlan.java      |  24 +-
 .../qp/physical/sys/CreateMultiTimeSeriesPlan.java |  39 ++-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  71 ++++-
 .../qp/strategy/optimizer/ConcatPathOptimizer.java |   9 +
 .../iotdb/db/query/executor/LastQueryExecutor.java |  41 ++-
 .../iotdb/db/query/executor/QueryRouter.java       |   6 +-
 .../row/ElasticSerializableRowRecordList.java      |  13 +-
 .../org/apache/iotdb/db/rescon/SystemInfo.java     | 230 +++++++-------
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  18 +-
 .../apache/iotdb/db/tools/TsFileRewriteTool.java   |  15 +-
 .../db/tools/upgrade/TsFileOnlineUpgradeTool.java  |  16 +
 .../java/org/apache/iotdb/db/utils/AuthUtils.java  |   2 -
 .../java/org/apache/iotdb/db/utils/MergeUtils.java |   4 +-
 .../org/apache/iotdb/db/utils/UpgradeUtils.java    |  10 -
 .../apache/iotdb/db/auth/AuthorityCheckerTest.java |  23 +-
 .../auth/authorizer/LocalFileAuthorizerTest.java   |   6 +-
 .../db/engine/compaction/CompactionChunkTest.java  |   8 +-
 .../compaction/LevelCompactionCacheTest.java       |   5 +-
 .../engine/compaction/LevelCompactionLogTest.java  |   5 +-
 .../compaction/LevelCompactionMergeTest.java       |  78 ++++-
 .../engine/compaction/LevelCompactionModsTest.java |  45 ++-
 .../compaction/LevelCompactionMoreDataTest.java    |   5 +-
 .../LevelCompactionTsFileManagementTest.java       |  69 ++++
 .../NoCompactionTsFileManagementTest.java          |  75 ++++-
 .../iotdb/db/engine/merge/MergeTaskTest.java       |  75 +++++
 .../engine/storagegroup/TsFileProcessorTest.java   |   8 +-
 .../iotdb/db/integration/IOTDBGroupByIT.java       |  19 ++
 .../iotdb/db/integration/IoTDBGroupByFillIT.java   |  22 ++
 .../iotdb/db/integration/IoTDBGroupByMonthIT.java  |  98 +++++-
 ...oTDBLoadExternalTsFileWithTimePartitionIT.java} |  44 ++-
 .../iotdb/db/integration/IoTDBSimpleQueryIT.java   |  36 +++
 .../integration/IoTDBUDTFAlignByTimeQueryIT.java   |  16 +
 .../aggregation/IoTDBAggregationByLevelIT.java     |  19 ++
 .../db/integration/auth/IoTDBAuthorizationIT.java  |  72 +++++
 .../iotdb/db/metadata/MManagerBasicTest.java       |  71 +++++
 .../org/apache/iotdb/db/script/EnvScriptIT.java    |   6 +-
 .../iotdb/db/utils/TsFileRewriteToolTest.java      |  57 ++++
 .../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java |  20 +-
 .../org/apache/iotdb/rpc/RpcTransportFactory.java  |  10 +-
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |   4 +-
 .../rpc/TimeoutChangeableTFastFramedTransport.java |   8 +-
 .../TimeoutChangeableTSnappyFramedTransport.java   |  20 +-
 .../java/org/apache/iotdb/session/SessionUT.java   |   2 +-
 site/src/main/.vuepress/config.js                  |  12 +-
 spark-tsfile/pom.xml                               |   2 +-
 thrift-cluster/src/main/thrift/cluster.thrift      |   8 +-
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |  21 +-
 .../tsfile/read/filter/GroupByMonthFilter.java     |  17 +-
 .../tsfile/v2/file/metadata/TsFileMetadataV2.java  |   9 +-
 .../tsfile/v2/read/TsFileSequenceReaderForV2.java  |  16 +-
 145 files changed, 2795 insertions(+), 1159 deletions(-)

diff --cc server/src/assembly/resources/conf/iotdb-engine.properties
index 4ae5d52,123db71..5d15a9b
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@@ -643,29 -654,21 +654,31 @@@ timestamp_precision=m
  # index_root_dir=data/index
  
  # Is index enable
- enable_index=false
+ # enable_index=false
  
  # How many threads can concurrently build index. When <= 0, use CPU core number.
- concurrent_index_build_thread=0
+ # concurrent_index_build_thread=0
  
  # the default size of sliding window used for the subsequence matching in index framework
- default_index_window_range=10
+ # default_index_window_range=10
  
  # buffer parameter for index processor.
- index_buffer_size=134217728
+ # index_buffer_size=134217728
  
  # whether enable data partition. If disabled, all data belongs to partition 0
- enable_partition=false
+ # enable_partition=false
  
  # time range for partitioning data inside each storage group, the unit is second
- partition_interval=604800
+ # partition_interval=604800
+ 
+ # concurrent_writing_time_partition=1
 +
 +####################
 +### Double Write Configuration
 +####################
 +
 +# enable_double_write=false
 +# secondary_address=127.0.0.1
 +# secondary_port=6668
 +# secondary_user=root
- # secondary_password=root
++# secondary_password=root
diff --cc server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
index b5b3ace,0000000..638cb58
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
@@@ -1,181 -1,0 +1,173 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.iotdb.db.doublewrite;
 +
 +import org.apache.iotdb.db.conf.IoTDBConfig;
 +import org.apache.iotdb.db.conf.IoTDBDescriptor;
 +import org.apache.iotdb.rpc.*;
 +import org.apache.iotdb.service.rpc.thrift.*;
 +import org.apache.iotdb.tsfile.utils.Pair;
 +
 +import org.apache.thrift.TException;
 +import org.apache.thrift.protocol.TBinaryProtocol;
 +import org.apache.thrift.transport.TSocket;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.time.ZoneId;
 +import java.util.concurrent.BlockingQueue;
 +
 +public class DoubleWriteConsumer implements Runnable {
 +  private static final Logger LOGGER = 
LoggerFactory.getLogger(DoubleWriteConsumer.class);
-   private BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> 
doubleWriteQueue;
++  private final BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> 
doubleWriteQueue;
 +  private TSIService.Iface doubleWriteClient;
 +  private TTransport transport;
 +  private long sessionId;
-   private long consumerCnt = 0;
-   private long consumerTime = 0;
 +
 +  public DoubleWriteConsumer(
 +      BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> 
doubleWriteQueue) {
 +    this.doubleWriteQueue = doubleWriteQueue;
 +    init();
 +  }
 +
 +  @Override
 +  public void run() {
 +    try {
 +      while (true) {
-         long startTime = System.nanoTime();
 +        Pair<DoubleWriteType, TSInsertRecordsReq> head = 
doubleWriteQueue.take();
 +        if (head.left == DoubleWriteType.DOUBLE_WRITE_END) {
 +          break;
 +        }
 +        switch (head.left) {
 +          case TSInsertRecordsReq:
 +            TSInsertRecordsReq tsInsertRecordsReq = head.right;
 +            try {
 +              RpcUtils.verifySuccessWithRedirection(
 +                  doubleWriteClient.insertRecords(tsInsertRecordsReq));
 +            } catch (TException e) {
 +              if (reconnect()) {
 +                try {
 +                  
RpcUtils.verifySuccess(doubleWriteClient.insertRecords(tsInsertRecordsReq));
 +                } catch (TException tException) {
 +                  throw new IoTDBConnectionException(tException);
 +                }
 +              } else {
 +                throw new IoTDBConnectionException(
 +                    "Fail to reconnect to server. Please check server 
status");
 +              }
 +            }
 +            break;
++          default:
++            throw new 
UnsupportedOperationException(String.valueOf(head.left));
 +        }
-         consumerCnt += 1;
-         long endTime = System.nanoTime();
-         consumerTime += endTime - startTime;
 +      }
 +
 +      TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
 +      try {
 +        doubleWriteClient.closeSession(req);
 +      } catch (TException e) {
 +        throw new IoTDBConnectionException(
 +            "Error occurs when closing session at server. Maybe server is 
down.", e);
 +      } finally {
 +        if (transport != null) {
 +          transport.close();
 +        }
 +      }
 +    } catch (RedirectException
 +        | StatementExecutionException
 +        | InterruptedException
 +        | IoTDBConnectionException e) {
 +      e.printStackTrace();
 +    }
 +  }
 +
-   public double getEfficiency() {
-     return (double) consumerCnt / (double) consumerTime * 1000000000.0;
-   }
- 
 +  private boolean reconnect() {
 +    boolean flag = false;
 +    for (int i = 1; i <= 3; i++) {
 +      try {
 +        if (transport != null) {
 +          close();
 +          init();
 +          flag = true;
 +        }
 +      } catch (Exception e) {
 +        try {
 +          Thread.sleep(1000);
 +        } catch (InterruptedException e1) {
 +          LOGGER.error("reconnect is interrupted.", e1);
 +          Thread.currentThread().interrupt();
 +        }
 +      }
 +    }
 +    return flag;
 +  }
 +
 +  private void close() throws IoTDBConnectionException {
 +    TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
 +    try {
 +      doubleWriteClient.closeSession(req);
 +    } catch (TException e) {
 +      throw new IoTDBConnectionException(
 +          "Error occurs when closing session at server. Maybe server is 
down.", e);
 +    } finally {
 +      if (transport != null) {
 +        transport.close();
 +      }
 +    }
 +  }
 +
 +  private void init() {
 +    try {
 +      IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 +      
RpcTransportFactory.setDefaultBufferCapacity(config.getThriftDefaultBufferSize());
 +      EndPoint endPoint = new EndPoint(config.getSecondaryAddress(), 
config.getSecondaryPort());
 +      
RpcTransportFactory.setThriftMaxFrameSize(config.getThriftMaxFrameSize());
 +
 +      transport =
 +          RpcTransportFactory.INSTANCE.getTransport(
 +              new TSocket(endPoint.getIp(), endPoint.getPort(), 0));
 +      try {
 +        transport.open();
 +      } catch (TTransportException e) {
 +        throw new IoTDBConnectionException(e);
 +      }
 +
 +      doubleWriteClient = new TSIService.Client(new 
TBinaryProtocol(transport));
 +      doubleWriteClient = RpcUtils.newSynchronizedClient(doubleWriteClient);
 +
 +      TSOpenSessionReq openReq = new TSOpenSessionReq();
 +      openReq.setUsername(config.getSecondaryUser());
 +      openReq.setPassword(config.getSecondaryPassword());
 +      openReq.setZoneId(ZoneId.systemDefault().toString());
 +
 +      try {
 +        TSOpenSessionResp openResp = doubleWriteClient.openSession(openReq);
 +        RpcUtils.verifySuccess(openResp.getStatus());
 +        sessionId = openResp.getSessionId();
 +      } catch (Exception e) {
 +        transport.close();
 +        throw new IoTDBConnectionException(e);
 +      }
 +    } catch (IoTDBConnectionException e) {
 +      e.printStackTrace();
 +    }
 +  }
 +}
diff --cc server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
index 6c44398,0000000..0c4e31a
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
@@@ -1,41 -1,0 +1,41 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.iotdb.db.doublewrite;
 +
 +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
 +import org.apache.iotdb.tsfile.utils.Pair;
 +
 +import java.util.concurrent.BlockingQueue;
 +
 +public class DoubleWriteProducer {
-   private BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> 
doubleWriteQueue;
++  private final BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> 
doubleWriteQueue;
 +
 +  public DoubleWriteProducer(
 +      BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> 
doubleWriteQueue) {
 +    this.doubleWriteQueue = doubleWriteQueue;
 +  }
 +
 +  public void put(Pair<DoubleWriteType, TSInsertRecordsReq> reqPair) {
 +    try {
 +      doubleWriteQueue.put(reqPair);
 +    } catch (InterruptedException e) {
 +      e.printStackTrace();
 +    }
 +  }
 +}
diff --cc server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteType.java
index e6dc12e,0000000..838fadc
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteType.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteType.java
@@@ -1,9 -1,0 +1,27 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
 +package org.apache.iotdb.db.doublewrite;
 +
 +public enum DoubleWriteType {
 +  TSInsertRecordReq,
 +  TSInsertRecordsReq,
 +  TSInsertRecordsOfOneDeviceReq,
 +  TSInsertStringRecordsReq,
 +  DOUBLE_WRITE_END
 +}
diff --cc server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 5a3a162,8b6f2ec..ae267c2
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@@ -190,8 -193,6 +190,7 @@@ public class TSServiceImpl implements T
    // Record the username for every rpc connection (session).
    private final Map<Long, String> sessionIdUsernameMap = new 
ConcurrentHashMap<>();
    private final Map<Long, ZoneId> sessionIdZoneIdMap = new 
ConcurrentHashMap<>();
 +  private final Map<Long, DoubleWriteProducer> sessionIdProducerMap = new 
ConcurrentHashMap<>();
-   private final Map<Long, DoubleWriteConsumer> sessionIdConsumerMap = new 
ConcurrentHashMap<>();
  
    // The sessionId is unique in one IoTDB instance.
    private final AtomicLong sessionIdGenerator = new AtomicLong();
@@@ -281,17 -282,6 +280,16 @@@
            IoTDBConstant.GLOBAL_DB_NAME,
            tsStatus.message,
            req.getUsername());
 +
 +      // if open double write
 +      if (IoTDBDescriptor.getInstance().getConfig().isEnableDoubleWrite()) {
 +        BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> 
doubleWriteQueue =
 +            new LinkedBlockingQueue<>();
 +        DoubleWriteProducer doubleWriteProducer = new 
DoubleWriteProducer(doubleWriteQueue);
 +        DoubleWriteConsumer doubleWriteConsumer = new 
DoubleWriteConsumer(doubleWriteQueue);
 +        new Thread(doubleWriteConsumer).start();
 +        sessionIdProducerMap.put(sessionId, doubleWriteProducer);
-         sessionIdConsumerMap.put(sessionId, doubleWriteConsumer);
 +      }
      } else {
        tsStatus =
            RpcUtils.getStatus(
@@@ -323,13 -313,6 +321,12 @@@
        }
      }
  
 +    // if open double write
 +    if (IoTDBDescriptor.getInstance().getConfig().isEnableDoubleWrite()) {
 +      sessionIdProducerMap.get(sessionId).put(new 
Pair<>(DoubleWriteType.DOUBLE_WRITE_END, null));
 +      sessionIdProducerMap.remove(sessionId);
-       sessionIdConsumerMap.remove(sessionId);
 +    }
 +
      return new TSStatus(
          sessionIdUsernameMap.remove(sessionId) == null
              ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)

Reply via email to