This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 93068b4a1bb MINOR: Fix the compression arguments in
TestLinearWriteSpeed (#20349)
93068b4a1bb is described below
commit 93068b4a1bb345d1f151692c2727e0830ab08938
Author: Maros Orsak <[email protected]>
AuthorDate: Sun Aug 17 13:21:51 2025 +0200
MINOR: Fix the compression arguments in TestLinearWriteSpeed (#20349)
This PR fixes a problem related to `TestLinearWriteSpeed`. During my
work on KIP-780, I discovered that benchmarks for `TestLinearWriteSpeed`
do not account for compression algorithms. It always uses
`Compression.NONE` when creating records. The problem was introduced in
this PR [1].
[1] - https://github.com/apache/kafka/pull/17736
Reviewers: Ken Huang <[email protected]>, Mickael Maison
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/jmh/log/TestLinearWriteSpeed.java | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java
index 668ddfcfff8..32c8d87062e 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java
@@ -81,13 +81,13 @@ public class TestLinearWriteSpeed {
.describedAs("num_bytes")
.ofType(Integer.class);
- OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size",
"REQUIRED: The size of each message in the message set.")
+ OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size",
"The size of each message in the message set.")
.withRequiredArg()
.describedAs("num_bytes")
.ofType(Integer.class)
.defaultsTo(1024);
- OptionSpec<Integer> filesOpt = parser.accepts("files", "REQUIRED: The
number of logs or files.")
+ OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of
logs or files.")
.withRequiredArg()
.describedAs("num_files")
.ofType(Integer.class)
@@ -120,14 +120,13 @@ public class TestLinearWriteSpeed {
OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The
compression level to use")
.withRequiredArg()
.describedAs("level")
- .ofType(Integer.class)
- .defaultsTo(0);
+ .ofType(Integer.class);
OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to
memory-mapped files.");
OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to
file channels.");
OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka
logs.");
OptionSet options = parser.parse(args);
- CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt,
filesOpt);
+ CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt);
long bytesToWrite = options.valueOf(bytesOpt);
int bufferSize = options.valueOf(sizeOpt);
@@ -140,9 +139,10 @@ public class TestLinearWriteSpeed {
long flushInterval = options.valueOf(flushIntervalOpt);
CompressionType compressionType =
CompressionType.forName(options.valueOf(compressionCodecOpt));
Compression.Builder<? extends Compression> compressionBuilder =
Compression.of(compressionType);
- int compressionLevel = options.valueOf(compressionLevelOpt);
+ Integer compressionLevel = options.valueOf(compressionLevelOpt);
- setupCompression(compressionType, compressionBuilder,
compressionLevel);
+ if (compressionLevel != null) setupCompression(compressionType,
compressionBuilder, compressionLevel);
+ Compression compression = compressionBuilder.build();
ThreadLocalRandom.current().nextBytes(buffer.array());
int numMessages = bufferSize / (messageSize + Records.LOG_OVERHEAD);
@@ -153,7 +153,7 @@ public class TestLinearWriteSpeed {
recordsList.add(new SimpleRecord(createTime, null, new
byte[messageSize]));
}
- MemoryRecords messageSet = MemoryRecords.withRecords(Compression.NONE,
recordsList.toArray(new SimpleRecord[0]));
+ MemoryRecords messageSet = MemoryRecords.withRecords(compression,
recordsList.toArray(new SimpleRecord[0]));
Writable[] writables = new Writable[numFiles];
KafkaScheduler scheduler = new KafkaScheduler(1);
scheduler.startup();
@@ -222,7 +222,7 @@ public class TestLinearWriteSpeed {
private static void setupCompression(CompressionType compressionType,
Compression.Builder<? extends
Compression> compressionBuilder,
- int compressionLevel) {
+ Integer compressionLevel) {
switch (compressionType) {
case GZIP:
((GzipCompression.Builder)
compressionBuilder).level(compressionLevel);