This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d0ad84df5d5 MINOR: producer perf improvements (#17348)
d0ad84df5d5 is described below

commit d0ad84df5d52fdc55b590356cd0bba62c2de6a8e
Author: Federico Valeri <fedeval...@gmail.com>
AuthorDate: Thu Oct 3 04:29:19 2024 +0200

    MINOR: producer perf improvements (#17348)
    
    Adding some missing input checks and fixing a formatting issue.
    
    Signed-off-by: Federico Valeri <fedeval...@gmail.com>
    
    Reviewers: Luke Chen <show...@gmail.com>
---
 .../apache/kafka/tools/ProducerPerformance.java    | 12 +++++++++---
 .../kafka/tools/ProducerPerformanceTest.java       | 22 +++++++++++++++++++++-
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java 
b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index c9317d8a24c..7d7a2a2477f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -428,7 +428,7 @@ public class ProducerPerformance {
             double recsPerSec = 1000.0 * count / (double) elapsed;
             double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 
* 1024.0);
             int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 
0.999);
-            System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), 
%.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, 
%d ms 99.9th.%n",
+            System.out.printf("%d records sent, %.1f records/sec (%.2f 
MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d 
ms 99th, %d ms 99.9th.%n",
                               count,
                               recsPerSec,
                               mbPerSec,
@@ -479,7 +479,7 @@ public class ProducerPerformance {
 
     static final class ConfigPostProcessor {
         final String topicName;
-        final long numRecords;
+        final Long numRecords;
         final Integer recordSize;
         final double throughput;
         final boolean payloadMonotonic;
@@ -503,11 +503,17 @@ public class ProducerPerformance {
             String payloadFilePath = namespace.getString("payloadFile");
             Long transactionDurationMsArg = 
namespace.getLong("transactionDurationMs");
             String transactionIdArg = namespace.getString("transactionalId");
+            if (numRecords != null && numRecords <= 0) {
+                throw new ArgumentParserException("--num-records should be 
greater than zero", parser);
+            }
+            if (recordSize != null && recordSize <= 0) {
+                throw new ArgumentParserException("--record-size should be 
greater than zero", parser);
+            }
             if (producerConfigs == null && producerConfigFile == null) {
                 throw new ArgumentParserException("Either --producer-props or 
--producer.config must be specified.", parser);
             }
             if (transactionDurationMsArg != null && transactionDurationMsArg 
<= 0) {
-                throw new ArgumentParserException("--transaction-duration-ms 
should > 0", parser);
+                throw new ArgumentParserException("--transaction-duration-ms 
should be greater than zero", parser);
             }
 
             // since default value gets printed with the help text, we are 
escaping \n there and replacing it with correct value here.
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
index 1757fa86069..10b790bf914 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
@@ -377,9 +377,29 @@ public class ProducerPerformanceTest {
             "--record-size", "100",
             "--producer-props", "bootstrap.servers=localhost:9000",
             "--transaction-duration-ms", "0"};
-        assertEquals("--transaction-duration-ms should > 0",
+        assertEquals("--transaction-duration-ms should be greater than zero",
                 assertThrows(ArgumentParserException.class,
                         () -> new 
ProducerPerformance.ConfigPostProcessor(parser, 
invalidTransactionDurationMs)).getMessage());
+
+        String[] invalidNumRecords = new String[]{
+            "--topic", "Hello-Kafka",
+            "--num-records", "-5",
+            "--throughput", "100",
+            "--record-size", "100",
+            "--producer-props", "bootstrap.servers=localhost:9000"};
+        assertEquals("--num-records should be greater than zero",
+            assertThrows(ArgumentParserException.class,
+                () -> new ProducerPerformance.ConfigPostProcessor(parser, 
invalidNumRecords)).getMessage());
+
+        String[] invalidRecordSize = new String[]{
+            "--topic", "Hello-Kafka",
+            "--num-records", "5",
+            "--throughput", "100",
+            "--record-size", "-100",
+            "--producer-props", "bootstrap.servers=localhost:9000"};
+        assertEquals("--record-size should be greater than zero",
+            assertThrows(ArgumentParserException.class,
+                () -> new ProducerPerformance.ConfigPostProcessor(parser, 
invalidRecordSize)).getMessage());
     }
 
     @Test

Reply via email to