Repository: kafka Updated Branches: refs/heads/trunk d50499a0e -> 1cc44830b
KAFKA-2562: update kafka scripts to use new tools/code Updated kafka-producer-perf-test.sh to use org.apache.kafka.clients.tools.ProducerPerformance. Updated build.gradle to add kafka-tools-0.9.0.0-SNAPSHOT.jar to kafka/libs folder. Author: Manikumar reddy O <[email protected]> Reviewers: Gwen Shapira, Ismael Juma Closes #242 from omkreddy/KAFKA-2562 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cc44830 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cc44830 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cc44830 Branch: refs/heads/trunk Commit: 1cc44830b90688f1a2034243b8ee03b97101f75c Parents: d50499a Author: Manikumar reddy O <[email protected]> Authored: Fri Oct 30 15:30:34 2015 -0700 Committer: Gwen Shapira <[email protected]> Committed: Fri Oct 30 15:30:34 2015 -0700 ---------------------------------------------------------------------- bin/kafka-producer-perf-test.sh | 2 +- bin/windows/kafka-producer-perf-test.bat | 2 +- build.gradle | 5 +- .../performance/producer_performance.py | 2 +- .../apache/kafka/tools/ProducerPerformance.java | 152 ++++++++++++++----- 5 files changed, 117 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1cc44830/bin/kafka-producer-perf-test.sh ---------------------------------------------------------------------- diff --git a/bin/kafka-producer-perf-test.sh b/bin/kafka-producer-perf-test.sh index 84ac949..f583662 100755 --- a/bin/kafka-producer-perf-test.sh +++ b/bin/kafka-producer-perf-test.sh @@ -17,4 +17,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx512M" fi -exec $(dirname $0)/kafka-run-class.sh kafka.tools.ProducerPerformance $@ +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance $@ http://git-wip-us.apache.org/repos/asf/kafka/blob/1cc44830/bin/windows/kafka-producer-perf-test.bat ---------------------------------------------------------------------- diff --git a/bin/windows/kafka-producer-perf-test.bat b/bin/windows/kafka-producer-perf-test.bat index a894752..55b024b 100644 --- a/bin/windows/kafka-producer-perf-test.bat +++ b/bin/windows/kafka-producer-perf-test.bat @@ -16,5 +16,5 @@ rem limitations under the License. SetLocal set KAFKA_HEAP_OPTS=-Xmx512M -%~dp0kafka-run-class.bat kafka.tools.ProducerPerformance %* +%~dp0kafka-run-class.bat org.apache.kafka.tools.ProducerPerformance %* EndLocal http://git-wip-us.apache.org/repos/asf/kafka/blob/1cc44830/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 279c51f..1987516 100644 --- a/build.gradle +++ b/build.gradle @@ -254,7 +254,7 @@ project(':core') { dependencies { compile project(':clients') - compile project(':log4j-appender') + compile "$slf4jlog4j" compile "org.scala-lang:scala-library:$scalaVersion" compile 'org.apache.zookeeper:zookeeper:3.4.6' compile 'com.101tec:zkclient:0.6' @@ -318,6 +318,9 @@ project(':core') { from(configurations.runtime) { into("libs/") } from(configurations.archives.artifacts.files) { into("libs/") } from(project.siteDocsTar) { into("site-docs/") } + from(project(':log4j-appender').jar) { into("libs/") } + from(project(':tools').jar) { into("libs/") } + from(project(':tools').configurations.runtime) { into("libs/") } } jar { http://git-wip-us.apache.org/repos/asf/kafka/blob/1cc44830/tests/kafkatest/services/performance/producer_performance.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py index 401d6f7..a3b1d0e 100644 --- a/tests/kafkatest/services/performance/producer_performance.py +++ b/tests/kafkatest/services/performance/producer_performance.py @@ -53,7 +53,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService): 'kafka_directory': kafka_dir(node) }) cmd = "JMX_PORT=%(jmx_port)d /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \ - "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args + "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args self.security_config.setup_node(node) if self.security_protocol == SecurityConfig.SSL: http://git-wip-us.apache.org/repos/asf/kafka/blob/1cc44830/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index 3a90a10..3a06862 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -3,67 +3,135 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ package org.apache.kafka.tools; +import static net.sourceforge.argparse4j.impl.Arguments.store; + import java.util.Arrays; +import java.util.List; import java.util.Properties; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.Namespace; + import org.apache.kafka.clients.producer.*; public class ProducerPerformance { public static void main(String[] args) throws Exception { - if (args.length < 4) { - System.err.println("USAGE: java " + ProducerPerformance.class.getName() + - " topic_name num_records record_size target_records_sec [prop_name=prop_value]*"); - System.exit(1); - } + ArgumentParser parser = argParser(); - /* parse args */ - String topicName = args[0]; - long numRecords = Long.parseLong(args[1]); - int recordSize = Integer.parseInt(args[2]); - int throughput = Integer.parseInt(args[3]); - - Properties props = new Properties(); - for (int i = 4; i < args.length; i++) { - String[] pieces = args[i].split("="); - if (pieces.length != 2) - throw new IllegalArgumentException("Invalid property: " + args[i]); - props.put(pieces[0], pieces[1]); - } - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props); - - /* setup perf test */ - byte[] payload = new byte[recordSize]; - Arrays.fill(payload, (byte) 1); - ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload); - Stats stats = new Stats(numRecords, 5000); - long startMs = System.currentTimeMillis(); - - ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs); - for (int i = 0; i < numRecords; i++) { - long sendStartMs = System.currentTimeMillis(); - Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats); - producer.send(record, cb); - - if (throttler.shouldThrottle(i, sendStartMs)) { - throttler.throttle(); + try { + Namespace res = parser.parseArgs(args); + + /* parse args */ + String topicName = res.getString("topic"); + long numRecords = res.getLong("numRecords"); + int recordSize = res.getInt("recordSize"); + int throughput = res.getInt("throughput"); + List<String> producerProps = res.getList("producerConfig"); + + Properties props = new Properties(); + if (producerProps != null) + for (String prop : producerProps) { + String[] pieces = prop.split("="); + if (pieces.length != 2) + throw new IllegalArgumentException("Invalid property: " + prop); + props.put(pieces[0], pieces[1]); + } + + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props); + + /* setup perf test */ + byte[] payload = new byte[recordSize]; + Arrays.fill(payload, (byte) 1); + ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload); + Stats stats = new Stats(numRecords, 5000); + long startMs = System.currentTimeMillis(); + + ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs); + for (int i = 0; i < numRecords; i++) { + long sendStartMs = System.currentTimeMillis(); + Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats); + producer.send(record, cb); + + if (throttler.shouldThrottle(i, sendStartMs)) { + throttler.throttle(); + } + } + + /* print final results */ + producer.close(); + stats.printTotal(); + } catch (ArgumentParserException e) { + if (args.length == 0) { + parser.printHelp(); + System.exit(0); + } else { + parser.handleError(e); + System.exit(1); } } - /* print final results */ - producer.close(); - stats.printTotal(); + } + + /** Get the command-line argument parser. */ + private static ArgumentParser argParser() { + ArgumentParser parser = ArgumentParsers + .newArgumentParser("producer-performance") + .defaultHelp(true) + .description("This tool is used to verify the producer performance."); + + parser.addArgument("--topic") + .action(store()) + .required(true) + .type(String.class) + .metavar("TOPIC") + .help("produce messages to this topic"); + + parser.addArgument("--num-records") + .action(store()) + .required(true) + .type(Long.class) + .metavar("NUM-RECORDS") + .dest("numRecords") + .help("number of messages to produce"); + + parser.addArgument("--record-size") + .action(store()) + .required(true) + .type(Integer.class) + .metavar("RECORD-SIZE") + .dest("recordSize") + .help("message size in bytes"); + + parser.addArgument("--throughput") + .action(store()) + .required(true) + .type(Integer.class) + .metavar("THROUGHPUT") + .help("throttle maximum message throughput to *approximately* THROUGHPUT messages/sec"); + + parser.addArgument("--producer-props") + .nargs("+") + .required(true) + .metavar("PROP-NAME=PROP-VALUE") + .type(String.class) + .dest("producerConfig") + .help("kafka producer related configuaration properties like bootstrap.servers,client.id etc.."); + + return parser; } private static class Stats {
