This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b9e9aaa30e4 Compaction rate limit (#12312)
b9e9aaa30e4 is described below
commit b9e9aaa30e4c082553b56acd870853d62d47638a
Author: shuwenwei <[email protected]>
AuthorDate: Fri Apr 19 17:27:05 2024 +0800
Compaction rate limit (#12312)
* add compaction read rate limiter
* modify method name
* modify default compaction read rate limit
* add config item to limit compaction read speed
* add config item to limit compaction read speed
* fix throughput rate is not correct
* fix some issues
* compaction read rate has no limit in default config
* use correct unit when init rate limiter
* modify maxSizePerWrite
* remove useless code
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 27 +++++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 48 ++++++++--
.../compaction/io/CompactionTsFileOutput.java | 104 +++++++++++++++++++++
.../compaction/io/CompactionTsFileReader.java | 7 ++
.../compaction/io/CompactionTsFileWriter.java | 22 +----
.../compaction/repair/RepairDataFileScanUtil.java | 9 +-
.../compaction/schedule/CompactionTaskManager.java | 49 ++++++++--
.../resources/conf/iotdb-common.properties | 12 ++-
8 files changed, 240 insertions(+), 38 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e7493770f2e..ac8a2ade808 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -684,9 +684,18 @@ public class IoTDBConfig {
*/
private long mergeIntervalSec = 0L;
- /** The limit of compaction merge can reach per second */
+ /** The limit of compaction merge can reach per second. When <= 0, no limit.
unit: megabyte */
private int compactionWriteThroughputMbPerSec = 16;
+ /**
+ * The limit of compaction read throughput can reach per second. When <= 0,
no limit. unit:
+ * megabyte
+ */
+ private int compactionReadThroughputMbPerSec = 0;
+
+ /** The limit of compaction read operation can reach per second. When <= 0,
no limit. */
+ private int compactionReadOperationPerSec = 0;
+
/**
* How many thread will be set up to perform compaction, 10 by default. Set
to 1 when less than or
* equal to 0.
@@ -2046,6 +2055,22 @@ public class IoTDBConfig {
this.compactionWriteThroughputMbPerSec = compactionWriteThroughputMbPerSec;
}
+ public int getCompactionReadThroughputMbPerSec() {
+ return compactionReadThroughputMbPerSec;
+ }
+
+ public void setCompactionReadThroughputMbPerSec(int
compactionReadThroughputMbPerSec) {
+ this.compactionReadThroughputMbPerSec = compactionReadThroughputMbPerSec;
+ }
+
+ public int getCompactionReadOperationPerSec() {
+ return compactionReadOperationPerSec;
+ }
+
+ public void setCompactionReadOperationPerSec(int
compactionReadOperationPerSec) {
+ this.compactionReadOperationPerSec = compactionReadOperationPerSec;
+ }
+
public boolean isEnableTimedFlushSeqMemtable() {
return enableTimedFlushSeqMemtable;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index df797ff1afe..f5cf2d8fc5e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -669,6 +669,18 @@ public class IoTDBDescriptor {
"compaction_write_throughput_mb_per_sec",
Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+ conf.setCompactionReadThroughputMbPerSec(
+ Integer.parseInt(
+ properties.getProperty(
+ "compaction_read_throughput_mb_per_sec",
+
Integer.toString(conf.getCompactionReadThroughputMbPerSec()))));
+
+ conf.setCompactionReadOperationPerSec(
+ Integer.parseInt(
+ properties.getProperty(
+ "compaction_read_operation_per_sec",
+ Integer.toString(conf.getCompactionReadOperationPerSec()))));
+
conf.setEnableTsFileValidation(
Boolean.parseBoolean(
properties.getProperty(
@@ -1137,6 +1149,35 @@ public class IoTDBDescriptor {
if (restartCompactionTaskManager) {
CompactionTaskManager.getInstance().restart();
}
+ // hot load compaction rate limit configurations
+
+ // update merge_write_throughput_mb_per_sec
+ conf.setCompactionWriteThroughputMbPerSec(
+ Integer.parseInt(
+ properties.getProperty(
+ "merge_write_throughput_mb_per_sec",
+
Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+
+ // update compaction_read_operation_per_sec
+ conf.setCompactionReadOperationPerSec(
+ Integer.parseInt(
+ properties.getProperty(
+ "compaction_read_operation_per_sec",
+ Integer.toString(conf.getCompactionReadOperationPerSec()))));
+
+ // update compaction_read_throughput_mb_per_sec
+ conf.setCompactionReadThroughputMbPerSec(
+ Integer.parseInt(
+ properties.getProperty(
+ "compaction_read_throughput_mb_per_sec",
+
Integer.toString(conf.getCompactionReadThroughputMbPerSec()))));
+
+ CompactionTaskManager.getInstance()
+
.setCompactionReadOperationRate(conf.getCompactionReadOperationPerSec());
+ CompactionTaskManager.getInstance()
+
.setCompactionReadThroughputRate(conf.getCompactionReadThroughputMbPerSec());
+ CompactionTaskManager.getInstance()
+ .setWriteMergeRate(conf.getCompactionWriteThroughputMbPerSec());
}
private boolean loadCompactionThreadCountHotModifiedProps(Properties
properties) {
@@ -1616,13 +1657,6 @@ public class IoTDBDescriptor {
Long.parseLong(
properties.getProperty(
"slow_query_threshold",
Long.toString(conf.getSlowQueryThreshold()))));
- // update merge_write_throughput_mb_per_sec
- conf.setCompactionWriteThroughputMbPerSec(
- Integer.parseInt(
- properties.getProperty(
- "merge_write_throughput_mb_per_sec",
-
Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
-
// update select into operation max buffer size
conf.setIntoOperationBufferSizeInByte(
Long.parseLong(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileOutput.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileOutput.java
new file mode 100644
index 00000000000..3992b6ab284
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileOutput.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.io;
+
+import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class CompactionTsFileOutput extends OutputStream implements
TsFileOutput {
+
+ private TsFileOutput output;
+ private RateLimiter rateLimiter;
+ private final int maxSizePerWrite;
+
+ public CompactionTsFileOutput(TsFileOutput output, RateLimiter rateLimiter) {
+ this.output = output;
+ this.rateLimiter = rateLimiter;
+ this.maxSizePerWrite = (int) Math.min((long) rateLimiter.getRate(),
Integer.MAX_VALUE);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ rateLimiter.acquire(1);
+ output.wrapAsStream().write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte b) throws IOException {
+ rateLimiter.acquire(1);
+ output.write(b);
+ }
+
+ @Override
+ public void write(ByteBuffer b) throws IOException {
+ write(b.array());
+ }
+
+ @Override
+ public long getPosition() throws IOException {
+ return output.getPosition();
+ }
+
+ @Override
+ public void close() throws IOException {
+ output.close();
+ }
+
+ @Override
+ public OutputStream wrapAsStream() {
+ return this;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ output.flush();
+ }
+
+ @Override
+ public void truncate(long size) throws IOException {
+ output.truncate(size);
+ }
+
+ @Override
+ public void force() throws IOException {
+ output.force();
+ }
+
+ @Override
+ public void write(byte[] buf, int start, int length) throws IOException {
+ while (length > 0) {
+ int writeSize = Math.min(length, maxSizePerWrite);
+ rateLimiter.acquire(writeSize);
+ output.wrapAsStream().write(buf, start, writeSize);
+ start += writeSize;
+ length -= writeSize;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
index f4231d378de..ffb6f5ed921 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.io;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionIoDataType;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.tsfile.file.IMetadataIndexEntry;
@@ -75,6 +76,7 @@ public class CompactionTsFileReader extends
TsFileSequenceReader {
@Override
protected ByteBuffer readData(long position, int totalSize) throws
IOException {
+ acquireReadDataSizeWithCompactionReadRateLimiter(totalSize);
ByteBuffer buffer = super.readData(position, totalSize);
readDataSize.addAndGet(totalSize);
return buffer;
@@ -248,6 +250,11 @@ public class CompactionTsFileReader extends
TsFileSequenceReader {
.recordReadInfo(compactionType, CompactionIoDataType.METADATA,
dataSize);
}
+ private void acquireReadDataSizeWithCompactionReadRateLimiter(int
readDataSize) {
+
CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1);
+
CompactionTaskManager.getInstance().getCompactionReadRateLimiter().acquire(readDataSize);
+ }
+
@Override
public boolean equals(Object o) {
return super.equals(o);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
index 6783d84bfe5..a2388e72aaa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
@@ -47,6 +47,9 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
throws IOException {
super(file, maxMetadataSize);
this.type = type;
+ super.out =
+ new CompactionTsFileOutput(
+ super.out,
CompactionTaskManager.getInstance().getMergeWriteRateLimiter());
}
public void markStartingWritingAligned() {
@@ -65,7 +68,6 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
}
chunkWriter.writeToFileWriter(this);
long writtenDataSize = this.getPos() - beforeOffset;
- acquireWrittenDataSizeWithCompactionWriteRateLimiter(writtenDataSize);
CompactionMetrics.getInstance()
.recordWriteInfo(
type,
@@ -81,7 +83,6 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
}
super.writeChunk(chunk, chunkMetadata);
long writtenDataSize = this.getPos() - beforeOffset;
- acquireWrittenDataSizeWithCompactionWriteRateLimiter(writtenDataSize);
CompactionMetrics.getInstance()
.recordWriteInfo(
type,
@@ -103,13 +104,11 @@ public class CompactionTsFileWriter extends
TsFileIOWriter {
long writtenDataSize = this.getPos() - beforeOffset;
CompactionMetrics.getInstance()
.recordWriteInfo(type, CompactionIoDataType.ALIGNED, writtenDataSize);
- acquireWrittenDataSizeWithCompactionWriteRateLimiter(writtenDataSize);
}
@Override
public int checkMetadataSizeAndMayFlush() throws IOException {
int size = super.checkMetadataSizeAndMayFlush();
- acquireWrittenDataSizeWithCompactionWriteRateLimiter(size);
CompactionMetrics.getInstance().recordWriteInfo(type,
CompactionIoDataType.METADATA, size);
return size;
}
@@ -119,25 +118,10 @@ public class CompactionTsFileWriter extends
TsFileIOWriter {
long beforeSize = this.getPos();
super.endFile();
long writtenDataSize = this.getPos() - beforeSize;
- acquireWrittenDataSizeWithCompactionWriteRateLimiter(writtenDataSize);
CompactionMetrics.getInstance()
.recordWriteInfo(type, CompactionIoDataType.METADATA, writtenDataSize);
}
- private void acquireWrittenDataSizeWithCompactionWriteRateLimiter(long
writtenDataSize) {
- while (writtenDataSize > 0) {
- if (writtenDataSize > Integer.MAX_VALUE) {
-
CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire(Integer.MAX_VALUE);
- writtenDataSize -= Integer.MAX_VALUE;
- } else {
- CompactionTaskManager.getInstance()
- .getMergeWriteRateLimiter()
- .acquire((int) writtenDataSize);
- return;
- }
- }
- }
-
public boolean isEmptyTargetFile() {
return isEmptyTargetFile;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
index b6a0dd5033d..96ca52d22fc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.repair;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.reader.CompactionChunkReader;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
@@ -73,7 +75,12 @@ public class RepairDataFileScanUtil {
public void scanTsFile() {
File tsfile = resource.getTsFile();
- try (TsFileSequenceReader reader = new
TsFileSequenceReader(tsfile.getPath())) {
+ try (TsFileSequenceReader reader =
+ new CompactionTsFileReader(
+ tsfile.getPath(),
+ resource.isSeq()
+ ? CompactionType.INNER_SEQ_COMPACTION
+ : CompactionType.INNER_UNSEQ_COMPACTION)) {
TsFileDeviceIterator deviceIterator =
reader.getAllDevicesIteratorWithIsAligned();
while (deviceIterator.hasNext()) {
Pair<IDeviceID, Boolean> deviceIsAlignedPair = deviceIterator.next();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
index aa3073bdce7..fbe9970f933 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
@@ -83,7 +83,21 @@ public class CompactionTaskManager implements IService {
storageGroupTasks = new ConcurrentHashMap<>();
private final AtomicInteger finishedTaskNum = new AtomicInteger(0);
- private final RateLimiter mergeWriteRateLimiter =
RateLimiter.create(Double.MAX_VALUE);
+ private final RateLimiter mergeWriteRateLimiter =
+ RateLimiter.create(
+ config.getCompactionWriteThroughputMbPerSec() <= 0
+ ? Double.MAX_VALUE
+ : config.getCompactionWriteThroughputMbPerSec() * 1024.0 *
1024.0);
+ private final RateLimiter compactionReadOperationRateLimiter =
+ RateLimiter.create(
+ config.getCompactionReadOperationPerSec() <= 0
+ ? Double.MAX_VALUE
+ : config.getCompactionReadOperationPerSec());
+ private final RateLimiter compactionReadThroughputRateLimiter =
+ RateLimiter.create(
+ config.getCompactionReadThroughputMbPerSec() <= 0
+ ? Double.MAX_VALUE
+ : config.getCompactionReadThroughputMbPerSec() * 1024.0 *
1024.0);
private volatile boolean init = false;
@@ -250,19 +264,36 @@ public class CompactionTaskManager implements IService {
}
public RateLimiter getMergeWriteRateLimiter() {
- setWriteMergeRate(
-
IoTDBDescriptor.getInstance().getConfig().getCompactionWriteThroughputMbPerSec());
return mergeWriteRateLimiter;
}
- private void setWriteMergeRate(final double throughoutMbPerSec) {
- double throughout = throughoutMbPerSec * 1024.0 * 1024.0;
+ public RateLimiter getCompactionReadRateLimiter() {
+ return compactionReadThroughputRateLimiter;
+ }
+
+ public RateLimiter getCompactionReadOperationRateLimiter() {
+ return compactionReadOperationRateLimiter;
+ }
+
+ public void setWriteMergeRate(final double throughoutMbPerSec) {
+ setRate(mergeWriteRateLimiter, throughoutMbPerSec * 1024.0 * 1024.0);
+ }
+
+ public void setCompactionReadOperationRate(final double readOperationPerSec)
{
+ setRate(compactionReadOperationRateLimiter, readOperationPerSec);
+ }
+
+ public void setCompactionReadThroughputRate(final double throughputMbPerSec)
{
+ setRate(compactionReadThroughputRateLimiter, throughputMbPerSec * 1024.0 *
1024.0);
+ }
+
+ private void setRate(RateLimiter rateLimiter, double rate) {
// if throughout = 0, disable rate limiting
- if (throughout <= 0) {
- throughout = Double.MAX_VALUE;
+ if (rate <= 0) {
+ rate = Double.MAX_VALUE;
}
- if (mergeWriteRateLimiter.getRate() != throughout) {
- mergeWriteRateLimiter.setRate(throughout);
+ if (Math.abs(rateLimiter.getRate() - rate) > 0.0001) {
+ rateLimiter.setRate(rate);
}
}
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 6b98d0906c9..9f234a7f0ca 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -646,9 +646,19 @@ data_replication_factor=1
# The limit of write throughput merge can reach per second
# values less than or equal to 0 means no limit
-# Datatype: int
+# Datatype: int, Unit: megabyte
# compaction_write_throughput_mb_per_sec=16
+# The limit of read throughput merge can reach per second
+# values less than or equal to 0 means no limit
+# Datatype: int, Unit: megabyte
+# compaction_read_throughput_mb_per_sec=0
+
+# The limit of read operation merge can reach per second
+# values less than or equal to 0 means no limit
+# Datatype: int
+# compaction_read_operation_per_sec=0
+
# The number of sub compaction threads to be set up to perform compaction.
# Currently only works for nonAligned data in cross space compaction and unseq
inner space compaction.
# Set to 1 when less than or equal to 0.