This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new bc966a68991 [to dev/1.3] Allow hot reloading compaction from disabled
status (#14470) (#14480)
bc966a68991 is described below
commit bc966a68991acdfad911def40de9e4a751fd2bd4
Author: shuwenwei <[email protected]>
AuthorDate: Wed Dec 18 17:43:51 2024 +0800
[to dev/1.3] Allow hot reloading compaction from disabled status (#14470)
(#14480)
* Allow hot reloading compaction from disabled status (#14470)
* allow hot reloading compaction from disabled status
* Modify how the parameter value is set so that it conforms to the comment.
* Fix unit tests (UT)
* Initialize compaction schedule even when compaction is not enabled
---
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 37 ++++++++++++----------
.../impl/DataNodeInternalRPCServiceImpl.java | 7 ++--
.../config/executor/ClusterConfigTaskExecutor.java | 8 ++---
.../db/storageengine/dataregion/DataRegion.java | 5 ---
.../compaction/schedule/CompactionTaskManager.java | 11 ++++---
5 files changed, 32 insertions(+), 36 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 47d0f938fb3..fefc0614b2d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -623,12 +623,20 @@ public class IoTDBDescriptor {
}
conf.setMergeIntervalSec(
Long.parseLong(
- properties.getProperty(
- "merge_interval_sec",
Long.toString(conf.getMergeIntervalSec()))));
- conf.setCompactionThreadCount(
+ Optional.ofNullable(
+ properties.getProperty(
+ "merge_interval_sec",
Long.toString(conf.getMergeIntervalSec())))
+ .map(String::trim)
+ .orElse(Long.toString(conf.getMergeIntervalSec()))));
+ int compactionThreadCount =
Integer.parseInt(
- properties.getProperty(
- "compaction_thread_count",
Integer.toString(conf.getCompactionThreadCount()))));
+ Optional.ofNullable(
+ properties.getProperty(
+ "compaction_thread_count",
+ Integer.toString(conf.getCompactionThreadCount())))
+ .map(String::trim)
+ .orElse(Integer.toString(conf.getCompactionThreadCount())));
+ conf.setCompactionThreadCount(compactionThreadCount <= 0 ? 1 :
compactionThreadCount);
int maxConcurrentAlignedSeriesInCompaction =
Integer.parseInt(
properties.getProperty(
@@ -1209,8 +1217,8 @@ public class IoTDBDescriptor {
CompactionScheduleTaskManager.getInstance().checkAndMayApplyConfigurationChange();
// hot load compaction task manager configurations
- loadCompactionIsEnabledHotModifiedProps(properties);
- boolean restartCompactionTaskManager =
loadCompactionThreadCountHotModifiedProps(properties);
+ boolean restartCompactionTaskManager =
loadCompactionIsEnabledHotModifiedProps(properties);
+ restartCompactionTaskManager |=
loadCompactionThreadCountHotModifiedProps(properties);
restartCompactionTaskManager |=
loadCompactionSubTaskCountHotModifiedProps(properties);
if (restartCompactionTaskManager) {
CompactionTaskManager.getInstance().restart();
@@ -1425,8 +1433,7 @@ public class IoTDBDescriptor {
"compaction_thread_count",
ConfigurationFileUtils.getConfigurationDefaultValue("compaction_thread_count")));
if (newConfigCompactionThreadCount <= 0) {
- LOGGER.error("compaction_thread_count must greater than 0");
- return false;
+ newConfigCompactionThreadCount = 1;
}
if (newConfigCompactionThreadCount == conf.getCompactionThreadCount()) {
return false;
@@ -1448,8 +1455,7 @@ public class IoTDBDescriptor {
ConfigurationFileUtils.getConfigurationDefaultValue(
"sub_compaction_thread_count")));
if (newConfigSubtaskNum <= 0) {
- LOGGER.error("sub_compaction_thread_count must greater than 0");
- return false;
+ newConfigSubtaskNum = 1;
}
if (newConfigSubtaskNum == conf.getSubCompactionTaskNum()) {
return false;
@@ -1458,7 +1464,8 @@ public class IoTDBDescriptor {
return true;
}
- private void loadCompactionIsEnabledHotModifiedProps(Properties properties)
throws IOException {
+ private boolean loadCompactionIsEnabledHotModifiedProps(Properties
properties)
+ throws IOException {
boolean isCompactionEnabled =
conf.isEnableSeqSpaceCompaction()
|| conf.isEnableUnseqSpaceCompaction()
@@ -1486,14 +1493,10 @@ public class IoTDBDescriptor {
|| newConfigEnableSeqSpaceCompaction
|| newConfigEnableUnseqSpaceCompaction;
- if (!isCompactionEnabled && compactionEnabledInNewConfig) {
- LOGGER.error("Compaction cannot start in current status.");
- return;
- }
-
conf.setEnableCrossSpaceCompaction(newConfigEnableCrossSpaceCompaction);
conf.setEnableSeqSpaceCompaction(newConfigEnableSeqSpaceCompaction);
conf.setEnableUnseqSpaceCompaction(newConfigEnableUnseqSpaceCompaction);
+ return !isCompactionEnabled && compactionEnabledInNewConfig;
}
private void loadWALHotModifiedProps(Properties properties) throws
IOException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index b3544644bf4..34dde7d0b30 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -70,7 +70,6 @@ import
org.apache.iotdb.consensus.exception.ConsensusException;
import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.db.auth.AuthorityChecker;
-import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
@@ -148,6 +147,7 @@ import org.apache.iotdb.db.service.metrics.FileMetrics;
import org.apache.iotdb.db.storageengine.StorageEngine;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTaskStatus;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.settle.SettleRequestHandler;
import
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
import
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager;
@@ -1806,11 +1806,10 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
if (!storageEngine.isReadyForNonReadWriteFunctions()) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "not all
sg is ready");
}
- IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
- if (!iotdbConfig.isEnableSeqSpaceCompaction() ||
!iotdbConfig.isEnableUnseqSpaceCompaction()) {
+ if (!CompactionTaskManager.getInstance().isInit()) {
return RpcUtils.getStatus(
TSStatusCode.EXECUTE_STATEMENT_ERROR,
- "cannot start repair task because inner space compaction is not
enabled");
+ "cannot start repair task because compaction is not enabled");
}
try {
if (storageEngine.repairData()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index b4f11e0a18d..da95ed794db 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -118,7 +118,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
-import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -231,6 +230,7 @@ import
org.apache.iotdb.db.schemaengine.template.alter.TemplateExtendInfo;
import org.apache.iotdb.db.storageengine.StorageEngine;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTaskStatus;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -1117,12 +1117,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
"not all sg is ready",
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
return future;
}
- IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
- if (!iotdbConfig.isEnableSeqSpaceCompaction()
- || !iotdbConfig.isEnableUnseqSpaceCompaction()) {
+ if (!CompactionTaskManager.getInstance().isInit()) {
future.setException(
new IoTDBException(
- "cannot start repair task because inner space compaction is
not enabled",
+ "cannot start repair task because compaction is not enabled",
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
return future;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 7fd37e67587..98855a87d83 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -677,11 +677,6 @@ public class DataRegion implements IDataRegionForQuery {
}
public void initCompactionSchedule() {
- if (!config.isEnableSeqSpaceCompaction()
- && !config.isEnableUnseqSpaceCompaction()
- && !config.isEnableCrossSpaceCompaction()) {
- return;
- }
RepairUnsortedFileCompactionTask.recoverAllocatedFileTimestamp(
tsFileManager.getMaxFileTimestampOfUnSequenceFile());
CompactionScheduleTaskManager.getInstance().registerDataRegion(this);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
index 3c9021f29c6..bd0c01b6dc2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
@@ -118,11 +118,7 @@ public class CompactionTaskManager implements IService {
@Override
public synchronized void start() {
- if (taskExecutionPool == null
- &&
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() > 0
- && (config.isEnableSeqSpaceCompaction()
- || config.isEnableUnseqSpaceCompaction()
- || config.isEnableCrossSpaceCompaction())) {
+ if (!init) {
initThreadPool();
candidateCompactionTaskQueue.regsitPollLastHook(
AbstractCompactionTask::resetCompactionCandidateStatusForAllSourceFiles);
@@ -132,6 +128,10 @@ public class CompactionTaskManager implements IService {
logger.info("Compaction task manager started.");
}
+ public boolean isInit() {
+ return this.init;
+ }
+
private void initThreadPool() {
int compactionThreadNum =
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount();
this.taskExecutionPool =
@@ -222,6 +222,7 @@ public class CompactionTaskManager implements IService {
}
taskExecutionPool = null;
subCompactionTaskExecutionPool = null;
+ init = false;
storageGroupTasks.clear();
logger.info("CompactionManager stopped");
}