This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch test-parallel-insert
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7ef8cf744f973f4ec101dddac28dad0084e57148
Author: Liu Xuxin <[email protected]>
AuthorDate: Sun Jan 28 15:52:31 2024 +0800

    temporarily support parallel insert and pass session-example
---
 .../main/java/org/apache/iotdb/SessionExample.java |  6 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 19 ++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 10 ++++
 .../org/apache/iotdb/db/experiment/InsertTask.java | 68 ++++++++++++++++++++++
 .../apache/iotdb/db/experiment/InsertWorkers.java  | 51 ++++++++++++++++
 .../queryengine/plan/analyze/AnalyzeVisitor.java   | 27 ++++++++-
 .../db/storageengine/dataregion/DataRegion.java    | 64 +++++++++++++++++++-
 .../dataregion/memtable/AbstractMemTable.java      | 55 +++++++++--------
 .../dataregion/memtable/PrimitiveMemTable.java     |  4 +-
 .../dataregion/memtable/TsFileProcessor.java       | 18 +++---
 .../dataregion/memtable/TsFileProcessorInfo.java   | 16 ++---
 .../dataregion/memtable/WritableMemChunk.java      | 44 +++++++-------
 .../dataregion/memtable/WritableMemChunkGroup.java |  4 +-
 13 files changed, 316 insertions(+), 70 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java 
b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index d5f40452ea8..088f3f1c261 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -90,11 +90,11 @@ public class SessionExample {
     //     createTemplate();
     createTimeseries();
     createMultiTimeseries();
-    insertRecord();
-    insertTablet();
+    //    insertRecord();
+    //    insertTablet();
     //    insertTabletWithNullValues();
     //    insertTablets();
-    //    insertRecords();
+    insertRecords();
     //    insertText();
     //    selectInto();
     //    createAndDropContinuousQueries();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 854401d7b5e..3b50f4503cc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1061,6 +1061,9 @@ public class IoTDBConfig {
   // customizedProperties, this should be empty by default.
   private Properties customizedProperties = new Properties();
 
+  private boolean enableMultiThreadingInsert = true;
+  private int insertThreadNum = Runtime.getRuntime().availableProcessors();
+
   // IoTConsensus Config
   private int maxLogEntriesNumPerBatch = 1024;
   private int maxSizePerBatch = 16 * 1024 * 1024;
@@ -3801,4 +3804,20 @@ public class IoTDBConfig {
       double innerCompactionTaskSelectionDiskRedundancy) {
     this.innerCompactionTaskSelectionDiskRedundancy = 
innerCompactionTaskSelectionDiskRedundancy;
   }
+
+  /** Returns the current value of the {@code enableMultiThreadingInsert} switch. */
+  public boolean isEnableMultiThreadingInsert() {
+    return enableMultiThreadingInsert;
+  }
+
+  /**
+   * Overwrites the {@code enableMultiThreadingInsert} switch.
+   *
+   * @param enableMultiThreadingInsert the new value of the switch
+   */
+  public void setEnableMultiThreadingInsert(boolean 
enableMultiThreadingInsert) {
+    this.enableMultiThreadingInsert = enableMultiThreadingInsert;
+  }
+
+  /** Returns the configured number of insert threads ({@code insertThreadNum}). */
+  public int getInsertThreadNum() {
+    return insertThreadNum;
+  }
+
+  /**
+   * Overwrites the configured number of insert threads. The value is stored as given; no range
+   * check is performed here.
+   *
+   * @param insertThreadNum the new thread count
+   */
+  public void setInsertThreadNum(int insertThreadNum) {
+    this.insertThreadNum = insertThreadNum;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index e4c5c243598..13a0b2c3475 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -663,6 +663,16 @@ public class IoTDBDescriptor {
                 "inner_compaction_task_selection_mods_file_threshold",
                 
Long.toString(conf.getInnerCompactionTaskSelectionModsFileThreshold()))));
 
