[GitHub] [kafka] mimaison commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-02-20 Thread via GitHub


mimaison commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1112025397


##
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to 
travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and 
the consumer

Review Comment:
   Can we update the comment to include the topic argument that's required too?
   
   Also let's format this javadoc comment a bit so it renders nicely



##
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to 
travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and 
the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 1 1 20]
+ */
+public class EndToEndLatency {
+private final static long POLL_TIMEOUT_MS = 6;
+private final static short DEFAULT_REPLICATION_FACTOR = 1;
+private final static int DEFAULT_NUM_PARTITIONS = 1;
+
+public 

[GitHub] [kafka] mimaison commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-01-26 Thread via GitHub


mimaison commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1088121137


##
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to 
travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and 
the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 1 1 20]
+ */
+public class EndToEndLatency {
+
+private final static long POLL_TIMEOUT_MS = 6;
+
+public static void main(String... args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {
+try {
+execute(args);
+return 0;
+} catch (TerseException e) {
+System.err.println(e.getMessage());
+return 1;
+} catch (Throwable e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+return 1;
+}
+}
+
+// Visible for testing
+static void execute(String... args) throws Exception {
+if (args.length != 5 && args.length != 6) {
+throw new TerseException("USAGE: java " + 
EndToEndLatency.class.getName()
++ " broker_list topic num_messages producer_acks 
message_size_bytes [optional] properties_file");
+}
+
+String brokers = args[0];
+String topic = args[1];
+int numMessages = Integer.parseInt(args[2]);
+String acks = args[3];
+int messageSizeBytes = Integer.parseInt(args[4]);
+Optional propertiesFile = args.length > 5 ? 
(Utils.isBlank(args[5]) ? Optional.empty() : Optional.of(args[5])) : 
Optional.empty();
+
+if (!Arrays.asList("1", "all").contains(acks)) {
+throw new IllegalArgumentException("Latency testing requires 
synchronous acknowledgement. Please use 1 or all");
+}
+
+try (KafkaConsumer consumer = 
createKafkaConsumer(propertiesFile, brokers);
+ KafkaProducer producer = 
createKafkaProducer(propertiesFile, brokers, acks)) {
+
+if (!consumer.listTopics().containsKey(topic)) {
+createTopic(propertiesFile, brokers, topic);
+}
+setupConsumer(topic, consumer);
+double totalTime = 0.0;
+long[] latencies = new long[numMessages];
+Random random = new Random(0);
+
+for (int i = 0; i < numMessages; i++) {
+byte[] message = randomBytesOfLen(random, messageSizeBytes);
+long begin = System.nanoTime();
+//Send message (of random bytes) synchronously then 
immediately poll for it
+producer.send(new ProducerRecord<>(topic, message)).get();
+ConsumerRecords records = 
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+long elapsed = System.nanoTime() - begin;
+
+