[GitHub] [kafka] mimaison commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
mimaison commented on code in PR #13095: URL: https://github.com/apache/kafka/pull/13095#discussion_r1112025397 ## tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java: ## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Properties; +import java.util.List; +import java.util.Random; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * This class records the average end to end latency for a single message to travel through Kafka + * + * broker_list = location of the bootstrap broker for both the producer and the consumer Review Comment: Can we update the comment to include the topic argument that's required too? Also let's format this javadoc comment a bit so it renders nicely ## tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java: ## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Properties; +import java.util.List; +import java.util.Random; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * This class records the average end to end latency for a single message to travel through Kafka + * + * broker_list = location of the bootstrap broker for both the producer and the consumer + * num_messages = # messages to send + * producer_acks = See ProducerConfig.ACKS_DOC + * message_size_bytes = size of each message in bytes + * + * e.g. 
[localhost:9092 test 1 1 20] + */ +public class EndToEndLatency { +private final static long POLL_TIMEOUT_MS = 6; +private final static short DEFAULT_REPLICATION_FACTOR = 1; +private final static int DEFAULT_NUM_PARTITIONS = 1; + +public
[GitHub] [kafka] mimaison commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
mimaison commented on code in PR #13095: URL: https://github.com/apache/kafka/pull/13095#discussion_r1088121137 ## tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java: ## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Properties; +import java.util.List; +import java.util.Random; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * This class records the average end to end latency for a single message to travel through Kafka + * + * broker_list = location of the bootstrap broker for both the producer and the consumer + * num_messages = # messages to send + * producer_acks = See ProducerConfig.ACKS_DOC + * message_size_bytes = size of each message in bytes + * + * e.g. [localhost:9092 test 1 1 20] + */ +public class EndToEndLatency { + +private final static long POLL_TIMEOUT_MS = 6; + +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (TerseException e) { +System.err.println(e.getMessage()); +return 1; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +// Visible for testing +static void execute(String... 
args) throws Exception { +if (args.length != 5 && args.length != 6) { +throw new TerseException("USAGE: java " + EndToEndLatency.class.getName() ++ " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file"); +} + +String brokers = args[0]; +String topic = args[1]; +int numMessages = Integer.parseInt(args[2]); +String acks = args[3]; +int messageSizeBytes = Integer.parseInt(args[4]); +Optional<String> propertiesFile = args.length > 5 ? (Utils.isBlank(args[5]) ? Optional.empty() : Optional.of(args[5])) : Optional.empty(); + +if (!Arrays.asList("1", "all").contains(acks)) { +throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all"); +} + +try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(propertiesFile, brokers); + KafkaProducer<byte[], byte[]> producer = createKafkaProducer(propertiesFile, brokers, acks)) { + +if (!consumer.listTopics().containsKey(topic)) { +createTopic(propertiesFile, brokers, topic); +} +setupConsumer(topic, consumer); +double totalTime = 0.0; +long[] latencies = new long[numMessages]; +Random random = new Random(0); + +for (int i = 0; i < numMessages; i++) { +byte[] message = randomBytesOfLen(random, messageSizeBytes); +long begin = System.nanoTime(); +//Send message (of random bytes) synchronously then immediately poll for it +producer.send(new ProducerRecord<>(topic, message)).get(); +ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); +long elapsed = System.nanoTime() - begin; + +