This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ef10a52a52d KAFKA-19011 Improve EndToEndLatency Tool with argument 
parser and message key/header support (#20301)
ef10a52a52d is described below

commit ef10a52a52d99ca808f0f12f2c6fcd2d7a83adf5
Author: Nick Guo <[email protected]>
AuthorDate: Thu Sep 4 02:29:53 2025 +0800

    KAFKA-19011 Improve EndToEndLatency Tool with argument parser and message 
key/header support (#20301)
    
    jira: https://issues.apache.org/jira/browse/KAFKA-19011
    kip: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1172%3A+Improve+EndToEndLatency+tool
    
    This PR improves the usability and maintainability of the
    `kafka-e2e-latency.sh` tool:
    
    - Replaces fixed-index argument parsing with a proper argument parser
    (joptsimple)
    - Adds support for configuring:
        - --record-key-size: size of the message key
        - --num-headers: number of headers per message
        - --record-header-key-size: size of each header key
        - --record-header-size: size of each header value
    - Renames existing arguments to align with Kafka CLI conventions:
        - broker_list → bootstrap-server
        - num_messages → num-records
        - message_size_bytes → record-size
        - properties_file → command-config
    
    Reviewers: Jhen-Yung Hsu <[email protected]>, Ken Huang
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../org/apache/kafka/tools/EndToEndLatency.java    | 322 ++++++++++++++++++---
 .../apache/kafka/tools/EndToEndLatencyTest.java    | 261 ++++++++++++++++-
 2 files changed, 531 insertions(+), 52 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java 
b/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
index c6914b4667b..486e4f57618 100644
--- a/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
+++ b/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
@@ -21,19 +21,25 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -42,16 +48,19 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+
 /**
  * This class records the average end to end latency for a single message to 
travel through Kafka.
  * Following are the required arguments
- * <p> broker_list = location of the bootstrap broker for both the producer 
and the consumer </p>
- * <p> topic = topic name used by both the producer and the consumer to 
send/receive messages </p>
- * <p> num_messages = # messages to send </p>
- * <p> producer_acks = See ProducerConfig.ACKS_DOC </p>
- * <p> message_size_bytes = size of each message in bytes </p>
+ * <p> --bootstrap-server = location of the bootstrap broker for both the 
producer and the consumer
+ * <p> --topic = topic name used by both the producer and the consumer to 
send/receive messages
+ * <p> --num-records = # messages to send
+ * <p> --producer-acks = See ProducerConfig.ACKS_DOC
+ * <p> --record-size = size of each message value in bytes
  *
- * <p> e.g. [localhost:9092 test 10000 1 20] </p>
+ * <p> e.g. [./bin/kafka-e2e-latency.sh --bootstrap-server localhost:9092 
--topic test-topic --num-records 1000 --producer-acks 1 --record-size 512]
  */
 public class EndToEndLatency {
     private static final long POLL_TIMEOUT_MS = 60000;
@@ -77,22 +86,23 @@ public class EndToEndLatency {
     }
 
     // Visible for testing
-    static void execute(String... args) throws Exception {
-        if (args.length != 5 && args.length != 6) {
-            throw new TerseException("USAGE: java " + 
EndToEndLatency.class.getName()
-                    + " broker_list topic num_messages producer_acks 
message_size_bytes [optional] properties_file");
-        }
+    static void execute(String[] args) throws Exception {
+        String[] processedArgs = convertLegacyArgsIfNeeded(args);
+        EndToEndLatencyCommandOptions opts = new 
EndToEndLatencyCommandOptions(processedArgs);
 
-        String brokers = args[0];
-        String topic = args[1];
-        int numMessages = Integer.parseInt(args[2]);
-        String acks = args[3];
-        int messageSizeBytes = Integer.parseInt(args[4]);
-        Optional<String> propertiesFile = (args.length > 5 && 
!Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty();
+        // required
+        String brokers = opts.options.valueOf(opts.bootstrapServerOpt);
+        String topic = opts.options.valueOf(opts.topicOpt);
+        int numRecords = opts.options.valueOf(opts.numRecordsOpt);
+        String acks = opts.options.valueOf(opts.acksOpt);
+        int recordValueSize = opts.options.valueOf(opts.recordSizeOpt);
 
-        if (!List.of("1", "all").contains(acks)) {
-            throw new IllegalArgumentException("Latency testing requires 
synchronous acknowledgement. Please use 1 or all");
-        }
+        // optional
+        Optional<String> propertiesFile = 
Optional.ofNullable(opts.options.valueOf(opts.commandConfigOpt));
+        int recordKeySize = opts.options.valueOf(opts.recordKeySizeOpt);
+        int numHeaders = opts.options.valueOf(opts.numHeadersOpt);
+        int headerKeySize = opts.options.valueOf(opts.recordHeaderKeySizeOpt);
+        int headerValueSize = 
opts.options.valueOf(opts.recordHeaderValueSizeOpt);
 
         try (KafkaConsumer<byte[], byte[]> consumer = 
createKafkaConsumer(propertiesFile, brokers);
              KafkaProducer<byte[], byte[]> producer = 
createKafkaProducer(propertiesFile, brokers, acks)) {
@@ -102,18 +112,21 @@ public class EndToEndLatency {
             }
             setupConsumer(topic, consumer);
             double totalTime = 0.0;
-            long[] latencies = new long[numMessages];
+            long[] latencies = new long[numRecords];
             Random random = new Random(0);
 
-            for (int i = 0; i < numMessages; i++) {
-                byte[] message = randomBytesOfLen(random, messageSizeBytes);
+            for (int i = 0; i < numRecords; i++) {
+                byte[] recordKey = randomBytesOfLen(random, recordKeySize);
+                byte[] recordValue = randomBytesOfLen(random, recordValueSize);
+                List<Header> headers = 
generateHeadersWithSeparateSizes(random, numHeaders, headerKeySize, 
headerValueSize);
+
                 long begin = System.nanoTime();
                 //Send message (of random bytes) synchronously then 
immediately poll for it
-                producer.send(new ProducerRecord<>(topic, message)).get();
+                producer.send(new ProducerRecord<>(topic, null, recordKey, 
recordValue, headers)).get();
                 ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
                 long elapsed = System.nanoTime() - begin;
 
-                validate(consumer, message, records);
+                validate(consumer, recordValue, records, recordKey, headers);
 
                 //Report progress
                 if (i % 1000 == 0)
@@ -122,33 +135,97 @@ public class EndToEndLatency {
                 latencies[i] = elapsed / 1000 / 1000;
             }
 
-            printResults(numMessages, totalTime, latencies);
+            printResults(numRecords, totalTime, latencies);
             consumer.commitSync();
         }
     }
 
     // Visible for testing
-    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] 
message, ConsumerRecords<byte[], byte[]> records) {
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] 
sentRecordValue, ConsumerRecords<byte[], byte[]> records, byte[] sentRecordKey, 
Iterable<Header> sentHeaders) {
         if (records.isEmpty()) {
-            consumer.commitSync();
-            throw new RuntimeException("poll() timed out before finding a 
result (timeout:[" + POLL_TIMEOUT_MS + "])");
+            commitAndThrow(consumer, "poll() timed out before finding a result 
(timeout:[" + POLL_TIMEOUT_MS + "ms])");
         }
 
-        //Check result matches the original record
-        String sent = new String(message, StandardCharsets.UTF_8);
-        String read = new String(records.iterator().next().value(), 
StandardCharsets.UTF_8);
+        ConsumerRecord<byte[], byte[]> record = records.iterator().next();
+        String sent = new String(sentRecordValue, StandardCharsets.UTF_8);
+        String read = new String(record.value(), StandardCharsets.UTF_8);
 
         if (!read.equals(sent)) {
-            consumer.commitSync();
-            throw new RuntimeException("The message read [" + read + "] did 
not match the message sent [" + sent + "]");
+            commitAndThrow(consumer, "The message value read [" + read + "] 
did not match the message value sent [" + sent + "]");
+        }
+
+        if (sentRecordKey != null) {
+            if (record.key() == null) {
+                commitAndThrow(consumer, "Expected message key but received 
null");
+            }
+            String sentKey = new String(sentRecordKey, StandardCharsets.UTF_8);
+            String readKey = new String(record.key(), StandardCharsets.UTF_8);
+            if (!readKey.equals(sentKey)) {
+                commitAndThrow(consumer, "The message key read [" + readKey + 
"] did not match the message key sent [" + sentKey + "]");
+            }
+        } else if (record.key() != null) {
+            commitAndThrow(consumer, "Expected null message key but received 
[" + new String(record.key(), StandardCharsets.UTF_8) + "]");
         }
 
+        validateHeaders(consumer, sentHeaders, record);
+
         //Check we only got the one message
         if (records.count() != 1) {
             int count = records.count();
-            consumer.commitSync();
-            throw new RuntimeException("Only one result was expected during 
this test. We found [" + count + "]");
+            commitAndThrow(consumer, "Only one result was expected during this 
test. We found [" + count + "]");
+        }
+    }
+
+    private static void commitAndThrow(KafkaConsumer<byte[], byte[]> consumer, 
String message) {
+        consumer.commitSync();
+        throw new RuntimeException(message);
+    }
+
+    private static void validateHeaders(KafkaConsumer<byte[], byte[]> 
consumer, Iterable<Header> sentHeaders, ConsumerRecord<byte[], byte[]> record) {
+        if (sentHeaders != null && sentHeaders.iterator().hasNext()) {
+            if (!record.headers().iterator().hasNext()) {
+                commitAndThrow(consumer, "Expected message headers but 
received none");
+            }
+            
+            Iterator<Header> sentIterator = sentHeaders.iterator();
+            Iterator<Header> receivedIterator = record.headers().iterator();
+            
+            while (sentIterator.hasNext() && receivedIterator.hasNext()) {
+                Header sentHeader = sentIterator.next();
+                Header receivedHeader = receivedIterator.next();
+                if (!receivedHeader.key().equals(sentHeader.key()) || 
!Arrays.equals(receivedHeader.value(), sentHeader.value())) {
+                    String receivedValueStr = receivedHeader.value() == null ? 
"null" : Arrays.toString(receivedHeader.value());
+                    String sentValueStr = sentHeader.value() == null ? "null" 
: Arrays.toString(sentHeader.value());
+                    commitAndThrow(consumer, "The message header read [" + 
receivedHeader.key() + ":" + receivedValueStr +
+                            "] did not match the message header sent [" + 
sentHeader.key() + ":" + sentValueStr + "]");
+                }
+            }
+            
+            if (sentIterator.hasNext() || receivedIterator.hasNext()) {
+                commitAndThrow(consumer, "Header count mismatch between sent 
and received messages");
+            }
+        }
+    }
+
+    private static List<Header> generateHeadersWithSeparateSizes(Random 
random, int numHeaders, int keySize, int valueSize) {
+        List<Header> headers = new ArrayList<>();
+
+        for (int i = 0; i < numHeaders; i++) {
+            String headerKey = new String(randomBytesOfLen(random, keySize), 
StandardCharsets.UTF_8);
+            byte[] headerValue = valueSize == -1 ? null : 
randomBytesOfLen(random, valueSize);
+            headers.add(new Header() {
+                @Override
+                public String key() {
+                    return headerKey;
+                }
+
+                @Override
+                public byte[] value() {
+                    return headerValue;
+                }
+            });
         }
+        return headers;
     }
 
     private static void setupConsumer(String topic, KafkaConsumer<byte[], 
byte[]> consumer) {
@@ -162,8 +239,8 @@ public class EndToEndLatency {
         consumer.assignment().forEach(consumer::position);
     }
 
-    private static void printResults(int numMessages, double totalTime, long[] 
latencies) {
-        System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages / 
1000.0 / 1000.0);
+    private static void printResults(int numRecords, double totalTime, long[] 
latencies) {
+        System.out.printf("Avg latency: %.4f ms%n", totalTime / numRecords / 
1000.0 / 1000.0);
         Arrays.sort(latencies);
         int p50 = (int) latencies[(int) (latencies.length * 0.5)];
         int p99 = (int) latencies[(int) (latencies.length * 0.99)];
@@ -221,4 +298,173 @@ public class EndToEndLatency {
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
         return new KafkaProducer<>(producerProps);
     }
+
+    /**
+     * Converts legacy positional arguments to named arguments for backward 
compatibility.
+     *
+     * @param args the command line arguments to convert
+     * @return converted named arguments
+     * @throws Exception if the legacy arguments are invalid
+     * @deprecated Positional argument usage is deprecated and will be removed 
in Apache Kafka 5.0.
+     *             Use named arguments instead: --bootstrap-server, --topic, 
--num-records, --producer-acks, --record-size, --command-config
+     */
+    @Deprecated(since = "4.2", forRemoval = true)
+    static String[] convertLegacyArgsIfNeeded(String[] args) throws Exception {
+        if (args.length == 0) {
+            return args;
+        }
+
+        boolean hasRequiredNamedArgs = Arrays.stream(args).anyMatch(arg -> 
+            arg.equals("--bootstrap-server") || 
+            arg.equals("--topic") || 
+            arg.equals("--num-records") || 
+            arg.equals("--producer-acks") || 
+            arg.equals("--record-size"));
+        if (hasRequiredNamedArgs) {
+            return args;
+        }
+
+        if (args.length != 5 && args.length != 6) {
+            throw new TerseException("Invalid number of arguments. Expected 5 
or 6 positional arguments, but got " + args.length + ". " +
+                    "Usage: bootstrap-server topic num-records producer-acks 
record-size [optional] command-config");
+        }
+
+        return convertLegacyArgs(args);
+    }
+
+    private static String[] convertLegacyArgs(String[] legacyArgs) {
+        List<String> newArgs = new ArrayList<>();
+
+        // broker_list -> --bootstrap-server
+        newArgs.add("--bootstrap-server");
+        newArgs.add(legacyArgs[0]);
+
+        // topic -> --topic
+        newArgs.add("--topic");
+        newArgs.add(legacyArgs[1]);
+
+        // num_messages -> --num-records
+        newArgs.add("--num-records");
+        newArgs.add(legacyArgs[2]);
+
+        // producer_acks -> --producer-acks
+        newArgs.add("--producer-acks");
+        newArgs.add(legacyArgs[3]);
+
+        // message_size_bytes -> --record-size
+        newArgs.add("--record-size");
+        newArgs.add(legacyArgs[4]);
+
+        // properties_file -> --command-config
+        if (legacyArgs.length == 6) {
+            newArgs.add("--command-config");
+            newArgs.add(legacyArgs[5]);
+        }
+        System.out.println("WARNING: Positional argument usage is deprecated 
and will be removed in Apache Kafka 5.0. " +
+                "Please use named arguments instead: --bootstrap-server, 
--topic, --num-records, --producer-acks, --record-size, --command-config");
+        return newArgs.toArray(new String[0]);
+    }
+
+    public static final class EndToEndLatencyCommandOptions extends 
CommandDefaultOptions {
+        final OptionSpec<String> bootstrapServerOpt;
+        final OptionSpec<String> topicOpt;
+        final OptionSpec<Integer> numRecordsOpt;
+        final OptionSpec<String> acksOpt;
+        final OptionSpec<Integer> recordSizeOpt;
+        final OptionSpec<String> commandConfigOpt;
+        final OptionSpec<Integer> recordKeySizeOpt;
+        final OptionSpec<Integer> recordHeaderValueSizeOpt;
+        final OptionSpec<Integer> recordHeaderKeySizeOpt;
+        final OptionSpec<Integer> numHeadersOpt;
+
+        public EndToEndLatencyCommandOptions(String[] args) {
+            super(args);
+
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: 
The Kafka broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("bootstrap-server")
+                    .ofType(String.class);
+            topicOpt = parser.accepts("topic", "REQUIRED: The topic to use for 
the test.")
+                    .withRequiredArg()
+                    .describedAs("topic-name")
+                    .ofType(String.class);
+            numRecordsOpt = parser.accepts("num-records", "REQUIRED: The 
number of messages to send.")
+                    .withRequiredArg()
+                    .describedAs("count")
+                    .ofType(Integer.class);
+            acksOpt = parser.accepts("producer-acks", "REQUIRED: Producer 
acknowledgements. Must be '1' or 'all'.")
+                    .withRequiredArg()
+                    .describedAs("producer-acks")
+                    .ofType(String.class);
+            recordSizeOpt = parser.accepts("record-size", "REQUIRED: The size 
of each message payload in bytes.")
+                    .withRequiredArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class);
+            recordKeySizeOpt = parser.accepts("record-key-size", "Optional: 
The size of the message key in bytes. If not set, messages are sent without a 
key.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            recordHeaderKeySizeOpt = parser.accepts("record-header-key-size", 
"Optional: The size of the message header key in bytes. Used together with 
record-header-size.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            recordHeaderValueSizeOpt = parser.accepts("record-header-size", 
"Optional: The size of message header value in bytes. Use -1 for null header 
value.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            numHeadersOpt = parser.accepts("num-headers", "Optional: The 
number of headers to include in each message.")
+                    .withOptionalArg()
+                    .describedAs("count")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            commandConfigOpt = parser.accepts("command-config", "Optional: A 
property file for Kafka producer/consumer/admin client configuration.")
+                    .withOptionalArg()
+                    .describedAs("config-file")
+                    .ofType(String.class);
+
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+            checkArgs();
+        }
+
+        void checkArgs() {
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool measures 
end-to-end latency in Kafka by sending messages and timing their reception.");
+
+            // check required arguments
+            CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt, topicOpt, numRecordsOpt, acksOpt, recordSizeOpt);
+
+            // validate 'producer-acks'
+            String acksValue = options.valueOf(acksOpt);
+            if (!List.of("1", "all").contains(acksValue)) {
+                CommandLineUtils.printUsageAndExit(parser, "Invalid value for 
--producer-acks. Latency testing requires synchronous acknowledgement. Please 
use '1' or 'all'.");
+            }
+
+            // validate for num-records and record-size
+            if (options.valueOf(numRecordsOpt) <= 0) {
+                CommandLineUtils.printUsageAndExit(parser, "Value for 
--num-records must be a positive integer.");
+            }
+            if (options.valueOf(recordSizeOpt) < 0) {
+                CommandLineUtils.printUsageAndExit(parser, "Value for 
--record-size must be a non-negative integer.");
+            }
+
+            if (options.valueOf(recordKeySizeOpt) < 0) {
+                CommandLineUtils.printUsageAndExit(parser, "Value for 
--record-key-size must be a non-negative integer.");
+            }
+            if (options.valueOf(recordHeaderKeySizeOpt) < 0) {
+                CommandLineUtils.printUsageAndExit(parser, "Value for 
--record-header-key-size must be a non-negative integer.");
+            }
+            if (options.valueOf(recordHeaderValueSizeOpt) < -1) {
+                CommandLineUtils.printUsageAndExit(parser, "Value for 
--record-header-size must be a non-negative integer or -1 for null header 
value.");
+            }
+            if (options.valueOf(numHeadersOpt) < 0) {
+                CommandLineUtils.printUsageAndExit(parser, "Value for 
--num-headers must be a non-negative integer.");
+            }
+        }
+    }
 }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/EndToEndLatencyTest.java 
b/tools/src/test/java/org/apache/kafka/tools/EndToEndLatencyTest.java
index 47ed7dd6666..e6f662b1d99 100644
--- a/tools/src/test/java/org/apache/kafka/tools/EndToEndLatencyTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/EndToEndLatencyTest.java
@@ -19,6 +19,10 @@ package org.apache.kafka.tools;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.utils.Exit;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -29,9 +33,18 @@ import org.mockito.quality.Strictness;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -39,6 +52,55 @@ import static org.mockito.Mockito.when;
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
 public class EndToEndLatencyTest {
 
+    private static final byte[] RECORD_VALUE = 
"record-sent".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] RECORD_VALUE_DIFFERENT = 
"record-received".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] RECORD_KEY = 
"key-sent".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] RECORD_KEY_DIFFERENT = 
"key-received".getBytes(StandardCharsets.UTF_8);
+    private static final String HEADER_KEY = "header-key-sent";
+    private static final String HEADER_KEY_DIFFERENT = "header-key-received";
+    private static final byte[] HEADER_VALUE = 
"header-value-sent".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] HEADER_VALUE_DIFFERENT = 
"header-value-received".getBytes(StandardCharsets.UTF_8);
+
+    // legacy format test arguments
+    private static final String[] LEGACY_INVALID_ARGS_UNEXPECTED = {
+        "localhost:9092", "test", "10000", "1", "200", "propsfile.properties", 
"random"
+    };
+
+    private static class ArgsBuilder {
+        private final Map<String, String> params = new LinkedHashMap<>();
+        
+        private ArgsBuilder() {
+            params.put("--bootstrap-server", "localhost:9092");
+            params.put("--topic", "test-topic");
+            params.put("--num-records", "100");
+            params.put("--producer-acks", "1");
+            params.put("--record-size", "200");
+        }
+        
+        public static ArgsBuilder defaults() {
+            return new ArgsBuilder();
+        }
+        
+        public ArgsBuilder with(String param, String value) {
+            params.put(param, value);
+            return this;
+        }
+        
+        public String[] build() {
+            return params.entrySet().stream()
+                    .flatMap(entry -> Stream.of(entry.getKey(), 
entry.getValue()))
+                    .toArray(String[]::new);
+        }
+
+        public ArgsBuilder withNegative(String param) {
+            return with(param, "-1");
+        }
+        
+        public ArgsBuilder withZero(String param) {
+            return with(param, "0");
+        }
+    }
+
     @Mock
     KafkaConsumer<byte[], byte[]> consumer;
 
@@ -46,33 +108,176 @@ public class EndToEndLatencyTest {
     ConsumerRecords<byte[], byte[]> records;
 
     @Test
-    public void shouldFailWhenSuppliedUnexpectedArgs() {
-        String[] args = new String[] {"localhost:9092", "test", "10000", "1", 
"200", "propsfile.properties", "random"};
-        assertThrows(TerseException.class, () -> 
EndToEndLatency.execute(args));
+    public void testInvalidArgs() {
+        testUnexpectedArgsWithLegacyFormat();
+        testInvalidProducerAcks();
+        testInvalidNumRecords();
+        testInvalidRecordSize();
+        testInvalidRecordKey();
+        testInvalidNumHeaders();
+        testInvalidRecordHeaderKey();
+        testInvalidRecordHeaderValue();
+    }
+
+    private void testUnexpectedArgsWithLegacyFormat() {
+        String expectedMsg = "Invalid number of arguments. Expected 5 or 6 
positional arguments, but got 7.";
+        TerseException terseException = assertThrows(TerseException.class, () 
-> EndToEndLatency.execute(LEGACY_INVALID_ARGS_UNEXPECTED));
+        assertTrue(terseException.getMessage().contains(expectedMsg));
+    }
+
+    private void testInvalidNumRecords() {
+        String expectedMsg = "Value for --num-records must be a positive 
integer.";
+        assertInitializeInvalidOptionsExitCodeAndMsg(
+            ArgsBuilder.defaults().withNegative("--num-records").build(), 
expectedMsg);
+    }
+
+    private void testInvalidRecordSize() {
+        String expectedMsg = "Value for --record-size must be a non-negative 
integer.";
+        assertInitializeInvalidOptionsExitCodeAndMsg(
+            ArgsBuilder.defaults().withNegative("--record-size").build(), 
expectedMsg);
+    }
+
+    private void testInvalidRecordKey() {
+        String expectedMsg = "Value for --record-key-size must be a 
non-negative integer.";
+        assertInitializeInvalidOptionsExitCodeAndMsg(
+            ArgsBuilder.defaults().withNegative("--record-key-size").build(), 
expectedMsg);
+    }
+
+    private void testInvalidNumHeaders() {
+        String expectedMsg = "Value for --num-headers must be a non-negative 
integer.";
+        assertInitializeInvalidOptionsExitCodeAndMsg(
+                ArgsBuilder.defaults().withNegative("--num-headers").build(), 
expectedMsg);
+    }
+
+    private void testInvalidRecordHeaderKey() {
+        String expectedMsg = "Value for --record-header-key-size must be a 
non-negative integer.";
+        assertInitializeInvalidOptionsExitCodeAndMsg(
+            
ArgsBuilder.defaults().withNegative("--record-header-key-size").build(), 
expectedMsg);
+    }
+
+    private void testInvalidRecordHeaderValue() {
+        String expectedMsg = "Value for --record-header-size must be a 
non-negative integer.";
+        assertInitializeInvalidOptionsExitCodeAndMsg(
+            
ArgsBuilder.defaults().withNegative("--record-header-size").build(), 
expectedMsg);
+    }
+
+    private void testInvalidProducerAcks() {
+        String expectedMsg = "Invalid value for --producer-acks. Latency 
testing requires synchronous acknowledgement. Please use '1' or 'all'.";
+        assertInitializeInvalidOptionsExitCodeAndMsg(
+                ArgsBuilder.defaults().withZero("--producer-acks").build(), 
expectedMsg);
+    }
+
+    private void assertInitializeInvalidOptionsExitCodeAndMsg(String[] args, 
String expectedMsg) {
+        Exit.setExitProcedure((exitCode, message) -> {
+            assertEquals(1, exitCode);
+            assertTrue(message.contains(expectedMsg));
+            throw new RuntimeException();
+        });
+        try {
+            assertThrows(RuntimeException.class, () -> 
EndToEndLatency.execute(args));
+        } finally {
+            Exit.resetExitProcedure();
+        }
     }
 
     @Test
-    public void shouldFailWhenProducerAcksAreNotSynchronised() {
-        String[] args = new String[] {"localhost:9092", "test", "10000", "0", 
"200"};
-        assertThrows(IllegalArgumentException.class, () -> 
EndToEndLatency.execute(args));
+    @SuppressWarnings("removal")
+    public void testConvertLegacyArgs() throws Exception {
+        String[] legacyArgs = {"localhost:9092", "test", "100", "1", "200"};
+        String[] convertedArgs = 
EndToEndLatency.convertLegacyArgsIfNeeded(legacyArgs);
+        String[] expectedArgs = {
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--num-records", "100",
+            "--producer-acks", "1",
+            "--record-size", "200"
+        };
+        assertArrayEquals(expectedArgs, convertedArgs);
     }
 
     @Test
     public void shouldFailWhenConsumerRecordsIsEmpty() {
         when(records.isEmpty()).thenReturn(true);
-        assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, new byte[0], records));
+        assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, new byte[0], records, null, null));
     }
 
     @Test
     @SuppressWarnings("unchecked")
-    public void shouldFailWhenSentIsNotEqualToReceived() {
+    public void shouldFailWhenSentRecordIsNotEqualToReceived() {
         Iterator<ConsumerRecord<byte[], byte[]>> iterator = mock(Iterator.class);
         ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
         when(records.isEmpty()).thenReturn(false);
         when(records.iterator()).thenReturn(iterator);
         when(iterator.next()).thenReturn(record);
-        when(record.value()).thenReturn("kafkab".getBytes(StandardCharsets.UTF_8));
-        assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, "kafkaa".getBytes(StandardCharsets.UTF_8), records));
+        when(record.value()).thenReturn(RECORD_VALUE_DIFFERENT);
+        assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, RECORD_VALUE, records, null, null));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldFailWhenSentRecordKeyIsNotEqualToReceived() {
+        Iterator<ConsumerRecord<byte[], byte[]>> iterator = mock(Iterator.class);
+        ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
+        when(records.isEmpty()).thenReturn(false);
+        when(records.iterator()).thenReturn(iterator);
+        when(iterator.next()).thenReturn(record);
+        when(record.value()).thenReturn(RECORD_VALUE);
+        when(record.key()).thenReturn(RECORD_KEY_DIFFERENT);
+
+        assertThrows(RuntimeException.class, () ->
+                EndToEndLatency.validate(consumer, RECORD_VALUE, records,
+                        RECORD_KEY, null));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldFailWhenSentHeaderKeyIsNotEqualToReceived() {
+        Iterator<ConsumerRecord<byte[], byte[]>> iterator = mock(Iterator.class);
+        ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
+        Headers headers = mock(Headers.class);
+        Iterator<Header> headerIterator = mock(Iterator.class);
+        Header receivedHeader = new RecordHeader(HEADER_KEY_DIFFERENT, HEADER_VALUE);
+
+        when(records.isEmpty()).thenReturn(false);
+        when(records.iterator()).thenReturn(iterator);
+        when(iterator.next()).thenReturn(record);
+        when(record.value()).thenReturn(RECORD_VALUE);
+        when(record.key()).thenReturn(null);
+        when(record.headers()).thenReturn(headers);
+        when(headers.iterator()).thenReturn(headerIterator);
+        when(headerIterator.hasNext()).thenReturn(true);
+        when(headerIterator.next()).thenReturn(receivedHeader);
+
+        Header sentHeader = new RecordHeader(HEADER_KEY, HEADER_VALUE);
+        List<Header> sentHeaders = List.of(sentHeader);
+
+        assertThrows(RuntimeException.class, () ->
+                EndToEndLatency.validate(consumer, RECORD_VALUE, records, null, sentHeaders));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldFailWhenSentHeaderValueIsNotEqualToReceived() {
+        Iterator<ConsumerRecord<byte[], byte[]>> iterator = mock(Iterator.class);
+        ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
+        Headers headers = mock(Headers.class);
+        Iterator<Header> headerIterator = mock(Iterator.class);
+        Header receivedHeader = new RecordHeader(HEADER_KEY, HEADER_VALUE_DIFFERENT);
+        Header sentHeader = new RecordHeader(HEADER_KEY, HEADER_VALUE);
+        List<Header> sentHeaders = List.of(sentHeader);
+
+        when(records.isEmpty()).thenReturn(false);
+        when(records.iterator()).thenReturn(iterator);
+        when(iterator.next()).thenReturn(record);
+        when(record.value()).thenReturn(RECORD_VALUE);
+        when(record.key()).thenReturn(null);
+        when(record.headers()).thenReturn(headers);
+        when(headers.iterator()).thenReturn(headerIterator);
+        when(headerIterator.hasNext()).thenReturn(true);
+        when(headerIterator.next()).thenReturn(receivedHeader);
+
+        assertThrows(RuntimeException.class, () ->
+                EndToEndLatency.validate(consumer, RECORD_VALUE, records, null, sentHeaders));
     }
 
     @Test
@@ -83,9 +288,9 @@ public class EndToEndLatencyTest {
         when(records.isEmpty()).thenReturn(false);
         when(records.iterator()).thenReturn(iterator);
         when(iterator.next()).thenReturn(record);
-        when(record.value()).thenReturn("kafkaa".getBytes(StandardCharsets.UTF_8));
+        when(record.value()).thenReturn(RECORD_VALUE);
         when(records.count()).thenReturn(2);
-        assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, "kafkaa".getBytes(StandardCharsets.UTF_8), records));
+        assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, RECORD_VALUE, records, null, null));
     }
 
     @Test
@@ -93,12 +298,40 @@ public class EndToEndLatencyTest {
     public void shouldPassInValidation() {
         Iterator<ConsumerRecord<byte[], byte[]>> iterator = mock(Iterator.class);
         ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
+        Headers headers = mock(Headers.class);
+        Iterator<Header> headerIterator = mock(Iterator.class);
+        Header receivedHeader = new RecordHeader(HEADER_KEY, HEADER_VALUE);
+        Header sentHeader = new RecordHeader(HEADER_KEY, HEADER_VALUE);
+        List<Header> sentHeaders = List.of(sentHeader);
+
         when(records.isEmpty()).thenReturn(false);
         when(records.iterator()).thenReturn(iterator);
         when(iterator.next()).thenReturn(record);
-        when(record.value()).thenReturn("kafkaa".getBytes(StandardCharsets.UTF_8));
+        when(record.value()).thenReturn(RECORD_VALUE);
+        byte[] recordKey = RECORD_KEY;
+        when(record.key()).thenReturn(recordKey);
         when(records.count()).thenReturn(1);
-        assertDoesNotThrow(() -> EndToEndLatency.validate(consumer, "kafkaa".getBytes(StandardCharsets.UTF_8), records));
+        when(record.headers()).thenReturn(headers);
+        when(headers.iterator()).thenReturn(headerIterator);
+        when(headerIterator.hasNext()).thenReturn(true, true, false);
+        when(headerIterator.next()).thenReturn(receivedHeader);
+
+        assertDoesNotThrow(() -> EndToEndLatency.validate(consumer, RECORD_VALUE, records, recordKey, sentHeaders));
+    }
+
+    @Test
+    public void shouldPassWithNamedArgs() {
+        AtomicReference<Integer> exitStatus = new AtomicReference<>();
+        Exit.setExitProcedure((status, __) -> {
+            exitStatus.set(status);
+            throw new RuntimeException();
+        });
+        try {
+            assertDoesNotThrow(() -> new EndToEndLatency.EndToEndLatencyCommandOptions(ArgsBuilder.defaults().build()));
+            assertNull(exitStatus.get());
+        } finally {
+            Exit.resetExitProcedure();
+        }
     }
 
 }


Reply via email to