jt2594838 commented on code in PR #12312: URL: https://github.com/apache/iotdb/pull/12312#discussion_r1565052539
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileOutput.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.io; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; +import org.apache.iotdb.tsfile.write.writer.TsFileOutput; + +import com.google.common.util.concurrent.RateLimiter; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public class CompactionTsFileOutput extends OutputStream implements TsFileOutput { + + private TsFileOutput output; + private RateLimiter rateLimiter = CompactionTaskManager.getInstance().getMergeWriteRateLimiter(); + private final long maxSizePerWrite; + + public CompactionTsFileOutput(TsFileOutput output) { + this.output = output; + maxSizePerWrite = + (int) + (rateLimiter.getRate() + / IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()); Review Comment: Similarly, the flexibility can be increased by putting the parameter in the constructor. 
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileOutput.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.io; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; +import org.apache.iotdb.tsfile.write.writer.TsFileOutput; + +import com.google.common.util.concurrent.RateLimiter; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public class CompactionTsFileOutput extends OutputStream implements TsFileOutput { + + private TsFileOutput output; + private RateLimiter rateLimiter = CompactionTaskManager.getInstance().getMergeWriteRateLimiter(); Review Comment: I would suggest this be a parameter in the constructor to enable finer granularity control in the future, e.g., different databases (of different users) may have their own resource allocation and thus different rate limits. 
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java: ########## @@ -687,6 +687,12 @@ public class IoTDBConfig { /** The limit of compaction merge can reach per second */ private int compactionWriteThroughputMbPerSec = 16; + /** The limit of compaction read throughput can reach per second */ + private int compactionReadThroughputMbPerSec = 40; Review Comment: Mb or MB? They are quite different. If you are concerned with the naming rule (which discourages you from using 3 or more consecutive capitalized letters), you may use something like "compactionWriteThroughputMegaBytePerSec". ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileOutput.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.io; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; +import org.apache.iotdb.tsfile.write.writer.TsFileOutput; + +import com.google.common.util.concurrent.RateLimiter; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public class CompactionTsFileOutput extends OutputStream implements TsFileOutput { + + private TsFileOutput output; + private RateLimiter rateLimiter = CompactionTaskManager.getInstance().getMergeWriteRateLimiter(); + private final long maxSizePerWrite; + + public CompactionTsFileOutput(TsFileOutput output) { + this.output = output; + maxSizePerWrite = + (int) + (rateLimiter.getRate() + / IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()); Review Comment: And what is the point of the parameter? If the rate is 40MB/s, the thread count is 10, and the write size is 8MB, it is totally OK. The write will just take 2 seconds to wait for permission. 
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java: ########## @@ -243,19 +255,36 @@ private boolean isTaskRunning(AbstractCompactionTask task) { } public RateLimiter getMergeWriteRateLimiter() { - setWriteMergeRate( - IoTDBDescriptor.getInstance().getConfig().getCompactionWriteThroughputMbPerSec()); return mergeWriteRateLimiter; } - private void setWriteMergeRate(final double throughoutMbPerSec) { - double throughout = throughoutMbPerSec * 1024.0 * 1024.0; + public RateLimiter getCompactionReadRateLimiter() { + return compactionReadThroughputRateLimiter; + } + + public RateLimiter getCompactionReadOperationRateLimiter() { + return compactionReadOperationRateLimiter; + } + + public void setWriteMergeRate(final double throughoutMbPerSec) { + setRate(mergeWriteRateLimiter, throughoutMbPerSec * 1024.0 * 1024.0); + } + + public void setCompactionReadOperationRate(final double readOperationPerSec) { + setRate(compactionReadOperationRateLimiter, readOperationPerSec); + } + + public void setCompactionReadThroughputRate(final double throughputMbPerSec) { + setRate(compactionReadThroughputRateLimiter, throughputMbPerSec * 1024.0 * 1024.0); + } + + private void setRate(RateLimiter rateLimiter, double rate) { // if throughout = 0, disable rate limiting - if (throughout <= 0) { - throughout = Double.MAX_VALUE; + if (rate <= 0) { + rate = Double.MAX_VALUE; } - if (mergeWriteRateLimiter.getRate() != throughout) { - mergeWriteRateLimiter.setRate(throughout); + if (rateLimiter.getRate() != rate) { + rateLimiter.setRate(rate); Review Comment: It is hard to say whether this comparison is meaningful, since testing two floating-point values for exact equality is unreliable. 
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java: ########## @@ -1117,6 +1129,35 @@ private void loadCompactionHotModifiedProps(Properties properties) throws Interr if (restartCompactionTaskManager) { CompactionTaskManager.getInstance().restart(); } + // hot load compaction rate limit configurations + + // update merge_write_throughput_mb_per_sec + conf.setCompactionWriteThroughputMbPerSec( + Integer.parseInt( + properties.getProperty( + "merge_write_throughput_mb_per_sec", + Integer.toString(conf.getCompactionWriteThroughputMbPerSec())))); + + // update compaction_read_operation_per_sec + conf.setCompactionReadOperationPerSec( + Integer.parseInt( + properties.getProperty( + "compaction_read_operation_per_sec", + Integer.toString(conf.getCompactionReadOperationPerSec())))); Review Comment: Why is there no write IOPS control similar to read? ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileOutput.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.io; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; +import org.apache.iotdb.tsfile.write.writer.TsFileOutput; + +import com.google.common.util.concurrent.RateLimiter; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public class CompactionTsFileOutput extends OutputStream implements TsFileOutput { + + private TsFileOutput output; + private RateLimiter rateLimiter = CompactionTaskManager.getInstance().getMergeWriteRateLimiter(); + private final long maxSizePerWrite; + + public CompactionTsFileOutput(TsFileOutput output) { + this.output = output; + maxSizePerWrite = + (int) + (rateLimiter.getRate() + / IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()); + } + + @Override + public void write(int b) throws IOException { + output.wrapAsStream().write(b); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte b) throws IOException { + output.write(b); + } Review Comment: Insignificant as it may be, the two methods with a single byte should still be controlled by the rate limiter. 
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java: ########## @@ -248,6 +250,23 @@ public void getDeviceTimeseriesMetadata( .recordReadInfo(compactionType, CompactionIoDataType.METADATA, dataSize); } + private void acquireReadDataSizeWithCompactionReadRateLimiter(long readDataSize) { + CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1); + while (readDataSize > 0) { + if (readDataSize > Integer.MAX_VALUE) { + CompactionTaskManager.getInstance() + .getCompactionReadRateLimiter() + .acquire(Integer.MAX_VALUE); + readDataSize -= Integer.MAX_VALUE; + } else { + CompactionTaskManager.getInstance() + .getCompactionReadRateLimiter() + .acquire((int) readDataSize); + return; + } Review Comment: Why bother checking? In line 79, where you call this function, readDataSize must be an int. Also, the IO unit cannot be a long, since the size of a byte[] or a ByteBuffer can never exceed the range of an int. ########## iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties: ########## @@ -649,6 +649,16 @@ data_replication_factor=1 # Datatype: int # compaction_write_throughput_mb_per_sec=16 +# The limit of read throughput merge can reach per second +# values less than or equal to 0 means no limit +# Datatype: int +# compaction_read_throughput_mb_per_sec=40 Review Comment: Also, notice the difference between Mb and MB here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