+    conf.setEnableMultiThreadingInsert(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_multi_threading_insert",
+                String.valueOf(conf.isEnableMultiThreadingInsert()))));
+    conf.setInsertThreadNum(
+        Integer.parseInt(
+            properties.getProperty(
+                "insert_thread_num", 
Integer.toString(conf.getInsertThreadNum()))));
+
     conf.setEnablePartialInsert(
         Boolean.parseBoolean(
             properties.getProperty(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertTask.java
new file mode 100644
index 00000000000..4e312b538ec
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertTask.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.experiment;
+
+import org.apache.iotdb.db.exception.WriteProcessException;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A unit of work that inserts one {@link InsertRowNode} into a {@link TsFileProcessor}.
+ *
+ * <p>The task is meant to be run on a worker thread. It counts down the shared latch exactly
+ * once when it finishes, whether the insertion succeeded or failed. A failed insertion is only
+ * logged, and the row is then not added to the executed list.
+ */
+public class InsertTask implements Runnable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(InsertTask.class);
+
+  /** Number of cost slots filled by the processor and accumulated into the shared counters. */
+  private static final int COST_SLOT_COUNT = 4;
+
+  private final TsFileProcessor processor;
+  private final InsertRowNode insertRowNode;
+  private final CountDownLatch countDownLatch;
+  private final AtomicLong[] costForMetrics;
+  private final List<InsertRowNode> executedList;
+
+  /**
+   * @param processor the processor that receives the row
+   * @param insertRowNode the row to insert
+   * @param costForMetrics shared counters; the per-task costs are added to the first four slots
+   * @param countDownLatch latch counted down once when this task finishes
+   * @param executedList shared list that collects the rows that were inserted successfully
+   */
+  public InsertTask(
+      TsFileProcessor processor,
+      InsertRowNode insertRowNode,
+      AtomicLong[] costForMetrics,
+      CountDownLatch countDownLatch,
+      List<InsertRowNode> executedList) {
+    this.processor = processor;
+    this.insertRowNode = insertRowNode;
+    this.countDownLatch = countDownLatch;
+    this.costForMetrics = costForMetrics;
+    this.executedList = executedList;
+  }
+
+  @Override
+  public void run() {
+    try {
+      // Measure into a task-local array first so the processor never touches shared state.
+      long[] tempCostForMetrics = new long[COST_SLOT_COUNT];
+      processor.insert(insertRowNode, tempCostForMetrics);
+      for (int i = 0; i < COST_SLOT_COUNT; i++) {
+        costForMetrics[i].addAndGet(tempCostForMetrics[i]);
+      }
+      // Several tasks share this list and the caller may hand in a plain ArrayList, so the
+      // append has to be guarded explicitly.
+      synchronized (executedList) {
+        executedList.add(insertRowNode);
+      }
+    } catch (WriteProcessException e) {
+      LOGGER.error("Insertion failed", e);
+    } catch (RuntimeException e) {
+      // When the task is run through ExecutorService#submit, an unchecked exception would
+      // otherwise be stored in the discarded Future and never be reported.
+      LOGGER.error("Insertion failed", e);
+    } finally {
+      countDownLatch.countDown();
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertWorkers.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertWorkers.java
new file mode 100644
index 00000000000..6e254d24e2a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertWorkers.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.experiment;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class InsertWorkers {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InsertWorkers.class);
+  private static ThreadPoolExecutor threadPool = null;
+
+  static {
+    if 
(IoTDBDescriptor.getInstance().getConfig().isEnableMultiThreadingInsert()) {
+      LOGGER.info("Initializing thread pool for parallel insert");
+      threadPool =
+          new ThreadPoolExecutor(
+              Runtime.getRuntime().availableProcessors(),
+              IoTDBDescriptor.getInstance().getConfig().getInsertThreadNum(),
+              10,
+              TimeUnit.SECONDS,
+              new LinkedBlockingQueue<>(8196),
+              new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+  }
+
+  public static void submit(InsertTask task) {
+    threadPool.submit(task);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index c408b73498d..e3c847debec 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -163,6 +163,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 
@@ -214,6 +215,11 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS 
=
       PerformanceOverviewMetrics.getInstance();
 
+  public static final AtomicLong partitionSettingTimeInTotal = new 
AtomicLong(0);
+  public static final AtomicLong partitionSettingTimeCount = new AtomicLong(0);
+  public static final AtomicLong partitionFetchingTimeInTotal = new 
AtomicLong(0);
+  public static final AtomicLong partitionFetchingTimeCount = new 
AtomicLong(0);
+
   public AnalyzeVisitor(IPartitionFetcher partitionFetcher, ISchemaFetcher 
schemaFetcher) {
     this.partitionFetcher = partitionFetcher;
     this.schemaFetcher = schemaFetcher;
@@ -2526,6 +2532,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
 
   private Analysis computeAnalysisForInsertRows(
       Analysis analysis, InsertRowsStatement insertRowsStatement, String 
userName) {
+    final long startTime = System.nanoTime();
     Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new 
HashMap<>();
     for (InsertRowStatement insertRowStatement : 
insertRowsStatement.getInsertRowStatementList()) {
       Set<TTimePartitionSlot> timePartitionSlotSet =
@@ -2541,7 +2548,15 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       dataPartitionQueryParam.setTimePartitionSlotList(new 
ArrayList<>(entry.getValue()));
       dataPartitionQueryParams.add(dataPartitionQueryParam);
     }
-
+    long timeCost = System.nanoTime() - startTime;
+    long time = partitionSettingTimeInTotal.addAndGet(timeCost);
+    long count = partitionSettingTimeCount.incrementAndGet();
+    if (count % 1000 == 0) {
+      logger.info(
+          String.format(
+              "The average time cost of setting partition is %.2f ms",
+              (double) time / count / 1000000));
+    }
     return getAnalysisForWriting(analysis, dataPartitionQueryParams, userName);
   }
 
@@ -2703,7 +2718,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
   /** get analysis according to statement and params */
   private Analysis getAnalysisForWriting(
       Analysis analysis, List<DataPartitionQueryParam> 
dataPartitionQueryParams, String userName) {
-
+    final long startTime = System.nanoTime();
     DataPartition dataPartition =
         partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams, 
userName);
     if (dataPartition.isEmpty()) {
@@ -2715,6 +2730,14 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
                   + "because enable_auto_create_schema is FALSE."));
     }
     analysis.setDataPartitionInfo(dataPartition);
+    long timeCost = System.nanoTime() - startTime;
+    // Accumulate the elapsed time into the total, not into the call counter, so that the
+    // logged value really is total time divided by number of calls.
+    long time = partitionFetchingTimeInTotal.addAndGet(timeCost);
+    long count = partitionFetchingTimeCount.incrementAndGet();
+    if (count % 1000 == 0) {
+      logger.info(
+          String.format(
+              "Average time cost of fetching partition is %.2f ms", (double) time / count / 1e6));
+    }
     return analysis;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index d5cf0821ba4..da1675af82e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -49,6 +49,8 @@ import 
org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.quota.ExceedQuotaException;
+import org.apache.iotdb.db.experiment.InsertTask;
+import org.apache.iotdb.db.experiment.InsertWorkers;
 import 
org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
@@ -144,6 +146,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.Phaser;
@@ -3199,7 +3202,11 @@ public class DataRegion implements IDataRegionForQuery {
                     > lastFlushTimeMap.getFlushedTime(
                         timePartitionIds[i], 
insertRowNode.getDevicePath().getFullPath());
       }
-      insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds);
+      if 
(IoTDBDescriptor.getInstance().getConfig().isEnableMultiThreadingInsert()) {
+        insertToTsFileProcessorsParallel(insertRowsNode, areSequence, 
timePartitionIds);
+      } else {
+        insertToTsFileProcessors(insertRowsNode, areSequence, 
timePartitionIds);
+      }
       if (!insertRowsNode.getResults().isEmpty()) {
         throw new BatchProcessException("Partial failed inserting rows");
       }
@@ -3208,6 +3215,61 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
+  public void insertToTsFileProcessorsParallel(
+      InsertRowsNode insertRowsNode, boolean[] areSequence, long[] 
timePartitionIds) {
+    List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
+    AtomicLong[] costsForMetrics = new AtomicLong[4];
+    for (int i = 0; i < costsForMetrics.length; i++) {
+      costsForMetrics[i] = new AtomicLong(0);
+    }
+    Map<TsFileProcessor, Boolean> tsFileProcessorMapForFlushing = new 
HashMap<>();
+    CountDownLatch latch = new 
CountDownLatch(insertRowsNode.getInsertRowNodeList().size());
+    for (int i = 0; i < areSequence.length; i++) {
+      InsertRowNode insertRowNode = 
insertRowsNode.getInsertRowNodeList().get(i);
+      if (insertRowNode.allMeasurementFailed()) {
+        latch.countDown();
+        continue;
+      }
+      TsFileProcessor tsFileProcessor =
+          getOrCreateTsFileProcessor(timePartitionIds[i], areSequence[i]);
+      if (tsFileProcessor == null) {
+        latch.countDown();
+        continue;
+      }
+      tsFileProcessorMapForFlushing.put(tsFileProcessor, areSequence[i]);
+      InsertWorkers.submit(
+          new InsertTask(
+              tsFileProcessor, insertRowNode, costsForMetrics, latch, 
executedInsertRowNodeList));
+    }
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      logger.error("Insertion is interrupted", e);
+      return;
+    }
+    // check memtable size and may asyncTryToFlush the work memtable
+    for (Map.Entry<TsFileProcessor, Boolean> entry : 
tsFileProcessorMapForFlushing.entrySet()) {
+      if (entry.getKey().shouldFlush()) {
+        fileFlushPolicy.apply(this, entry.getKey(), entry.getValue());
+      }
+    }
+
+    
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0].get());
+    
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1].get());
+    
PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2].get());
+    
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3].get());
+    if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+      if 
((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
+          && insertRowsNode.isSyncFromLeaderWhenUsingIoTConsensus())) {
+        return;
+      }
+      // disable updating last cache on follower
+      long startTime = System.nanoTime();
+      tryToUpdateInsertRowsLastCache(executedInsertRowNodeList);
+      
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
 - startTime);
