This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 7fdccc916a4 Limit concurrent load tsfile type conversions (#18014) (#18030)
7fdccc916a4 is described below
commit 7fdccc916a463e1e1c945721a1d3e69f96d9bf88
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 26 14:21:51 2026 +0800
Limit concurrent load tsfile type conversions (#18014) (#18030)
* Limit concurrent load tsfile type conversions
* Keep load conversion retryable on memory pressure
* Use i18n message for active load retry log
* Add active load retry coverage for the temporarily unavailable status
* Fix compilation of the load conversion memory test
(cherry picked from commit 4d05a851ef51cddfaa1270c7e1419bfe6f56ce70)
---
.../it/env/cluster/config/MppDataNodeConfig.java | 25 +++
.../it/env/remote/config/RemoteDataNodeConfig.java | 17 ++
.../apache/iotdb/itbase/env/DataNodeConfig.java | 8 +
.../iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java | 226 +++++++++++++++++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 +
.../active/ActiveLoadFailedMessageHandler.java | 16 ++
.../load/active/ActiveLoadTsFileLoader.java | 2 +-
...ertedInsertTabletStatementExceptionVisitor.java | 9 +-
.../converter/LoadTsFileDataTypeConverter.java | 95 +++++++++
.../active/ActiveLoadFailedMessageHandlerTest.java | 51 +++++
.../load/active/ActiveLoadTsFileLoaderTest.java | 146 +++++++++++++
...dInsertTabletStatementExceptionVisitorTest.java | 54 +++++
13 files changed, 659 insertions(+), 5 deletions(-)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 9ac380503cd..c011cd994a2 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -89,6 +89,31 @@ public class MppDataNodeConfig extends MppBaseConfig implements DataNodeConfig {
return this;
}
+ @Override
+ public DataNodeConfig setMaxAllocateMemoryRatioForLoad(double
maxAllocateMemoryRatioForLoad) {
+ properties.setProperty(
+ "max_allocate_memory_ratio_for_load",
String.valueOf(maxAllocateMemoryRatioForLoad));
+ return this;
+ }
+
+ @Override
+ public DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+ long loadTsFileTabletConversionBatchMemorySizeInBytes) {
+ properties.setProperty(
+ "load_tsfile_tablet_conversion_batch_memory_size_in_bytes",
+ String.valueOf(loadTsFileTabletConversionBatchMemorySizeInBytes));
+ return this;
+ }
+
+ @Override
+ public DataNodeConfig setLoadActiveListeningCheckIntervalSeconds(
+ long loadActiveListeningCheckIntervalSeconds) {
+ properties.setProperty(
+ "load_active_listening_check_interval_seconds",
+ String.valueOf(loadActiveListeningCheckIntervalSeconds));
+ return this;
+ }
+
@Override
public DataNodeConfig setLoadLastCacheStrategy(String strategyName) {
setProperty("last_cache_operation_on_load", strategyName);
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index b2550ae689f..46947add596 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -54,6 +54,23 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
return this;
}
+ @Override
+ public DataNodeConfig setMaxAllocateMemoryRatioForLoad(double
maxAllocateMemoryRatioForLoad) {
+ return this;
+ }
+
+ @Override
+ public DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+ long loadTsFileTabletConversionBatchMemorySizeInBytes) {
+ return this;
+ }
+
+ @Override
+ public DataNodeConfig setLoadActiveListeningCheckIntervalSeconds(
+ long loadActiveListeningCheckIntervalSeconds) {
+ return this;
+ }
+
@Override
public DataNodeConfig setLoadLastCacheStrategy(String strategyName) {
return this;
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index c27eb369c0d..9e4c62022bd 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -36,6 +36,14 @@ public interface DataNodeConfig {
DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
long loadTsFileAnalyzeSchemaMemorySizeInBytes);
+ DataNodeConfig setMaxAllocateMemoryRatioForLoad(double
maxAllocateMemoryRatioForLoad);
+
+ DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+ long loadTsFileTabletConversionBatchMemorySizeInBytes);
+
+ DataNodeConfig setLoadActiveListeningCheckIntervalSeconds(
+ long loadActiveListeningCheckIntervalSeconds);
+
DataNodeConfig setLoadLastCacheStrategy(String strategyName);
DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad);
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java
new file mode 100644
index 00000000000..3827615fe24
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.it.utils.TsFileGenerator;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBLoadTsFileActiveRetryIT {
+
+ private static final String DATABASE = "root.sg.test_0";
+ private static final String DEVICE = DATABASE + ".d_0";
+ private static final String MEASUREMENT = "sensor_00";
+ private static final long
UNALLOCATABLE_TABLET_CONVERSION_BATCH_MEMORY_SIZE_IN_BYTES =
+ Long.MAX_VALUE / 4;
+ private static final MeasurementSchema TSFILE_SCHEMA =
+ new MeasurementSchema(MEASUREMENT, TSDataType.INT32, TSEncoding.RLE);
+
+ private File tmpDir;
+
+ @Before
+ public void setUp() throws Exception {
+ tmpDir = new File(Files.createTempDirectory("load-active-retry").toUri());
+
EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false);
+ EnvFactory.getEnv()
+ .getConfig()
+ .getDataNodeConfig()
+ .setMaxAllocateMemoryRatioForLoad(1.0)
+ .setLoadTsFileAnalyzeSchemaMemorySizeInBytes(1)
+ .setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+ UNALLOCATABLE_TABLET_CONVERSION_BATCH_MEMORY_SIZE_IN_BYTES)
+ .setLoadActiveListeningCheckIntervalSeconds(1);
+
+ EnvFactory.getEnv().initClusterEnvironment();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try (final Connection connection = EnvFactory.getEnv().getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute("delete database " + DATABASE);
+ } catch (final Exception ignored) {
+ // ignore cleanup failure
+ } finally {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ deleteRecursively(tmpDir);
+ }
+ }
+
+ @Test
+ public void testActiveLoadTemporaryUnavailableShouldKeepFileForRetry()
throws Exception {
+ final DataNodeWrapper dataNodeWrapper =
EnvFactory.getEnv().getDataNodeWrapper(0);
+ final File retryTsFile = new File(tmpDir, "1-0-0-0.tsfile");
+ final File permanentFailureTsFile = new File(tmpDir, "2-0-0-0.tsfile");
+ generateTsFile(retryTsFile);
+ generateTsFile(permanentFailureTsFile);
+
+ try (final Connection connection =
+
EnvFactory.getEnv().getConnectionWithSpecifiedDataNode(dataNodeWrapper);
+ final Statement statement = connection.createStatement()) {
+ statement.execute("create database " + DATABASE);
+ statement.execute(
+ String.format(
+ "create timeseries %s.%s %s", DEVICE, MEASUREMENT,
TSDataType.INT64.name()));
+
+ statement.execute(
+ String.format(
+ "load \"%s\" with ('database-level'='3', 'async'='true',
'on-success'='none', "
+ + "'convert-on-type-mismatch'='true')",
+ retryTsFile.getAbsolutePath()));
+ statement.execute(
+ String.format(
+ "load \"%s\" with ('database-level'='3', 'async'='true',
'on-success'='none', "
+ + "'convert-on-type-mismatch'='false')",
+ permanentFailureTsFile.getAbsolutePath()));
+
+ final File activeDir = getActiveLoadDir(dataNodeWrapper);
+ final File failDir = getActiveLoadFailDir(dataNodeWrapper);
+ final File activeTsFile = waitForFile(activeDir, retryTsFile.getName(),
30_000L);
+
+ Assert.assertNotNull(
+ "Async load should copy tsfile into active load directory",
activeTsFile);
+
+ Assert.assertNotNull(
+ "Permanent active load failure should be moved to fail dir",
+ waitForFile(failDir, permanentFailureTsFile.getName(),
TimeUnit.SECONDS.toMillis(60)));
+
+ assertFileKeptForRetry(
+ activeDir, failDir, retryTsFile.getName(),
TimeUnit.SECONDS.toMillis(12));
+ }
+ }
+
+ private void generateTsFile(final File tsFile) throws Exception {
+ try (final TsFileGenerator generator = new TsFileGenerator(tsFile)) {
+ generator.registerTimeseries(DEVICE,
Collections.singletonList(TSFILE_SCHEMA));
+ generator.generateData(DEVICE, 10, 1, false);
+ }
+ }
+
+ private File getActiveLoadDir(final DataNodeWrapper dataNodeWrapper) {
+ return new File(
+ dataNodeWrapper.getNodePath()
+ + File.separator
+ + "ext"
+ + File.separator
+ + "load"
+ + File.separator
+ + "pending");
+ }
+
+ private File getActiveLoadFailDir(final DataNodeWrapper dataNodeWrapper) {
+ return new File(
+ dataNodeWrapper.getNodePath()
+ + File.separator
+ + "ext"
+ + File.separator
+ + "load"
+ + File.separator
+ + "failed");
+ }
+
+ private File waitForFile(final File root, final String fileName, final long
timeoutMs)
+ throws InterruptedException {
+ final long deadline = System.currentTimeMillis() + timeoutMs;
+ while (System.currentTimeMillis() < deadline) {
+ final File file = findFile(root, fileName);
+ if (file != null) {
+ return file;
+ }
+ Thread.sleep(500L);
+ }
+ return null;
+ }
+
+ private boolean containsFile(final File root, final String fileName) {
+ return findFile(root, fileName) != null;
+ }
+
+ private void assertFileKeptForRetry(
+ final File activeDir, final File failDir, final String fileName, final
long observationMs)
+ throws InterruptedException {
+ final long deadline = System.currentTimeMillis() + observationMs;
+ while (System.currentTimeMillis() < deadline) {
+ Assert.assertTrue(
+ "Temporary unavailable active load should keep tsfile for retry",
+ containsFile(activeDir, fileName));
+ Assert.assertFalse(
+ "Temporary unavailable active load must not move tsfile to fail dir",
+ containsFile(failDir, fileName));
+ Thread.sleep(500L);
+ }
+ }
+
+ private File findFile(final File root, final String fileName) {
+ if (root == null || !root.exists()) {
+ return null;
+ }
+ if (root.isFile()) {
+ return root.getName().equals(fileName) ? root : null;
+ }
+
+ final File[] children = root.listFiles();
+ if (children == null) {
+ return null;
+ }
+ for (final File child : children) {
+ final File file = findFile(child, fileName);
+ if (file != null) {
+ return file;
+ }
+ }
+ return null;
+ }
+
+ private void deleteRecursively(final File file) {
+ if (file == null || !file.exists()) {
+ return;
+ }
+ final File[] children = file.listFiles();
+ if (children != null) {
+ for (final File child : children) {
+ deleteRecursively(child);
+ }
+ }
+ Assert.assertTrue(file.delete());
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c92c9bba28a..a3130f49f34 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1158,6 +1158,8 @@ public class IoTDBConfig {
private long loadTsFileTabletConversionBatchMemorySizeInBytes = 4096 * 1024;
+ private int loadTsFileTabletConversionThreadCount = 5;
+
private long loadChunkMetadataMemorySizeInBytes = 33554432; // 32MB
private long loadMemoryAllocateRetryIntervalMs = 1000L;
@@ -4141,6 +4143,14 @@ public class IoTDBConfig {
loadTsFileTabletConversionBatchMemorySizeInBytes;
}
+ public int getLoadTsFileTabletConversionThreadCount() {
+ return loadTsFileTabletConversionThreadCount;
+ }
+
+ public void setLoadTsFileTabletConversionThreadCount(int
loadTsFileTabletConversionThreadCount) {
+ this.loadTsFileTabletConversionThreadCount =
loadTsFileTabletConversionThreadCount;
+ }
+
public long getLoadChunkMetadataMemorySizeInBytes() {
return loadChunkMetadataMemorySizeInBytes;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f9e87375b78..a838203a943 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2426,6 +2426,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"load_tsfile_tablet_conversion_batch_memory_size_in_bytes",
String.valueOf(conf.getLoadTsFileTabletConversionBatchMemorySizeInBytes()))));
+ conf.setLoadTsFileTabletConversionThreadCount(
+ Integer.parseInt(
+ properties.getProperty(
+ "load_tsfile_tablet_conversion_thread_count",
+
String.valueOf(conf.getLoadTsFileTabletConversionThreadCount()))));
conf.setLoadChunkMetadataMemorySizeInBytes(
Long.parseLong(
Optional.ofNullable(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
index c2d9af94725..89039889d08 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.storageengine.load.active;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,6 +98,20 @@ public class ActiveLoadFailedMessageHandler {
void handle(final ActiveLoadPendingQueue.ActiveLoadEntry activeLoadEntry);
}
+ public static boolean isStatusShouldRetry(
+ final ActiveLoadPendingQueue.ActiveLoadEntry entry, final TSStatus
status) {
+ if (status != null
+ && status.getCode() ==
TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) {
+ LOGGER.info(
+ "Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to
temporary unavailability, will retry later. Status: {}",
+ entry.getFile(),
+ entry.isGeneratedByPipe(),
+ status);
+ return true;
+ }
+ return isExceptionMessageShouldRetry(entry, status == null ? null :
status.getMessage());
+ }
+
public static boolean isExceptionMessageShouldRetry(
final ActiveLoadPendingQueue.ActiveLoadEntry entry, final String
message) {
if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index ffb9e23bbfa..a9133efe4f3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -280,7 +280,7 @@ public class ActiveLoadTsFileLoader {
private void handleLoadFailure(
final ActiveLoadPendingQueue.ActiveLoadEntry entry, final TSStatus
status) {
- if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(entry,
status.getMessage())) {
+ if (!ActiveLoadFailedMessageHandler.isStatusShouldRetry(entry, status)) {
LOGGER.warn(
"Failed to auto load tsfile {} (isGeneratedByPipe = {}), status: {}.
File will be moved to fail directory.",
entry.getFile(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
index f292ee55930..59ad6ebcd6c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.storageengine.load.converter;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
@@ -32,6 +31,9 @@ public class LoadConvertedInsertTabletStatementExceptionVisitor
@Override
public TSStatus visitNode(final StatementNode node, final Exception context)
{
+ if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) {
+ return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context);
+ }
return new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode())
.setMessage(context.getMessage());
}
@@ -39,9 +41,8 @@ public class LoadConvertedInsertTabletStatementExceptionVisitor
@Override
public TSStatus visitLoadFile(
final LoadTsFileStatement loadTsFileStatement, final Exception context) {
- if (context instanceof LoadRuntimeOutOfMemoryException) {
- return new
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
+ if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) {
+ return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context);
} else if (context instanceof SemanticException) {
return new
TSStatus(TSStatusCode.LOAD_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
index e46e25c5f7f..be154ac8181 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -21,8 +21,12 @@ package org.apache.iotdb.db.storageengine.load.converter;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.InternalClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
@@ -42,6 +46,7 @@ import org.slf4j.LoggerFactory;
import java.time.ZoneId;
import java.util.Optional;
+import java.util.concurrent.Semaphore;
public class LoadTsFileDataTypeConverter {
@@ -49,6 +54,85 @@ public class LoadTsFileDataTypeConverter {
private static final SessionManager SESSION_MANAGER =
SessionManager.getInstance();
+ private static Semaphore getTabletConversionSemaphore() {
+ return TabletConversionSemaphoreHolder.INSTANCE;
+ }
+
+ private static int getTabletConversionPermitCount() {
+ final int configuredThreadCount =
+ Math.max(
+ 1,
+
IoTDBDescriptor.getInstance().getConfig().getLoadTsFileTabletConversionThreadCount());
+ if (!PipeConfig.getInstance().getPipeMemoryManagementEnabled()) {
+ return configuredThreadCount;
+ }
+ final long memorySafePermitCount =
+ getAllowedPipeTabletMemorySizeInBytes() /
estimatePipeTabletMemorySizePerConversion();
+ return (int) Math.max(1, Math.min((long) configuredThreadCount,
memorySafePermitCount));
+ }
+
+ private static long estimatePipeTabletMemorySizePerConversion() {
+ final PipeConfig pipeConfig = PipeConfig.getInstance();
+ final long tabletSize =
+ Math.max(
+ 1L,
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
+ final long maxReaderChunkSize = Math.max(0L,
pipeConfig.getPipeMaxReaderChunkSize());
+ final long tableSize =
+ Math.max(
+ 1L,
+ Math.min(tabletSize,
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize()));
+
+ final long treeParserMemorySize = 2L * tabletSize + maxReaderChunkSize;
+ final long tableParserMemorySize = 2L * tabletSize + 2L * tableSize +
maxReaderChunkSize;
+ return Math.max(1L, Math.max(treeParserMemorySize, tableParserMemorySize));
+ }
+
+ private static long getAllowedPipeTabletMemorySizeInBytes() {
+ final PipeConfig pipeConfig = PipeConfig.getInstance();
+ return (long)
+
((pipeConfig.getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold()
+ +
pipeConfig.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold() / 2)
+ *
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
+ }
+
+ private static Optional<TSStatus> getInterruptedConversionStatus(final
InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return Optional.of(
+ new
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage("Interrupted while waiting for tablet conversion slot:
" + e.getMessage()));
+ }
+
+ public static boolean isMemoryPressureException(final Throwable throwable) {
+ Throwable current = throwable;
+ while (current != null) {
+ if (current instanceof LoadRuntimeOutOfMemoryException
+ || current instanceof PipeRuntimeOutOfMemoryCriticalException) {
+ return true;
+ }
+ current = current.getCause();
+ }
+ return false;
+ }
+
+ public static TSStatus getMemoryPressureStatus(final Throwable throwable) {
+ Throwable current = throwable;
+ while (current != null) {
+ if (current instanceof LoadRuntimeOutOfMemoryException
+ || current instanceof PipeRuntimeOutOfMemoryCriticalException) {
+ return new
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage(current.getMessage());
+ }
+ current = current.getCause();
+ }
+
+ return new
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage(throwable == null ? null : throwable.getMessage());
+ }
+
+ private static class TabletConversionSemaphoreHolder {
+ private static final Semaphore INSTANCE = new
Semaphore(getTabletConversionPermitCount());
+ }
+
public static final LoadConvertedInsertTabletStatementTSStatusVisitor
STATEMENT_STATUS_VISITOR =
new LoadConvertedInsertTabletStatementTSStatusVisitor();
public static final LoadConvertedInsertTabletStatementExceptionVisitor
@@ -69,14 +153,25 @@ public class LoadTsFileDataTypeConverter {
public Optional<TSStatus> convertForTreeModel(final LoadTsFileStatement
loadTsFileTreeStatement) {
DataNodeSchemaLockManager.getInstance().releaseReadLock(context);
+ boolean isPermitAcquired = false;
try {
+ getTabletConversionSemaphore().acquire();
+ isPermitAcquired = true;
return
loadTsFileTreeStatement.accept(treeStatementDataTypeConvertExecutionVisitor,
null);
+ } catch (final InterruptedException e) {
+ return getInterruptedConversionStatus(e);
} catch (Exception e) {
+ if (isMemoryPressureException(e)) {
+ return Optional.of(getMemoryPressureStatus(e));
+ }
LOGGER.warn(
"Failed to convert data types for load statement {}.",
loadTsFileTreeStatement, e);
return Optional.of(
new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
} finally {
+ if (isPermitAcquired) {
+ getTabletConversionSemaphore().release();
+ }
DataNodeSchemaLockManager.getInstance()
.takeReadLock(context, SchemaLockType.VALIDATE_VS_DELETION);
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java
new file mode 100644
index 00000000000..caf8a9f8d2b
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ActiveLoadFailedMessageHandlerTest {
+
+ @Test
+ public void testTemporaryUnavailableStatusShouldRetryWithoutMessageMatch() {
+ final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+ new ActiveLoadPendingQueue.ActiveLoadEntry("test.tsfile", "pending",
true);
+
+ Assert.assertTrue(
+ ActiveLoadFailedMessageHandler.isStatusShouldRetry(
+ entry,
+ new
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage("temporarily unavailable")));
+ }
+
+ @Test
+ public void testPermanentStatusShouldNotRetryWithoutMessageMatch() {
+ final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+ new ActiveLoadPendingQueue.ActiveLoadEntry("test.tsfile", "pending",
true);
+
+ Assert.assertFalse(
+ ActiveLoadFailedMessageHandler.isStatusShouldRetry(
+ entry, new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage("bad")));
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java
new file mode 100644
index 00000000000..f21748c3d3e
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+
+/**
+ * Tests how {@link ActiveLoadTsFileLoader} handles a failed load, depending on
+ * the returned {@link TSStatus}.
+ *
+ * <p>The loader's private {@code initFailDirIfNecessary} and
+ * {@code handleLoadFailure} methods are invoked through reflection. Each test
+ * checks whether the TsFile and its resource/modification companion files stay
+ * in place or are moved into the configured active-load fail directory.
+ */
+public class ActiveLoadTsFileLoaderTest {
+
+ private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ // Per-test scratch directory; the fail dir is redirected to tempDir/failed.
+ private File tempDir;
+ // Global settings captured in setUp and restored in tearDown, so this test
+ // does not leak configuration into other tests.
+ private String originalFailDir;
+ private NodeStatus originalNodeStatus;
+
+ /**
+ * Creates a fresh temporary directory and saves the current fail dir and node
+ * status. Then forces the node status to {@code Running} and points the
+ * active-load fail dir at {@code tempDir/failed}.
+ *
+ * <p>Forcing {@code Running} is presumably needed because the loader checks
+ * the node status before handling files - TODO confirm.
+ */
+ @Before
+ public void setUp() throws Exception {
+ tempDir = Files.createTempDirectory("active-load-retry").toFile();
+ originalFailDir = config.getLoadActiveListeningFailDir();
+ originalNodeStatus =
CommonDescriptor.getInstance().getConfig().getNodeStatus();
+
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.Running);
+ config.setLoadActiveListeningFailDir(new File(tempDir,
"failed").getAbsolutePath());
+ }
+
+ /**
+ * Restores the fail dir and node status saved in {@link #setUp()}, then
+ * deletes the temporary directory tree.
+ *
+ * <p>{@code deleteRecursively} asserts that every delete succeeds, so a file
+ * that cannot be removed fails the test at teardown.
+ */
+ @After
+ public void tearDown() {
+ config.setLoadActiveListeningFailDir(originalFailDir);
+
CommonDescriptor.getInstance().getConfig().setNodeStatus(originalNodeStatus);
+ deleteRecursively(tempDir);
+ }
+
+ /**
+ * A {@code LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION} failure must leave the TsFile
+ * and both companion files in their original location. Nothing may be copied
+ * into the fail dir, so the load stays retryable.
+ */
+ @Test
+ public void testTemporaryUnavailableStatusDoesNotMoveFileToFailDir() throws
Exception {
+ final ActiveLoadTsFileLoader loader = new ActiveLoadTsFileLoader();
+ final File tsFile = createTsFileWithCompanionFiles("retry.tsfile");
+ final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+ new ActiveLoadPendingQueue.ActiveLoadEntry(
+ tsFile.getAbsolutePath(), tempDir.getAbsolutePath(), false);
+
+ invokeInitFailDirIfNecessary(loader);
+ invokeHandleLoadFailure(
+ loader,
+ entry,
+ new
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage("load conversion is temporarily unavailable"));
+
+ // Source files are untouched...
+ Assert.assertTrue(tsFile.exists());
+ Assert.assertTrue(new File(tsFile.getAbsolutePath() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertTrue(new File(tsFile.getAbsolutePath() +
ModificationFile.FILE_SUFFIX).exists());
+ // ...and the TsFile was not placed into the fail dir.
+ Assert.assertFalse(new File(config.getLoadActiveListeningFailDir(),
tsFile.getName()).exists());
+ }
+
+ /**
+ * A non-retryable {@code LOAD_FILE_ERROR} failure must move the TsFile out of
+ * its source directory. The TsFile and both companion files must end up in the
+ * fail dir under their original names.
+ */
+ @Test
+ public void testPermanentFailureStatusMovesFileToFailDir() throws Exception {
+ final ActiveLoadTsFileLoader loader = new ActiveLoadTsFileLoader();
+ final File tsFile = createTsFileWithCompanionFiles("failed.tsfile");
+ final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+ new ActiveLoadPendingQueue.ActiveLoadEntry(
+ tsFile.getAbsolutePath(), tempDir.getAbsolutePath(), false);
+
+ invokeInitFailDirIfNecessary(loader);
+ invokeHandleLoadFailure(
+ loader,
+ entry,
+ new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage("permanent
error"));
+
+ final File failDir = new File(config.getLoadActiveListeningFailDir());
+ Assert.assertFalse(tsFile.exists());
+ Assert.assertTrue(new File(failDir, tsFile.getName()).exists());
+ Assert.assertTrue(
+ new File(failDir, tsFile.getName() +
TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertTrue(new File(failDir, tsFile.getName() +
ModificationFile.FILE_SUFFIX).exists());
+ }
+
+ /**
+ * Creates an empty TsFile named {@code fileName} under {@link #tempDir}. Also
+ * creates two empty companion files whose names are the TsFile's absolute path
+ * plus {@code TsFileResource.RESOURCE_SUFFIX} and plus
+ * {@code ModificationFile.FILE_SUFFIX}.
+ *
+ * <p>Each {@code createNewFile} call is asserted to return true, meaning the
+ * file did not already exist.
+ *
+ * @param fileName name of the TsFile to create inside the temp directory
+ * @return the created TsFile
+ */
+ private File createTsFileWithCompanionFiles(final String fileName) throws
Exception {
+ final File tsFile = new File(tempDir, fileName);
+ Assert.assertTrue(tsFile.createNewFile());
+ Assert.assertTrue(
+ new File(tsFile.getAbsolutePath() +
TsFileResource.RESOURCE_SUFFIX).createNewFile());
+ Assert.assertTrue(
+ new File(tsFile.getAbsolutePath() +
ModificationFile.FILE_SUFFIX).createNewFile());
+ return tsFile;
+ }
+
+ /**
+ * Invokes the private no-arg {@code initFailDirIfNecessary} on {@code loader}
+ * through reflection.
+ *
+ * <p>Exceptions thrown by the target method surface wrapped in
+ * {@link java.lang.reflect.InvocationTargetException}. A renamed or removed
+ * method surfaces as {@link NoSuchMethodException}. Both tests call this
+ * before {@code handleLoadFailure}, presumably so that the fail directory
+ * exists - TODO confirm.
+ */
+ private void invokeInitFailDirIfNecessary(final ActiveLoadTsFileLoader
loader) throws Exception {
+ final Method method =
ActiveLoadTsFileLoader.class.getDeclaredMethod("initFailDirIfNecessary");
+ method.setAccessible(true);
+ method.invoke(loader);
+ }
+
+ /**
+ * Invokes the private
+ * {@code handleLoadFailure(ActiveLoadPendingQueue.ActiveLoadEntry, TSStatus)}
+ * on {@code loader} through reflection.
+ *
+ * <p>Exceptions thrown by the target method surface wrapped in
+ * {@link java.lang.reflect.InvocationTargetException}. A signature change
+ * surfaces as {@link NoSuchMethodException}.
+ *
+ * @param loader loader under test
+ * @param entry entry describing the file whose load failed
+ * @param status failure status passed to the loader
+ */
+ private void invokeHandleLoadFailure(
+ final ActiveLoadTsFileLoader loader,
+ final ActiveLoadPendingQueue.ActiveLoadEntry entry,
+ final TSStatus status)
+ throws Exception {
+ final Method method =
+ ActiveLoadTsFileLoader.class.getDeclaredMethod(
+ "handleLoadFailure", ActiveLoadPendingQueue.ActiveLoadEntry.class,
TSStatus.class);
+ method.setAccessible(true);
+ method.invoke(loader, entry, status);
+ }
+
+ /**
+ * Deletes {@code file} and, if it is a directory, everything below it,
+ * children first. Does nothing for {@code null} or a path that does not exist.
+ * Every deletion is asserted to succeed.
+ */
+ private static void deleteRecursively(final File file) {
+ if (file == null || !file.exists()) {
+ return;
+ }
+ // listFiles() returns null for non-directories (and on I/O error).
+ final File[] children = file.listFiles();
+ if (children != null) {
+ for (final File child : children) {
+ deleteRecursively(child);
+ }
+ }
+ Assert.assertTrue(file.delete());
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitorTest.java
new file mode 100644
index 00000000000..d97e30407ab
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitorTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+
+/**
+ * Tests how {@link LoadConvertedInsertTabletStatementExceptionVisitor} maps
+ * exceptions raised while loading converted tablets to {@link TSStatus} codes.
+ */
+public class LoadConvertedInsertTabletStatementExceptionVisitorTest {
+
+ /**
+ * A {@link PipeRuntimeOutOfMemoryCriticalException} nested as the cause of an
+ * {@link IllegalStateException} must map to
+ * {@code LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION}, not to {@code LOAD_FILE_ERROR}.
+ *
+ * <p>The out-of-memory exception is only the cause, not the top-level
+ * exception. This test therefore also relies on the visitor looking into the
+ * cause chain (presumably via {@code getCause()} - TODO confirm).
+ */
+ @Test
+ public void testPipeOutOfMemoryIsTemporaryUnavailable() throws Exception {
+ // A real temporary file backs the load statement; it is deleted in finally.
+ final File tsFile = File.createTempFile("oom", ".tsfile");
+ try {
+ final LoadConvertedInsertTabletStatementExceptionVisitor visitor =
+ new LoadConvertedInsertTabletStatementExceptionVisitor();
+ final TSStatus status =
+ visitor.visitLoadFile(
+ LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
+ new IllegalStateException(
+ "wrapped memory pressure",
+ new PipeRuntimeOutOfMemoryCriticalException("pipe tablet
memory is not enough")));
+
+ Assert.assertEquals(
+ TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(),
status.getCode());
+ // Implied by the assertEquals above when the two status codes differ.
+ // Kept as an explicit guard against regressing to LOAD_FILE_ERROR.
+ Assert.assertNotEquals(TSStatusCode.LOAD_FILE_ERROR.getStatusCode(),
status.getCode());
+ } finally {
+ // NOTE(review): if this assertion fails, it replaces any failure thrown
+ // from the try block above.
+ Assert.assertTrue(tsFile.delete());
+ }
+ }
+}