This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new d0ad84df5d5 MINOR: producer perf improvements (#17348)
d0ad84df5d5 is described below

commit d0ad84df5d52fdc55b590356cd0bba62c2de6a8e
Author: Federico Valeri <fedeval...@gmail.com>
AuthorDate: Thu Oct 3 04:29:19 2024 +0200

    MINOR: producer perf improvements (#17348)

    Adding some missing input checks and fixing a formatting issue.

    Signed-off-by: Federico Valeri <fedeval...@gmail.com>

    Reviewers: Luke Chen <show...@gmail.com>
---
 .../apache/kafka/tools/ProducerPerformance.java    | 12 +++++++++---
 .../kafka/tools/ProducerPerformanceTest.java       | 22 +++++++++++++++++++++-
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index c9317d8a24c..7d7a2a2477f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -428,7 +428,7 @@ public class ProducerPerformance {
             double recsPerSec = 1000.0 * count / (double) elapsed;
             double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
             int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
-            System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n",
+            System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n",
                     count,
                     recsPerSec,
                     mbPerSec,
@@ -479,7 +479,7 @@ public class ProducerPerformance {

     static final class ConfigPostProcessor {
         final String topicName;
-        final long numRecords;
+        final Long numRecords;
         final Integer recordSize;
         final double throughput;
         final boolean payloadMonotonic;
@@ -503,11 +503,17 @@ public class ProducerPerformance {
             String payloadFilePath = namespace.getString("payloadFile");
             Long transactionDurationMsArg = namespace.getLong("transactionDurationMs");
             String transactionIdArg = namespace.getString("transactionalId");
+            if (numRecords != null && numRecords <= 0) {
+                throw new ArgumentParserException("--num-records should be greater than zero", parser);
+            }
+            if (recordSize != null && recordSize <= 0) {
+                throw new ArgumentParserException("--record-size should be greater than zero", parser);
+            }
             if (producerConfigs == null && producerConfigFile == null) {
                 throw new ArgumentParserException("Either --producer-props or --producer.config must be specified.", parser);
             }
             if (transactionDurationMsArg != null && transactionDurationMsArg <= 0) {
-                throw new ArgumentParserException("--transaction-duration-ms should > 0", parser);
+                throw new ArgumentParserException("--transaction-duration-ms should be greater than zero", parser);
             }

             // since default value gets printed with the help text, we are escaping \n there and replacing it with correct value here.
diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
index 1757fa86069..10b790bf914 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
@@ -377,9 +377,29 @@ public class ProducerPerformanceTest {
             "--record-size", "100",
             "--producer-props", "bootstrap.servers=localhost:9000",
             "--transaction-duration-ms", "0"};
-        assertEquals("--transaction-duration-ms should > 0",
+        assertEquals("--transaction-duration-ms should be greater than zero",
             assertThrows(ArgumentParserException.class,
                 () -> new ProducerPerformance.ConfigPostProcessor(parser, invalidTransactionDurationMs)).getMessage());
+
+        String[] invalidNumRecords = new String[]{
+            "--topic", "Hello-Kafka",
+            "--num-records", "-5",
+            "--throughput", "100",
+            "--record-size", "100",
+            "--producer-props", "bootstrap.servers=localhost:9000"};
+        assertEquals("--num-records should be greater than zero",
+            assertThrows(ArgumentParserException.class,
+                () -> new ProducerPerformance.ConfigPostProcessor(parser, invalidNumRecords)).getMessage());
+
+        String[] invalidRecordSize = new String[]{
+            "--topic", "Hello-Kafka",
+            "--num-records", "5",
+            "--throughput", "100",
+            "--record-size", "-100",
+            "--producer-props", "bootstrap.servers=localhost:9000"};
+        assertEquals("--record-size should be greater than zero",
+            assertThrows(ArgumentParserException.class,
+                () -> new ProducerPerformance.ConfigPostProcessor(parser, invalidRecordSize)).getMessage());
     }

     @Test