+    }
+  }
+
   /**
    * insert batch of tablets belongs to multiple devices
    *
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index e54e7ef2c36..62cc8ada10f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -51,6 +51,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -70,13 +71,15 @@ public abstract class AbstractMemTable implements IMemTable 
{
   private volatile FlushStatus flushStatus = FlushStatus.WORKING;
   private final int avgSeriesPointNumThreshold =
       
IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
+
   /** Memory size of data points, including TEXT values. */
-  private long memSize = 0;
+  private AtomicLong memSize = new AtomicLong(0);
+
   /**
    * Memory usage of all TVLists memory usage regardless of whether these 
TVLists are full,
    * including TEXT values.
    */
-  private long tvListRamCost = 0;
+  private AtomicLong tvListRamCost = new AtomicLong(0);
 
   private int seriesNumber = 0;
 
@@ -110,13 +113,13 @@ public abstract class AbstractMemTable implements 
IMemTable {
   protected AbstractMemTable() {
     this.database = null;
     this.dataRegionId = null;
-    this.memTableMap = new HashMap<>();
+    this.memTableMap = new ConcurrentHashMap<>();
   }
 
   protected AbstractMemTable(String database, String dataRegionId) {
     this.database = database;
     this.dataRegionId = dataRegionId;
-    this.memTableMap = new HashMap<>();
+    this.memTableMap = new ConcurrentHashMap<>();
   }
 
   protected AbstractMemTable(
@@ -142,13 +145,15 @@ public abstract class AbstractMemTable implements 
IMemTable {
       IDeviceID deviceId, List<IMeasurementSchema> schemaList) {
     IWritableMemChunkGroup memChunkGroup =
         memTableMap.computeIfAbsent(deviceId, k -> new 
WritableMemChunkGroup());
-    for (IMeasurementSchema schema : schemaList) {
-      if (schema != null && 
!memChunkGroup.contains(schema.getMeasurementId())) {
-        seriesNumber++;
-        totalPointsNumThreshold += avgSeriesPointNumThreshold;
+    synchronized (memChunkGroup) {
+      for (IMeasurementSchema schema : schemaList) {
+        if (schema != null && 
!memChunkGroup.contains(schema.getMeasurementId())) {
+          seriesNumber++;
+          totalPointsNumThreshold += avgSeriesPointNumThreshold;
+        }
       }
+      return memChunkGroup;
     }
-    return memChunkGroup;
   }
 
   private IWritableMemChunkGroup createAlignedMemChunkGroupIfNotExistAndGet(
@@ -199,7 +204,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
         dataTypes.add(schema.getType());
       }
     }
-    memSize += MemUtils.getRowRecordSize(dataTypes, values);
+    memSize.addAndGet(MemUtils.getRowRecordSize(dataTypes, values));
     write(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), 
values);
 
     int pointsInserted =
@@ -246,7 +251,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
     if (schemaList.isEmpty()) {
       return;
     }
-    memSize += MemUtils.getAlignedRowRecordSize(dataTypes, values);
+    memSize.addAndGet(MemUtils.getAlignedRowRecordSize(dataTypes, values));
     writeAlignedRow(insertRowNode.getDeviceID(), schemaList, 
insertRowNode.getTime(), values);
     int pointsInserted =
         insertRowNode.getMeasurements().length - 
insertRowNode.getFailedMeasurementNumber();
@@ -270,7 +275,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
       throws WriteProcessException {
     try {
       writeTabletNode(insertTabletNode, start, end);
-      memSize += MemUtils.getTabletSize(insertTabletNode, start, end);
+      memSize.addAndGet(MemUtils.getTabletSize(insertTabletNode, start, end));
       int pointsInserted =
           (insertTabletNode.getDataTypes().length - 
insertTabletNode.getFailedMeasurementNumber())
               * (end - start);
@@ -296,7 +301,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
       throws WriteProcessException {
     try {
       writeAlignedTablet(insertTabletNode, start, end);
-      memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end);
+      memSize.addAndGet(MemUtils.getAlignedTabletSize(insertTabletNode, start, 
end));
       int pointsInserted =
           (insertTabletNode.getDataTypes().length - 
insertTabletNode.getFailedMeasurementNumber())
               * (end - start);
@@ -436,7 +441,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
 
   @Override
   public long memSize() {
-    return memSize;
+    return memSize.get();
   }
 
   @Override
@@ -450,11 +455,11 @@ public abstract class AbstractMemTable implements 
IMemTable {
   @Override
   public void clear() {
     memTableMap.clear();
-    memSize = 0;
+    memSize = new AtomicLong(0);
     seriesNumber = 0;
     totalPointsNum = 0;
     totalPointsNumThreshold = 0;
-    tvListRamCost = 0;
+    tvListRamCost = new AtomicLong(0);
     maxPlanIndex = 0;
     minPlanIndex = 0;
   }
@@ -531,27 +536,27 @@ public abstract class AbstractMemTable implements 
IMemTable {
 
   @Override
   public void addTVListRamCost(long cost) {
-    this.tvListRamCost += cost;
+    this.tvListRamCost.addAndGet(cost);
   }
 
   @Override
   public void releaseTVListRamCost(long cost) {
-    this.tvListRamCost -= cost;
+    this.tvListRamCost.addAndGet(-cost);
   }
 
   @Override
   public long getTVListsRamCost() {
-    return tvListRamCost;
+    return tvListRamCost.get();
   }
 
   @Override
   public void addTextDataSize(long textDataSize) {
-    this.memSize += textDataSize;
+    this.memSize.addAndGet(textDataSize);
   }
 
   @Override
   public void releaseTextDataSize(long textDataSize) {
-    this.memSize -= textDataSize;
+    this.memSize.addAndGet(-textDataSize);
   }
 
   @Override
@@ -634,8 +639,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
       return;
     }
     buffer.putInt(seriesNumber);
-    buffer.putLong(memSize);
-    buffer.putLong(tvListRamCost);
+    buffer.putLong(memSize.get());
+    buffer.putLong(tvListRamCost.get());
     buffer.putLong(totalPointsNum);
     buffer.putLong(totalPointsNumThreshold);
     buffer.putLong(maxPlanIndex);
@@ -653,8 +658,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
 
   public void deserialize(DataInputStream stream) throws IOException {
     seriesNumber = stream.readInt();
-    memSize = stream.readLong();
-    tvListRamCost = stream.readLong();
+    memSize = new AtomicLong(stream.readLong());
+    tvListRamCost = new AtomicLong(stream.readLong());
     totalPointsNum = stream.readLong();
     totalPointsNumThreshold = stream.readLong();
     maxPlanIndex = stream.readLong();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
index 70b0f909381..b8ec268487b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class PrimitiveMemTable extends AbstractMemTable {
   // this constructor only used when deserialize
@@ -39,7 +39,7 @@ public class PrimitiveMemTable extends AbstractMemTable {
 
   @Override
   public IMemTable copy() {
-    Map<IDeviceID, IWritableMemChunkGroup> newMap = new 
HashMap<>(getMemTableMap());
+    Map<IDeviceID, IWritableMemChunkGroup> newMap = new 
ConcurrentHashMap<>(getMemTableMap());
 
     return new PrimitiveMemTable(getDatabase(), getDataRegionId(), newMap);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 953606ed136..6a20269e53a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -139,7 +139,7 @@ public class TsFileProcessor {
   private volatile boolean shouldClose;
 
   /** working memtable. */
-  private IMemTable workMemTable;
+  private volatile IMemTable workMemTable;
 
   /** last flush time to flush the working memtable. */
   private long lastWorkMemtableFlushTime;
@@ -239,12 +239,16 @@ public class TsFileProcessor {
       throws WriteProcessException {
 
     if (workMemTable == null) {
-      long startTime = System.nanoTime();
-      createNewWorkingMemTable();
-      // recordCreateMemtableBlockCost
-      costsForMetrics[0] += System.nanoTime() - startTime;
-      WritingMetrics.getInstance()
-          
.recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
+      synchronized (this) {
+        if (workMemTable == null) {
+          long startTime = System.nanoTime();
+          createNewWorkingMemTable();
+          // recordCreateMemtableBlockCost
+          costsForMetrics[0] += System.nanoTime() - startTime;
+          WritingMetrics.getInstance()
+              
.recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
+        }
+      }
     }
 
     long[] memIncrements = null;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorInfo.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorInfo.java
index b06ed63cba0..9ebbdbed4fe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorInfo.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorInfo.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 /** The TsFileProcessorInfo records the memory cost of this TsFileProcessor. */
 public class TsFileProcessorInfo {
 
@@ -28,13 +30,13 @@ public class TsFileProcessorInfo {
   private final DataRegionInfo dataRegionInfo;
 
   /** memory occupation of unsealed TsFileResource, ChunkMetadata, WAL */
-  private long memCost;
+  private AtomicLong memCost;
 
   private final TsFileProcessorInfoMetrics metrics;
 
   public TsFileProcessorInfo(DataRegionInfo dataRegionInfo) {
     this.dataRegionInfo = dataRegionInfo;
-    this.memCost = 0L;
+    this.memCost = new AtomicLong(0);
     this.metrics =
         new 
TsFileProcessorInfoMetrics(dataRegionInfo.getDataRegion().getDatabaseName(), 
this);
     MetricService.getInstance().addMetricSet(metrics);
@@ -42,25 +44,25 @@ public class TsFileProcessorInfo {
 
   /** called in each insert */
   public void addTSPMemCost(long cost) {
-    memCost += cost;
+    memCost.addAndGet(cost);
     dataRegionInfo.addStorageGroupMemCost(cost);
   }
 
   /** called when meet exception */
   public void releaseTSPMemCost(long cost) {
     dataRegionInfo.releaseStorageGroupMemCost(cost);
-    memCost -= cost;
+    memCost.addAndGet(-cost);
   }
 
   /** called when closing TSP */
   public void clear() {
-    dataRegionInfo.releaseStorageGroupMemCost(memCost);
-    memCost = 0L;
+    dataRegionInfo.releaseStorageGroupMemCost(memCost.get());
+    memCost.set(0);
     MetricService.getInstance().removeMetricSet(metrics);
   }
 
   /** get memCost */
   public long getMemCost() {
-    return memCost;
+    return memCost.get();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index c1a71d32a53..6248026e63a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -59,28 +59,30 @@ public class WritableMemChunk implements IWritableMemChunk {
 
   @Override
   public boolean writeWithFlushCheck(long insertTime, Object objectValue) {
-    switch (schema.getType()) {
-      case BOOLEAN:
-        putBoolean(insertTime, (boolean) objectValue);
-        break;
-      case INT32:
-        putInt(insertTime, (int) objectValue);
-        break;
-      case INT64:
-        putLong(insertTime, (long) objectValue);
-        break;
-      case FLOAT:
-        putFloat(insertTime, (float) objectValue);
-        break;
-      case DOUBLE:
-        putDouble(insertTime, (double) objectValue);
-        break;
-      case TEXT:
-        return putBinaryWithFlushCheck(insertTime, (Binary) objectValue);
-      default:
-        throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
+    synchronized (this) {
+      switch (schema.getType()) {
+        case BOOLEAN:
+          putBoolean(insertTime, (boolean) objectValue);
+          break;
+        case INT32:
+          putInt(insertTime, (int) objectValue);
+          break;
+        case INT64:
+          putLong(insertTime, (long) objectValue);
+          break;
+        case FLOAT:
+          putFloat(insertTime, (float) objectValue);
+          break;
+        case DOUBLE:
+          putDouble(insertTime, (double) objectValue);
+          break;
+        case TEXT:
+          return putBinaryWithFlushCheck(insertTime, (Binary) objectValue);
+        default:
+          throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
+      }
+      return false;
     }
-    return false;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
index 707eace7fa5..2245808bd94 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
@@ -29,18 +29,18 @@ import 
org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class WritableMemChunkGroup implements IWritableMemChunkGroup {
 
   private Map<String, IWritableMemChunk> memChunkMap;
 
   public WritableMemChunkGroup() {
-    memChunkMap = new HashMap<>();
+    memChunkMap = new ConcurrentHashMap<>();
   }
 
   @Override

Reply via email to