This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 44a6144c7ae [To dev/1.3] Load: Introduce thread pool for tablet conversion and insertion (#15661)
44a6144c7ae is described below
commit 44a6144c7ae47b21a40059562771bfc2381e7c00
Author: Zikun Ma <[email protected]>
AuthorDate: Fri Jun 6 16:58:27 2025 +0800
[To dev/1.3] Load: Introduce thread pool for tablet conversion and insertion (#15661)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 +
.../pipe/resource/memory/PipeMemoryWeightUtil.java | 12 +-
...eeStatementDataTypeConvertExecutionVisitor.java | 169 +++++++++++++++------
.../converter/LoadTsFileDataTypeConverter.java | 42 +++--
.../iotdb/commons/concurrent/ThreadName.java | 1 +
6 files changed, 173 insertions(+), 66 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 7f31bbefd48..38a4cb648aa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1145,6 +1145,8 @@ public class IoTDBConfig {
private long loadTsFileTabletConversionBatchMemorySizeInBytes = 4096 * 1024;
+ private int loadTsFileTabletConversionThreadCount = 5;
+
private long loadChunkMetadataMemorySizeInBytes = 33554432; // 32MB
private long loadMemoryAllocateRetryIntervalMs = 1000L;
@@ -4014,6 +4016,14 @@ public class IoTDBConfig {
loadTsFileTabletConversionBatchMemorySizeInBytes;
}
+ public int getLoadTsFileTabletConversionThreadCount() {
+ return loadTsFileTabletConversionThreadCount;
+ }
+
+ public void setLoadTsFileTabletConversionThreadCount(int
loadTsFileTabletConversionThreadCount) {
+ this.loadTsFileTabletConversionThreadCount =
loadTsFileTabletConversionThreadCount;
+ }
+
public long getLoadChunkMetadataMemorySizeInBytes() {
return loadChunkMetadataMemorySizeInBytes;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 94576d0aabf..50fccad85b8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2381,6 +2381,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"load_tsfile_tablet_conversion_batch_memory_size_in_bytes",
String.valueOf(conf.getLoadTsFileTabletConversionBatchMemorySizeInBytes()))));
+ conf.setLoadTsFileTabletConversionThreadCount(
+ Integer.parseInt(
+ properties.getProperty(
+ "load_tsfile_tablet_conversion_thread_count",
+
String.valueOf(conf.getLoadTsFileTabletConversionThreadCount()))));
conf.setLoadChunkMetadataMemorySizeInBytes(
Long.parseLong(
Optional.ofNullable(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index d82c26144d6..cd9a0a79cae 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -247,8 +247,8 @@ public class PipeMemoryWeightUtil {
return totalSizeInBytes;
}
- public static int calculateBatchDataRamBytesUsed(BatchData batchData) {
- int totalSizeInBytes = 0;
+ public static long calculateBatchDataRamBytesUsed(BatchData batchData) {
+ long totalSizeInBytes = 0;
// timestamp
totalSizeInBytes += 8;
@@ -263,16 +263,16 @@ public class PipeMemoryWeightUtil {
continue;
}
// consider variable references (plus 8) and memory alignment (round
up to 8)
- totalSizeInBytes += roundUpToMultiple(primitiveType.getSize() + 8,
8);
+ totalSizeInBytes += roundUpToMultiple(primitiveType.getSize() + 8L,
8);
}
} else {
if (type.isBinary()) {
final Binary binary = batchData.getBinary();
// refer to org.apache.tsfile.utils.TsPrimitiveType.TsBinary.getSize
totalSizeInBytes +=
- roundUpToMultiple((binary == null ? 8 : binary.getLength() + 8)
+ 8, 8);
+ roundUpToMultiple((binary == null ? 8 : binary.ramBytesUsed()) +
8L, 8);
} else {
- totalSizeInBytes +=
roundUpToMultiple(TsPrimitiveType.getByType(type).getSize() + 8, 8);
+ totalSizeInBytes +=
roundUpToMultiple(TsPrimitiveType.getByType(type).getSize() + 8L, 8);
}
}
}
@@ -287,7 +287,7 @@ public class PipeMemoryWeightUtil {
* @param n The specified multiple.
* @return The nearest multiple of n greater than or equal to num.
*/
- private static int roundUpToMultiple(int num, int n) {
+ private static long roundUpToMultiple(long num, int n) {
if (n == 0) {
throw new IllegalArgumentException("The multiple n must be greater than
0");
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
index cb147f9c286..7dfe91ad62e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -20,6 +20,9 @@
package org.apache.iotdb.db.storageengine.load.converter;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
@@ -45,6 +48,14 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static
org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil.calculateTabletSizeInBytes;
@@ -59,6 +70,9 @@ public class LoadTreeStatementDataTypeConvertExecutionVisitor
.getConfig()
.getLoadTsFileTabletConversionBatchMemorySizeInBytes();
+ private static final AtomicReference<WrappedThreadPoolExecutor> executorPool
=
+ new AtomicReference<>();
+
private final StatementExecutor statementExecutor;
@FunctionalInterface
@@ -66,6 +80,21 @@ public class LoadTreeStatementDataTypeConvertExecutionVisitor
TSStatus execute(final Statement statement);
}
+ public static class CallerBlocksPolicy implements RejectedExecutionHandler {
+ public CallerBlocksPolicy() {}
+
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+ if (!e.isShutdown()) {
+ try {
+ e.getQueue().put(r);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new RejectedExecutionException("task " + r + " rejected from "
+ e, ie);
+ }
+ }
+ }
+ }
+
public LoadTreeStatementDataTypeConvertExecutionVisitor(
final StatementExecutor statementExecutor) {
this.statementExecutor = statementExecutor;
@@ -89,6 +118,7 @@ public class LoadTreeStatementDataTypeConvertExecutionVisitor
final List<Long> tabletRawReqSizes = new ArrayList<>();
try {
+ final List<Future<TSStatus>> executionFutures = new ArrayList<>();
for (final File file : loadTsFileStatement.getTsFiles()) {
try (final TsFileInsertionScanDataContainer container =
new TsFileInsertionScanDataContainer(
@@ -106,9 +136,16 @@ public class LoadTreeStatementDataTypeConvertExecutionVisitor
continue;
}
- final TSStatus result =
- executeInsertMultiTabletsWithRetry(
- tabletRawReqs,
loadTsFileStatement.isConvertOnTypeMismatch());
+ final InsertMultiTabletsStatement batchStatement = new
InsertMultiTabletsStatement();
+ batchStatement.setInsertTabletStatementList(
+ tabletRawReqs.stream()
+ .map(
+ req ->
+ new LoadConvertedInsertTabletStatement(
+ req.constructStatement(),
+ loadTsFileStatement.isConvertOnTypeMismatch()))
+ .collect(Collectors.toList()));
+
executionFutures.add(executeInsertMultiTabletsWithRetry(batchStatement));
for (final long memoryCost : tabletRawReqSizes) {
block.reduceMemoryUsage(memoryCost);
@@ -116,10 +153,6 @@ public class LoadTreeStatementDataTypeConvertExecutionVisitor
tabletRawReqs.clear();
tabletRawReqSizes.clear();
- if (!handleTSStatus(result, loadTsFileStatement)) {
- return Optional.empty();
- }
-
tabletRawReqs.add(tabletRawReq);
tabletRawReqSizes.add(curMemory);
block.addMemoryUsage(curMemory);
@@ -133,25 +166,39 @@ public class LoadTreeStatementDataTypeConvertExecutionVisitor
if (!tabletRawReqs.isEmpty()) {
try {
- final TSStatus result =
- executeInsertMultiTabletsWithRetry(
- tabletRawReqs,
loadTsFileStatement.isConvertOnTypeMismatch());
+ final InsertMultiTabletsStatement batchStatement = new
InsertMultiTabletsStatement();
+ batchStatement.setInsertTabletStatementList(
+ tabletRawReqs.stream()
+ .map(
+ req ->
+ new LoadConvertedInsertTabletStatement(
+ req.constructStatement(),
+ loadTsFileStatement.isConvertOnTypeMismatch()))
+ .collect(Collectors.toList()));
+
executionFutures.add(executeInsertMultiTabletsWithRetry(batchStatement));
for (final long memoryCost : tabletRawReqSizes) {
block.reduceMemoryUsage(memoryCost);
}
tabletRawReqs.clear();
tabletRawReqSizes.clear();
-
- if (!handleTSStatus(result, loadTsFileStatement)) {
- return Optional.empty();
- }
} catch (final Exception e) {
LOGGER.warn(
"Failed to convert data type for LoadTsFileStatement: {}.",
loadTsFileStatement, e);
return Optional.empty();
}
}
+
+ for (final Future<TSStatus> future : executionFutures) {
+ try {
+ if (!handleTSStatus(future.get(), loadTsFileStatement)) {
+ return Optional.empty();
+ }
+ } catch (ExecutionException | InterruptedException e) {
+ LOGGER.warn("Exception occurs when executing insertion during tablet
conversion: ", e);
+ return Optional.empty();
+ }
+ }
} finally {
for (final long memoryCost : tabletRawReqSizes) {
block.reduceMemoryUsage(memoryCost);
@@ -179,43 +226,67 @@ public class LoadTreeStatementDataTypeConvertExecutionVisitor
return Optional.of(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
- private TSStatus executeInsertMultiTabletsWithRetry(
- final List<PipeTransferTabletRawReq> tabletRawReqs, boolean
isConvertOnTypeMismatch) {
- final InsertMultiTabletsStatement batchStatement = new
InsertMultiTabletsStatement();
- batchStatement.setInsertTabletStatementList(
- tabletRawReqs.stream()
- .map(
- req ->
- new LoadConvertedInsertTabletStatement(
- req.constructStatement(), isConvertOnTypeMismatch))
- .collect(Collectors.toList()));
-
- TSStatus result;
- try {
- result =
- batchStatement.accept(
- LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
- statementExecutor.execute(batchStatement));
-
- // Retry max 5 times if the write process is rejected
- for (int i = 0;
- i < 5
- && result.getCode()
- ==
TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
- i++) {
- Thread.sleep(100L * (i + 1));
- result =
- batchStatement.accept(
- LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
- statementExecutor.execute(batchStatement));
- }
- } catch (final Exception e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
+ private Future<TSStatus> executeInsertMultiTabletsWithRetry(
+ final InsertMultiTabletsStatement batchStatement) {
+ return getExecutorPool()
+ .submit(
+ () -> {
+ TSStatus result;
+ try {
+ result =
+ batchStatement.accept(
+ LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
+ statementExecutor.execute(batchStatement));
+
+ // Retry max 5 times if the write process is rejected
+ for (int i = 0;
+ i < 5
+ && result.getCode()
+ ==
TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
+ i++) {
+ Thread.sleep(100L * (i + 1));
+ result =
+ batchStatement.accept(
+ LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
+ statementExecutor.execute(batchStatement));
+ }
+ } catch (final Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ result =
+ batchStatement.accept(
+
LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e);
+ }
+ return result;
+ });
+ }
+
+ public static WrappedThreadPoolExecutor getExecutorPool() {
+ if (executorPool.get() == null) {
+ synchronized (executorPool) {
+ if (executorPool.get() == null) {
+ executorPool.set(
+ new WrappedThreadPoolExecutor(
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getLoadTsFileTabletConversionThreadCount(),
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getLoadTsFileTabletConversionThreadCount(),
+ 0L,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getLoadTsFileTabletConversionThreadCount()),
+ new
IoTThreadFactory(ThreadName.LOAD_DATATYPE_CONVERT_POOL.getName()),
+ ThreadName.LOAD_DATATYPE_CONVERT_POOL.getName(),
+ new CallerBlocksPolicy()));
+ }
}
- result =
batchStatement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR,
e);
}
- return result;
+ return executorPool.get();
}
private static boolean handleTSStatus(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
index ee2a8fe2547..a46546e3e36 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -20,7 +20,11 @@
package org.apache.iotdb.db.storageengine.load.converter;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
@@ -33,6 +37,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.ZoneId;
import java.util.Optional;
public class LoadTsFileDataTypeConverter {
@@ -68,17 +73,32 @@ public class LoadTsFileDataTypeConverter {
}
private TSStatus executeForTreeModel(final Statement statement) {
- return Coordinator.getInstance()
- .executeForTreeModel(
- isGeneratedByPipe ? new PipeEnrichedStatement(statement) :
statement,
- SESSION_MANAGER.requestQueryId(),
- SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
- "",
- ClusterPartitionFetcher.getInstance(),
- ClusterSchemaFetcher.getInstance(),
-
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
- false)
- .status;
+ final IClientSession session =
+ new InternalClientSession(
+ String.format(
+ "%s_%s",
+ LoadTsFileDataTypeConverter.class.getSimpleName(),
+ Thread.currentThread().getName()));
+ session.setUsername(AuthorityChecker.SUPER_USER);
+ session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
+ session.setZoneId(ZoneId.systemDefault());
+
+ SESSION_MANAGER.registerSession(session);
+ try {
+ return Coordinator.getInstance()
+ .executeForTreeModel(
+ isGeneratedByPipe ? new PipeEnrichedStatement(statement) :
statement,
+ SESSION_MANAGER.requestQueryId(),
+ SESSION_MANAGER.getSessionInfo(session),
+ "",
+ ClusterPartitionFetcher.getInstance(),
+ ClusterSchemaFetcher.getInstance(),
+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
+ false)
+ .status;
+ } finally {
+ SESSION_MANAGER.removeCurrSession();
+ }
}
public boolean isSuccessful(final TSStatus status) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 3172c65ae5e..af4f84a0d0a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -141,6 +141,7 @@ public enum ThreadName {
PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"),
+ LOAD_DATATYPE_CONVERT_POOL("Load-Datatype-Convert-Pool"),
SUBSCRIPTION_EXECUTOR_POOL("Subscription-Executor-Pool"),
SUBSCRIPTION_RUNTIME_META_SYNCER("Subscription-Runtime-Meta-Syncer"),
WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),