This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 44a6144c7ae [To dev/1.3] Load: Introduce thread pool for tablet conversion and insertion (#15661)
44a6144c7ae is described below

commit 44a6144c7ae47b21a40059562771bfc2381e7c00
Author: Zikun Ma <[email protected]>
AuthorDate: Fri Jun 6 16:58:27 2025 +0800

    [To dev/1.3] Load: Introduce thread pool for tablet conversion and insertion (#15661)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  10 ++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   5 +
 .../pipe/resource/memory/PipeMemoryWeightUtil.java |  12 +-
 ...eeStatementDataTypeConvertExecutionVisitor.java | 169 +++++++++++++++------
 .../converter/LoadTsFileDataTypeConverter.java     |  42 +++--
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 6 files changed, 173 insertions(+), 66 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 7f31bbefd48..38a4cb648aa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1145,6 +1145,8 @@ public class IoTDBConfig {
 
   private long loadTsFileTabletConversionBatchMemorySizeInBytes = 4096 * 1024;
 
+  private int loadTsFileTabletConversionThreadCount = 5;
+
   private long loadChunkMetadataMemorySizeInBytes = 33554432; // 32MB
 
   private long loadMemoryAllocateRetryIntervalMs = 1000L;
@@ -4014,6 +4016,14 @@ public class IoTDBConfig {
         loadTsFileTabletConversionBatchMemorySizeInBytes;
   }
 
+  public int getLoadTsFileTabletConversionThreadCount() {
+    return loadTsFileTabletConversionThreadCount;
+  }
+
+  public void setLoadTsFileTabletConversionThreadCount(int 
loadTsFileTabletConversionThreadCount) {
+    this.loadTsFileTabletConversionThreadCount = 
loadTsFileTabletConversionThreadCount;
+  }
+
   public long getLoadChunkMetadataMemorySizeInBytes() {
     return loadChunkMetadataMemorySizeInBytes;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 94576d0aabf..50fccad85b8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2381,6 +2381,11 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "load_tsfile_tablet_conversion_batch_memory_size_in_bytes",
                 
String.valueOf(conf.getLoadTsFileTabletConversionBatchMemorySizeInBytes()))));
+    conf.setLoadTsFileTabletConversionThreadCount(
+        Integer.parseInt(
+            properties.getProperty(
+                "load_tsfile_tablet_conversion_thread_count",
+                
String.valueOf(conf.getLoadTsFileTabletConversionThreadCount()))));
     conf.setLoadChunkMetadataMemorySizeInBytes(
         Long.parseLong(
             Optional.ofNullable(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index d82c26144d6..cd9a0a79cae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -247,8 +247,8 @@ public class PipeMemoryWeightUtil {
     return totalSizeInBytes;
   }
 
-  public static int calculateBatchDataRamBytesUsed(BatchData batchData) {
-    int totalSizeInBytes = 0;
+  public static long calculateBatchDataRamBytesUsed(BatchData batchData) {
+    long totalSizeInBytes = 0;
 
     // timestamp
     totalSizeInBytes += 8;
@@ -263,16 +263,16 @@ public class PipeMemoryWeightUtil {
             continue;
           }
           // consider variable references (plus 8) and memory alignment (round 
up to 8)
-          totalSizeInBytes += roundUpToMultiple(primitiveType.getSize() + 8, 
8);
+          totalSizeInBytes += roundUpToMultiple(primitiveType.getSize() + 8L, 
8);
         }
       } else {
         if (type.isBinary()) {
           final Binary binary = batchData.getBinary();
           // refer to org.apache.tsfile.utils.TsPrimitiveType.TsBinary.getSize
           totalSizeInBytes +=
-              roundUpToMultiple((binary == null ? 8 : binary.getLength() + 8) 
+ 8, 8);
+              roundUpToMultiple((binary == null ? 8 : binary.ramBytesUsed()) + 
8L, 8);
         } else {
-          totalSizeInBytes += 
roundUpToMultiple(TsPrimitiveType.getByType(type).getSize() + 8, 8);
+          totalSizeInBytes += 
roundUpToMultiple(TsPrimitiveType.getByType(type).getSize() + 8L, 8);
         }
       }
     }
@@ -287,7 +287,7 @@ public class PipeMemoryWeightUtil {
    * @param n The specified multiple.
    * @return The nearest multiple of n greater than or equal to num.
    */
-  private static int roundUpToMultiple(int num, int n) {
+  private static long roundUpToMultiple(long num, int n) {
     if (n == 0) {
       throw new IllegalArgumentException("The multiple n must be greater than 
0");
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
index cb147f9c286..7dfe91ad62e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.storageengine.load.converter;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import 
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
@@ -45,6 +48,14 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil.calculateTabletSizeInBytes;
@@ -59,6 +70,9 @@ public class LoadTreeStatementDataTypeConvertExecutionVisitor
           .getConfig()
           .getLoadTsFileTabletConversionBatchMemorySizeInBytes();
 
+  private static final AtomicReference<WrappedThreadPoolExecutor> executorPool 
=
+      new AtomicReference<>();
+
   private final StatementExecutor statementExecutor;
 
   @FunctionalInterface
@@ -66,6 +80,21 @@ public class LoadTreeStatementDataTypeConvertExecutionVisitor
     TSStatus execute(final Statement statement);
   }
 
+  public static class CallerBlocksPolicy implements RejectedExecutionHandler {
+    public CallerBlocksPolicy() {}
+
+    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+      if (!e.isShutdown()) {
+        try {
+          e.getQueue().put(r);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          throw new RejectedExecutionException("task " + r + " rejected from " 
+ e, ie);
+        }
+      }
+    }
+  }
+
   public LoadTreeStatementDataTypeConvertExecutionVisitor(
       final StatementExecutor statementExecutor) {
     this.statementExecutor = statementExecutor;
@@ -89,6 +118,7 @@ public class LoadTreeStatementDataTypeConvertExecutionVisitor
     final List<Long> tabletRawReqSizes = new ArrayList<>();
 
     try {
+      final List<Future<TSStatus>> executionFutures = new ArrayList<>();
       for (final File file : loadTsFileStatement.getTsFiles()) {
         try (final TsFileInsertionScanDataContainer container =
             new TsFileInsertionScanDataContainer(
@@ -106,9 +136,16 @@ public class 
LoadTreeStatementDataTypeConvertExecutionVisitor
               continue;
             }
 
-            final TSStatus result =
-                executeInsertMultiTabletsWithRetry(
-                    tabletRawReqs, 
loadTsFileStatement.isConvertOnTypeMismatch());
+            final InsertMultiTabletsStatement batchStatement = new 
InsertMultiTabletsStatement();
+            batchStatement.setInsertTabletStatementList(
+                tabletRawReqs.stream()
+                    .map(
+                        req ->
+                            new LoadConvertedInsertTabletStatement(
+                                req.constructStatement(),
+                                loadTsFileStatement.isConvertOnTypeMismatch()))
+                    .collect(Collectors.toList()));
+            
executionFutures.add(executeInsertMultiTabletsWithRetry(batchStatement));
 
             for (final long memoryCost : tabletRawReqSizes) {
               block.reduceMemoryUsage(memoryCost);
@@ -116,10 +153,6 @@ public class 
LoadTreeStatementDataTypeConvertExecutionVisitor
             tabletRawReqs.clear();
             tabletRawReqSizes.clear();
 
-            if (!handleTSStatus(result, loadTsFileStatement)) {
-              return Optional.empty();
-            }
-
             tabletRawReqs.add(tabletRawReq);
             tabletRawReqSizes.add(curMemory);
             block.addMemoryUsage(curMemory);
@@ -133,25 +166,39 @@ public class 
LoadTreeStatementDataTypeConvertExecutionVisitor
 
       if (!tabletRawReqs.isEmpty()) {
         try {
-          final TSStatus result =
-              executeInsertMultiTabletsWithRetry(
-                  tabletRawReqs, 
loadTsFileStatement.isConvertOnTypeMismatch());
+          final InsertMultiTabletsStatement batchStatement = new 
InsertMultiTabletsStatement();
+          batchStatement.setInsertTabletStatementList(
+              tabletRawReqs.stream()
+                  .map(
+                      req ->
+                          new LoadConvertedInsertTabletStatement(
+                              req.constructStatement(),
+                              loadTsFileStatement.isConvertOnTypeMismatch()))
+                  .collect(Collectors.toList()));
+          
executionFutures.add(executeInsertMultiTabletsWithRetry(batchStatement));
 
           for (final long memoryCost : tabletRawReqSizes) {
             block.reduceMemoryUsage(memoryCost);
           }
           tabletRawReqs.clear();
           tabletRawReqSizes.clear();
-
-          if (!handleTSStatus(result, loadTsFileStatement)) {
-            return Optional.empty();
-          }
         } catch (final Exception e) {
           LOGGER.warn(
               "Failed to convert data type for LoadTsFileStatement: {}.", 
loadTsFileStatement, e);
           return Optional.empty();
         }
       }
+
+      for (final Future<TSStatus> future : executionFutures) {
+        try {
+          if (!handleTSStatus(future.get(), loadTsFileStatement)) {
+            return Optional.empty();
+          }
+        } catch (ExecutionException | InterruptedException e) {
+          LOGGER.warn("Exception occurs when executing insertion during tablet 
conversion: ", e);
+          return Optional.empty();
+        }
+      }
     } finally {
       for (final long memoryCost : tabletRawReqSizes) {
         block.reduceMemoryUsage(memoryCost);
@@ -179,43 +226,67 @@ public class 
LoadTreeStatementDataTypeConvertExecutionVisitor
     return Optional.of(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
-  private TSStatus executeInsertMultiTabletsWithRetry(
-      final List<PipeTransferTabletRawReq> tabletRawReqs, boolean 
isConvertOnTypeMismatch) {
-    final InsertMultiTabletsStatement batchStatement = new 
InsertMultiTabletsStatement();
-    batchStatement.setInsertTabletStatementList(
-        tabletRawReqs.stream()
-            .map(
-                req ->
-                    new LoadConvertedInsertTabletStatement(
-                        req.constructStatement(), isConvertOnTypeMismatch))
-            .collect(Collectors.toList()));
-
-    TSStatus result;
-    try {
-      result =
-          batchStatement.accept(
-              LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
-              statementExecutor.execute(batchStatement));
-
-      // Retry max 5 times if the write process is rejected
-      for (int i = 0;
-          i < 5
-              && result.getCode()
-                  == 
TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
-          i++) {
-        Thread.sleep(100L * (i + 1));
-        result =
-            batchStatement.accept(
-                LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
-                statementExecutor.execute(batchStatement));
-      }
-    } catch (final Exception e) {
-      if (e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
+  private Future<TSStatus> executeInsertMultiTabletsWithRetry(
+      final InsertMultiTabletsStatement batchStatement) {
+    return getExecutorPool()
+        .submit(
+            () -> {
+              TSStatus result;
+              try {
+                result =
+                    batchStatement.accept(
+                        LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
+                        statementExecutor.execute(batchStatement));
+
+                // Retry max 5 times if the write process is rejected
+                for (int i = 0;
+                    i < 5
+                        && result.getCode()
+                            == 
TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
+                    i++) {
+                  Thread.sleep(100L * (i + 1));
+                  result =
+                      batchStatement.accept(
+                          LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
+                          statementExecutor.execute(batchStatement));
+                }
+              } catch (final Exception e) {
+                if (e instanceof InterruptedException) {
+                  Thread.currentThread().interrupt();
+                }
+                result =
+                    batchStatement.accept(
+                        
LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e);
+              }
+              return result;
+            });
+  }
+
+  public static WrappedThreadPoolExecutor getExecutorPool() {
+    if (executorPool.get() == null) {
+      synchronized (executorPool) {
+        if (executorPool.get() == null) {
+          executorPool.set(
+              new WrappedThreadPoolExecutor(
+                  IoTDBDescriptor.getInstance()
+                      .getConfig()
+                      .getLoadTsFileTabletConversionThreadCount(),
+                  IoTDBDescriptor.getInstance()
+                      .getConfig()
+                      .getLoadTsFileTabletConversionThreadCount(),
+                  0L,
+                  TimeUnit.SECONDS,
+                  new ArrayBlockingQueue<>(
+                      IoTDBDescriptor.getInstance()
+                          .getConfig()
+                          .getLoadTsFileTabletConversionThreadCount()),
+                  new 
IoTThreadFactory(ThreadName.LOAD_DATATYPE_CONVERT_POOL.getName()),
+                  ThreadName.LOAD_DATATYPE_CONVERT_POOL.getName(),
+                  new CallerBlocksPolicy()));
+        }
       }
-      result = 
batchStatement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, 
e);
     }
-    return result;
+    return executorPool.get();
   }
 
   private static boolean handleTSStatus(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
index ee2a8fe2547..a46546e3e36 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -20,7 +20,11 @@
 package org.apache.iotdb.db.storageengine.load.converter;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
@@ -33,6 +37,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.ZoneId;
 import java.util.Optional;
 
 public class LoadTsFileDataTypeConverter {
@@ -68,17 +73,32 @@ public class LoadTsFileDataTypeConverter {
   }
 
   private TSStatus executeForTreeModel(final Statement statement) {
-    return Coordinator.getInstance()
-        .executeForTreeModel(
-            isGeneratedByPipe ? new PipeEnrichedStatement(statement) : 
statement,
-            SESSION_MANAGER.requestQueryId(),
-            SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
-            "",
-            ClusterPartitionFetcher.getInstance(),
-            ClusterSchemaFetcher.getInstance(),
-            
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
-            false)
-        .status;
+    final IClientSession session =
+        new InternalClientSession(
+            String.format(
+                "%s_%s",
+                LoadTsFileDataTypeConverter.class.getSimpleName(),
+                Thread.currentThread().getName()));
+    session.setUsername(AuthorityChecker.SUPER_USER);
+    session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
+    session.setZoneId(ZoneId.systemDefault());
+
+    SESSION_MANAGER.registerSession(session);
+    try {
+      return Coordinator.getInstance()
+          .executeForTreeModel(
+              isGeneratedByPipe ? new PipeEnrichedStatement(statement) : 
statement,
+              SESSION_MANAGER.requestQueryId(),
+              SESSION_MANAGER.getSessionInfo(session),
+              "",
+              ClusterPartitionFetcher.getInstance(),
+              ClusterSchemaFetcher.getInstance(),
+              
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
+              false)
+          .status;
+    } finally {
+      SESSION_MANAGER.removeCurrSession();
+    }
   }
 
   public boolean isSuccessful(final TSStatus status) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 3172c65ae5e..af4f84a0d0a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -141,6 +141,7 @@ public enum ThreadName {
   PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
   PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
   PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"),
+  LOAD_DATATYPE_CONVERT_POOL("Load-Datatype-Convert-Pool"),
   SUBSCRIPTION_EXECUTOR_POOL("Subscription-Executor-Pool"),
   SUBSCRIPTION_RUNTIME_META_SYNCER("Subscription-Runtime-Meta-Syncer"),
   WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),

Reply via email to