This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch DEBUG-WRITE-FLUCTUATION
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/DEBUG-WRITE-FLUCTUATION by
this push:
new c4bf152c14 add io rate limit for compaction
c4bf152c14 is described below
commit c4bf152c1467443f6ecc65f74ccfc238b3ec3b8b
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Dec 6 16:32:46 2022 +0800
add io rate limit for compaction
---
.../src/assembly/resources/conf/iotdb-common.properties | 2 +-
.../src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 ++++++------
.../main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java | 9 ++++-----
.../iotdb/db/engine/compaction/CompactionTaskManager.java | 13 ++++++-------
.../inner/utils/AlignedSeriesCompactionExecutor.java | 11 ++++++-----
.../inner/utils/SingleSeriesCompactionExecutor.java | 8 ++++----
.../engine/compaction/writer/AbstractCompactionWriter.java | 12 +++++-------
.../engine/compaction/utils/CompactionConfigRestorer.java | 2 +-
8 files changed, 33 insertions(+), 36 deletions(-)
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index d932a8e9f3..3b07e81b5c 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -621,7 +621,7 @@
# The limit of write throughput merge can reach per second
# Datatype: int
-# compaction_write_throughput_mb_per_sec=16
+# compaction_io_per_sec=16
# The number of sub compaction threads to be set up to perform compaction.
# Currently only works for nonAligned data in cross space compaction and unseq inner space compaction.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 99dcf1b865..0c2d469817 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -658,8 +658,8 @@ public class IoTDBConfig {
*/
private long mergeIntervalSec = 0L;
- /** The limit of compaction merge can reach per second */
- private int compactionWriteThroughputMbPerSec = 16;
+ /** The limit of compaction IO times per second */
+ private int compactionIOPerSec = 16;
/**
* How many thread will be set up to perform compaction, 10 by default. Set to 1 when less than or
@@ -1913,12 +1913,12 @@ public class IoTDBConfig {
this.intoOperationExecutionThreadCount = intoOperationExecutionThreadCount;
}
- public int getCompactionWriteThroughputMbPerSec() {
- return compactionWriteThroughputMbPerSec;
+ public int getCompactionIOPerSec() {
+ return compactionIOPerSec;
}
- public void setCompactionWriteThroughputMbPerSec(int compactionWriteThroughputMbPerSec) {
- this.compactionWriteThroughputMbPerSec = compactionWriteThroughputMbPerSec;
+ public void setCompactionIOPerSec(int compactionIOPerSec) {
+ this.compactionIOPerSec = compactionIOPerSec;
}
public boolean isEnableMemControl() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 57a0aee054..731713ee07 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -667,11 +667,10 @@ public class IoTDBDescriptor {
"max_cross_compaction_candidate_file_size",
Long.toString(conf.getMaxCrossCompactionCandidateFileSize()))));
- conf.setCompactionWriteThroughputMbPerSec(
+ conf.setCompactionIOPerSec(
Integer.parseInt(
properties.getProperty(
- "compaction_write_throughput_mb_per_sec",
- Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+ "compaction_io_per_sec", Integer.toString(conf.getCompactionIOPerSec()))));
conf.setEnableCompactionValidation(
Boolean.parseBoolean(
@@ -1475,11 +1474,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"slow_query_threshold",
Long.toString(conf.getSlowQueryThreshold()))));
// update merge_write_throughput_mb_per_sec
- conf.setCompactionWriteThroughputMbPerSec(
+ conf.setCompactionIOPerSec(
Integer.parseInt(
properties.getProperty(
"merge_write_throughput_mb_per_sec",
- Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+ Integer.toString(conf.getCompactionIOPerSec()))));
// update insert-tablet-plan's row limit for select-into
conf.setSelectIntoInsertTabletPlanRowLimit(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index edd79a608c..5f74773c0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -75,7 +75,7 @@ public class CompactionTaskManager implements IService {
storageGroupTasks = new ConcurrentHashMap<>();
private final AtomicInteger finishedTaskNum = new AtomicInteger(0);
- private final RateLimiter mergeWriteRateLimiter = RateLimiter.create(Double.MAX_VALUE);
+ private final RateLimiter compactionIORateLimiter = RateLimiter.create(Double.MAX_VALUE);
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private volatile boolean init = false;
@@ -237,10 +237,9 @@ public class CompactionTaskManager implements IService {
.containsKey(task);
}
- public RateLimiter getMergeWriteRateLimiter() {
- setWriteMergeRate(
- IoTDBDescriptor.getInstance().getConfig().getCompactionWriteThroughputMbPerSec());
- return mergeWriteRateLimiter;
+ public RateLimiter getCompactionIORateLimiter() {
+ setWriteMergeRate(IoTDBDescriptor.getInstance().getConfig().getCompactionIOPerSec());
+ return compactionIORateLimiter;
}
private void setWriteMergeRate(final double throughoutMbPerSec) {
@@ -249,8 +248,8 @@ public class CompactionTaskManager implements IService {
if (throughout == 0) {
throughout = Double.MAX_VALUE;
}
- if (mergeWriteRateLimiter.getRate() != throughout) {
- mergeWriteRateLimiter.setRate(throughout);
+ if (compactionIORateLimiter.getRate() != throughout) {
+ compactionIORateLimiter.setRate(throughout);
}
}
/** wait by throughoutMbPerSec limit to avoid continuous Write Or Read */
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
index 3de4c64a36..b09fdab0b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
@@ -62,7 +62,7 @@ public class AlignedSeriesCompactionExecutor {
private final List<IMeasurementSchema> schemaList;
private long remainingPointInChunkWriter = 0L;
private final RateLimiter rateLimiter =
- CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
+ CompactionTaskManager.getInstance().getCompactionIORateLimiter();
private final long chunkSizeThreshold =
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -135,6 +135,9 @@ public class AlignedSeriesCompactionExecutor {
TsFileAlignedSeriesReaderIterator readerIterator =
new TsFileAlignedSeriesReaderIterator(reader, alignedChunkMetadataList, schemaList);
while (readerIterator.hasNext()) {
+ CompactionTaskManager.getInstance()
+ .getCompactionIORateLimiter()
+ .acquire(schemaList.size() + 1);
Pair<AlignedChunkReader, Long> chunkReaderAndChunkSize = readerIterator.nextReader();
CompactionMetricsRecorder.recordReadInfo(chunkReaderAndChunkSize.right);
compactOneAlignedChunk(chunkReaderAndChunkSize.left);
@@ -142,8 +145,7 @@ public class AlignedSeriesCompactionExecutor {
}
if (remainingPointInChunkWriter != 0L) {
- CompactionTaskManager.mergeRateLimiterAcquire(
- rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
+ CompactionTaskManager.mergeRateLimiterAcquire(rateLimiter, 1);
CompactionMetricsRecorder.recordWriteInfo(
CompactionType.INNER_SEQ_COMPACTION,
ProcessChunkType.DESERIALIZE_CHUNK,
@@ -181,8 +183,7 @@ public class AlignedSeriesCompactionExecutor {
private void flushChunkWriterIfLargeEnough() throws IOException {
if (remainingPointInChunkWriter >= chunkPointNumThreshold
|| chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * schemaList.size()) {
- CompactionTaskManager.mergeRateLimiterAcquire(
- rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
+ CompactionTaskManager.mergeRateLimiterAcquire(rateLimiter, 1);
CompactionMetricsRecorder.recordWriteInfo(
CompactionType.INNER_SEQ_COMPACTION,
ProcessChunkType.DESERIALIZE_CHUNK,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index d1d4a366e7..784b15e3cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -58,7 +58,7 @@ public class SingleSeriesCompactionExecutor {
private Chunk cachedChunk;
private ChunkMetadata cachedChunkMetadata;
private RateLimiter compactionRateLimiter =
- CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
+ CompactionTaskManager.getInstance().getCompactionIORateLimiter();
// record the min time and max time to update the target resource
private long minStartTimestamp = Long.MAX_VALUE;
private long maxEndTimestamp = Long.MIN_VALUE;
@@ -117,6 +117,7 @@ public class SingleSeriesCompactionExecutor {
TsFileSequenceReader reader = readerListPair.left;
List<ChunkMetadata> chunkMetadataList = readerListPair.right;
for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+ compactionRateLimiter.acquire(1);
Chunk currentChunk = reader.readMemChunk(chunkMetadata);
if (this.chunkWriter == null) {
constructChunkWriterFromReadChunk(currentChunk);
@@ -298,7 +299,7 @@ public class SingleSeriesCompactionExecutor {
private void flushChunkToFileWriter(
Chunk chunk, ChunkMetadata chunkMetadata, boolean isCachedChunk) throws IOException {
- CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk));
+ CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 1);
if (chunkMetadata.getStartTime() < minStartTimestamp) {
minStartTimestamp = chunkMetadata.getStartTime();
}
@@ -316,8 +317,7 @@ public class SingleSeriesCompactionExecutor {
private void flushChunkWriterIfLargeEnough() throws IOException {
if (pointCountInChunkWriter >= targetChunkPointNum
|| chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) {
- CompactionTaskManager.mergeRateLimiterAcquire(
- compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
+ CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 1);
CompactionMetricsRecorder.recordWriteInfo(
CompactionType.INNER_SEQ_COMPACTION,
ProcessChunkType.DESERIALIZE_CHUNK,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
index 24a82136ce..c2f54080e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
@@ -49,7 +49,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
protected int subTaskNum =
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
private RateLimiter compactionRateLimiter =
- CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
+ CompactionTaskManager.getInstance().getCompactionIORateLimiter();
// check if there is unseq error point during writing
protected long[] lastTime = new long[subTaskNum];
@@ -162,8 +162,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
protected void sealChunk(TsFileIOWriter targetWriter, IChunkWriter iChunkWriter, int subTaskId)
throws IOException {
- CompactionTaskManager.mergeRateLimiterAcquire(
- compactionRateLimiter, iChunkWriter.estimateMaxSeriesMemSize());
+ CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 1);
synchronized (targetWriter) {
iChunkWriter.writeToFileWriter(targetWriter);
}
@@ -184,7 +183,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
protected void flushNonAlignedChunkToFileWriter(
TsFileIOWriter targetWriter, Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId)
throws IOException {
- CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk));
+ CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 1);
synchronized (targetWriter) {
// seal last chunk to file writer
chunkWriters[subTaskId].writeToFileWriter(targetWriter);
@@ -208,7 +207,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
chunkPointNumArray[subTaskId] = 0;
// flush time chunk
- CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(timeChunk));
+ CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 1);
targetWriter.writeChunk(timeChunk, (ChunkMetadata) timeChunkMetadata);
// flush value chunks
@@ -225,8 +224,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
valueChunkWriter.getStatistics());
continue;
}
- CompactionTaskManager.mergeRateLimiterAcquire(
- compactionRateLimiter, getChunkSize(valueChunk));
+ CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 1);
targetWriter.writeChunk(valueChunk, (ChunkMetadata) valueChunkMetadatas.get(i));
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
index a809d10262..a9194d2fa1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
@@ -65,6 +65,6 @@ public class CompactionConfigRestorer {
config.setCompactionThreadCount(concurrentCompactionThread);
config.setCompactionScheduleIntervalInMs(compactionScheduleIntervalInMs);
config.setCompactionSubmissionIntervalInMs(compactionSubmissionIntervalInMs);
- config.setCompactionWriteThroughputMbPerSec(compactionWriteThroughputMbPerSec);
+ config.setCompactionIOPerSec(compactionWriteThroughputMbPerSec);
}
}