This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch IOTDB-5209 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8780d73d835be6c300860facc2e0fa33bd527dbe Author: Liu Xuxin <[email protected]> AuthorDate: Wed Dec 14 16:22:48 2022 +0800 change write limit to io limit --- .../src/assembly/resources/conf/iotdb-common.properties | 4 ++-- .../src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 ++++++------ .../main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java | 11 +++++------ .../iotdb/db/engine/compaction/CompactionTaskManager.java | 9 ++++----- .../inner/utils/AlignedSeriesCompactionExecutor.java | 9 ++++----- .../inner/utils/SingleSeriesCompactionExecutor.java | 11 +++++------ .../engine/compaction/writer/AbstractCompactionWriter.java | 12 +++++++----- .../db/engine/compaction/utils/CompactionConfigRestorer.java | 2 +- 8 files changed, 34 insertions(+), 36 deletions(-) diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties index d932a8e9f3..d0302b5c77 100644 --- a/node-commons/src/assembly/resources/conf/iotdb-common.properties +++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties @@ -619,9 +619,9 @@ # Datatype: long, Unit: ms # compaction_submission_interval_in_ms=60000 -# The limit of write throughput merge can reach per second +# The limit of io rate can reach per second # Datatype: int -# compaction_write_throughput_mb_per_sec=16 +# compaction_io_rate_per_sec=50 # The number of sub compaction threads to be set up to perform compaction. # Currently only works for nonAligned data in cross space compaction and unseq inner space compaction. diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index bf77df1972..5575fe9495 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -658,8 +658,8 @@ public class IoTDBConfig { */ private long mergeIntervalSec = 0L; - /** The limit of compaction merge can reach per second */ - private int compactionWriteThroughputMbPerSec = 16; + /** The limit of io rate can reach per second */ + private int compactionIORatePerSec = 50; /** * How many thread will be set up to perform compaction, 10 by default. Set to 1 when less than or @@ -1913,12 +1913,12 @@ public class IoTDBConfig { this.intoOperationExecutionThreadCount = intoOperationExecutionThreadCount; } - public int getCompactionWriteThroughputMbPerSec() { - return compactionWriteThroughputMbPerSec; + public int getCompactionIORatePerSec() { + return compactionIORatePerSec; } - public void setCompactionWriteThroughputMbPerSec(int compactionWriteThroughputMbPerSec) { - this.compactionWriteThroughputMbPerSec = compactionWriteThroughputMbPerSec; + public void setCompactionIORatePerSec(int compactionIORatePerSec) { + this.compactionIORatePerSec = compactionIORatePerSec; } public boolean isEnableMemControl() { diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 596da574af..37f44e2f41 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -655,11 +655,10 @@ public class IoTDBDescriptor { "max_cross_compaction_candidate_file_size", Long.toString(conf.getMaxCrossCompactionCandidateFileSize())))); - conf.setCompactionWriteThroughputMbPerSec( + conf.setCompactionIORatePerSec( Integer.parseInt( properties.getProperty( - "compaction_write_throughput_mb_per_sec", - Integer.toString(conf.getCompactionWriteThroughputMbPerSec())))); + "compaction_io_rate_per_sec", Integer.toString(conf.getCompactionIORatePerSec())))); conf.setEnableCompactionValidation( Boolean.parseBoolean( @@ -1436,11 +1435,11 @@ public class IoTDBDescriptor { properties.getProperty( "slow_query_threshold", Long.toString(conf.getSlowQueryThreshold())))); // update merge_write_throughput_mb_per_sec - conf.setCompactionWriteThroughputMbPerSec( + conf.setCompactionIORatePerSec( Integer.parseInt( properties.getProperty( - "merge_write_throughput_mb_per_sec", - Integer.toString(conf.getCompactionWriteThroughputMbPerSec())))); + "compaction_io_rate_per_sec", + Integer.toString(conf.getCompactionIORatePerSec())))); // update insert-tablet-plan's row limit for select-into conf.setSelectIntoInsertTabletPlanRowLimit( diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java index ea7e70805f..e6201b448b 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java @@ -240,9 +240,8 @@ public class CompactionTaskManager implements IService { .containsKey(task); } - public RateLimiter getMergeWriteRateLimiter() { - setWriteMergeRate( - IoTDBDescriptor.getInstance().getConfig().getCompactionWriteThroughputMbPerSec()); + public RateLimiter getCompactionIORateLimiter() { + setWriteMergeRate(IoTDBDescriptor.getInstance().getConfig().getCompactionIORatePerSec()); return mergeWriteRateLimiter; } @@ -256,8 +255,8 @@ public class CompactionTaskManager implements IService { mergeWriteRateLimiter.setRate(throughout); } } - /** wait by throughoutMbPerSec limit to avoid continuous Write Or Read */ - public static void mergeRateLimiterAcquire(RateLimiter limiter, long bytesLength) { + /** wait by compactionIORatePerSec limit to avoid continuous Write Or Read */ + public static void compactionIORateLimiterAcquire(RateLimiter limiter, long bytesLength) { while (bytesLength >= Integer.MAX_VALUE) { limiter.acquire(Integer.MAX_VALUE); bytesLength -= Integer.MAX_VALUE; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java index 3de4c64a36..8d1a94a093 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java @@ -62,7 +62,7 @@ public class AlignedSeriesCompactionExecutor { private final List<IMeasurementSchema> schemaList; private long remainingPointInChunkWriter = 0L; private final RateLimiter rateLimiter = - CompactionTaskManager.getInstance().getMergeWriteRateLimiter(); + CompactionTaskManager.getInstance().getCompactionIORateLimiter(); private final long chunkSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize(); @@ -137,13 +137,13 @@ public class AlignedSeriesCompactionExecutor { while (readerIterator.hasNext()) { Pair<AlignedChunkReader, Long> chunkReaderAndChunkSize = readerIterator.nextReader(); CompactionMetricsRecorder.recordReadInfo(chunkReaderAndChunkSize.right); + rateLimiter.acquire(schemaList.size() + 1); compactOneAlignedChunk(chunkReaderAndChunkSize.left); } } if (remainingPointInChunkWriter != 0L) { - CompactionTaskManager.mergeRateLimiterAcquire( - rateLimiter, chunkWriter.estimateMaxSeriesMemSize()); + rateLimiter.acquire(chunkWriter.getValueChunkWriterList().size() + 1); CompactionMetricsRecorder.recordWriteInfo( CompactionType.INNER_SEQ_COMPACTION, ProcessChunkType.DESERIALIZE_CHUNK, @@ -181,8 +181,7 @@ public class AlignedSeriesCompactionExecutor { private void flushChunkWriterIfLargeEnough() throws IOException { if (remainingPointInChunkWriter >= chunkPointNumThreshold || chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * schemaList.size()) { - CompactionTaskManager.mergeRateLimiterAcquire( - rateLimiter, chunkWriter.estimateMaxSeriesMemSize()); + rateLimiter.acquire(chunkWriter.getValueChunkWriterList().size() + 1); CompactionMetricsRecorder.recordWriteInfo( CompactionType.INNER_SEQ_COMPACTION, ProcessChunkType.DESERIALIZE_CHUNK, diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java index d1d4a366e7..740656bcaa 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java @@ -58,7 +58,7 @@ public class SingleSeriesCompactionExecutor { private Chunk cachedChunk; private ChunkMetadata cachedChunkMetadata; private RateLimiter compactionRateLimiter = - CompactionTaskManager.getInstance().getMergeWriteRateLimiter(); + CompactionTaskManager.getInstance().getCompactionIORateLimiter(); // record the min time and max time to update the target resource private long minStartTimestamp = Long.MAX_VALUE; private long maxEndTimestamp = Long.MIN_VALUE; @@ -117,6 +117,7 @@ public class SingleSeriesCompactionExecutor { TsFileSequenceReader reader = readerListPair.left; List<ChunkMetadata> chunkMetadataList = readerListPair.right; for (ChunkMetadata chunkMetadata : chunkMetadataList) { + compactionRateLimiter.acquire(1); Chunk currentChunk = reader.readMemChunk(chunkMetadata); if (this.chunkWriter == null) { constructChunkWriterFromReadChunk(currentChunk); @@ -298,7 +299,7 @@ public class SingleSeriesCompactionExecutor { private void flushChunkToFileWriter( Chunk chunk, ChunkMetadata chunkMetadata, boolean isCachedChunk) throws IOException { - CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk)); + compactionRateLimiter.acquire(1); if (chunkMetadata.getStartTime() < minStartTimestamp) { minStartTimestamp = chunkMetadata.getStartTime(); } @@ -316,8 +317,7 @@ public class SingleSeriesCompactionExecutor { private void flushChunkWriterIfLargeEnough() throws IOException { if (pointCountInChunkWriter >= targetChunkPointNum || chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) { - CompactionTaskManager.mergeRateLimiterAcquire( - compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize()); + compactionRateLimiter.acquire(1); CompactionMetricsRecorder.recordWriteInfo( CompactionType.INNER_SEQ_COMPACTION, ProcessChunkType.DESERIALIZE_CHUNK, @@ -338,8 +338,7 @@ public class SingleSeriesCompactionExecutor { } private void flushChunkWriter() throws IOException { - CompactionTaskManager.mergeRateLimiterAcquire( - compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize()); + compactionRateLimiter.acquire(1); CompactionMetricsRecorder.recordWriteInfo( CompactionType.INNER_SEQ_COMPACTION, ProcessChunkType.DESERIALIZE_CHUNK, diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java index 24a82136ce..9a54d45c64 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java @@ -49,7 +49,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { protected int subTaskNum = IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(); private RateLimiter compactionRateLimiter = - CompactionTaskManager.getInstance().getMergeWriteRateLimiter(); + CompactionTaskManager.getInstance().getCompactionIORateLimiter(); // check if there is unseq error point during writing protected long[] lastTime = new long[subTaskNum]; @@ -162,7 +162,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { protected void sealChunk(TsFileIOWriter targetWriter, IChunkWriter iChunkWriter, int subTaskId) throws IOException { - CompactionTaskManager.mergeRateLimiterAcquire( + CompactionTaskManager.compactionIORateLimiterAcquire( compactionRateLimiter, iChunkWriter.estimateMaxSeriesMemSize()); synchronized (targetWriter) { iChunkWriter.writeToFileWriter(targetWriter); @@ -184,7 +184,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { protected void flushNonAlignedChunkToFileWriter( TsFileIOWriter targetWriter, Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId) throws IOException { - CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk)); + CompactionTaskManager.compactionIORateLimiterAcquire( + compactionRateLimiter, getChunkSize(chunk)); synchronized (targetWriter) { // seal last chunk to file writer chunkWriters[subTaskId].writeToFileWriter(targetWriter); @@ -208,7 +209,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { chunkPointNumArray[subTaskId] = 0; // flush time chunk - CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(timeChunk)); + CompactionTaskManager.compactionIORateLimiterAcquire( + compactionRateLimiter, getChunkSize(timeChunk)); targetWriter.writeChunk(timeChunk, (ChunkMetadata) timeChunkMetadata); // flush value chunks @@ -225,7 +227,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { valueChunkWriter.getStatistics()); continue; } - CompactionTaskManager.mergeRateLimiterAcquire( + CompactionTaskManager.compactionIORateLimiterAcquire( compactionRateLimiter, getChunkSize(valueChunk)); targetWriter.writeChunk(valueChunk, (ChunkMetadata) valueChunkMetadatas.get(i)); } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java index 6df41eac2a..4b854df976 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java @@ -75,7 +75,7 @@ public class CompactionConfigRestorer { config.setCompactionThreadCount(concurrentCompactionThread); config.setCompactionScheduleIntervalInMs(compactionScheduleIntervalInMs); config.setCompactionSubmissionIntervalInMs(compactionSubmissionIntervalInMs); - config.setCompactionWriteThroughputMbPerSec(compactionWriteThroughputMbPerSec); + config.setCompactionIORatePerSec(compactionWriteThroughputMbPerSec); config.setCrossCompactionPerformer(oldCrossPerformer); config.setInnerSeqCompactionPerformer(oldInnerSeqPerformer); config.setInnerUnseqCompactionPerformer(oldInnerUnseqPerformer);
