KAFKA-2276; KIP-25 initial patch

Initial patch for KIP-25
Note that to install ducktape, you should *not* use pip. Instead:
```
$ git clone git@github.com:confluentinc/ducktape.git
$ cd ducktape
$ python setup.py install
```

Author: Geoff Anderson <[email protected]>
Author: Geoff <[email protected]>
Author: Liquan Pei <[email protected]>

Reviewers: Ewen, Gwen, Jun, Guozhang

Closes #70 from granders/KAFKA-2276 and squashes the following commits:

a62fb6c [Geoff Anderson] fixed checkstyle errors
a70f0f8 [Geoff Anderson] Merged in upstream trunk.
8b62019 [Geoff Anderson] Merged in upstream trunk.
47b7b64 [Geoff Anderson] Created separate tools jar so that the clients package does not pull in dependencies on the Jackson JSON tools or argparse4j.
a9e6a14 [Geoff Anderson] Merged in upstream changes
d18db7b [Geoff Anderson] fixed :rat errors (needed to add licenses)
321fdf8 [Geoff Anderson] Ignore tests/ and vagrant/ directories when running rat build task
795fc75 [Geoff Anderson] Merged in changes from upstream trunk.
1d93f06 [Geoff Anderson] Updated provisioning to use java 7 in light of KAFKA-2316
2ea4e29 [Geoff Anderson] Tweaked README, changed default log collection behavior on VerifiableProducer
0eb6fdc [Geoff Anderson] Merged in system-tests
69dd7be [Geoff Anderson] Merged in trunk
4034dd6 [Geoff Anderson] Merged in upstream trunk
ede6450 [Geoff] Merge pull request #4 from confluentinc/move_muckrake
7751545 [Geoff Anderson] Corrected license headers
e6d532f [Geoff Anderson] java 7 -> java 6
8c61e2d [Geoff Anderson] Reverted jdk back to 6
f14c507 [Geoff Anderson] Removed mode = "test" from Vagrantfile and Vagrantfile.local examples. Updated testing README to clarify aws setup.
98b7253 [Geoff Anderson] Updated consumer tests to pre-populate kafka logs
e6a41f1 [Geoff Anderson] removed stray println
b15b24f [Geoff Anderson] leftover KafkaBenchmark in super call
0f75187 [Geoff Anderson] Removed stray allow_fail. kafka_benchmark_test -> benchmark_test
f469f84 [Geoff Anderson] Tweaked readme, added example Vagrantfile.local
3d73857 [Geoff Anderson] Merged downstream changes
42dcdb1 [Geoff Anderson] Tweaked behavior of stop_node, clean_node to generally fail fast
7f7c3e0 [Geoff Anderson] Updated setup.py for kafkatest
c60125c [Geoff Anderson] TestEndToEndLatency -> EndToEndLatency
4f476fe [Geoff Anderson] Moved aws scripts to vagrant directory
5af88fc [Geoff Anderson] Updated README to include aws quickstart
e5edf03 [Geoff Anderson] Updated example aws Vagrantfile.local
96533c3 [Geoff] Update aws-access-keys-commands
25a413d [Geoff] Update aws-example-Vagrantfile.local
884b20e [Geoff Anderson] Moved a bunch of files to kafkatest directory
fc7c81c [Geoff Anderson] added setup.py
632be12 [Geoff] Merge pull request #3 from confluentinc/verbose-client
51a94fd [Geoff Anderson] Use argparse4j instead of joptsimple. ThroughputThrottler now has more intuitive behavior when targetThroughput is 0.
a80a428 [Geoff Anderson] Added shell program for VerifiableProducer.
d586fb0 [Geoff Anderson] Updated comments to reflect that throttler is not message-specific
6842ed1 [Geoff Anderson] left out a file from last commit
1228eef [Geoff Anderson] Renamed throttler
9100417 [Geoff Anderson] Updated command-line options for VerifiableProducer. Extracted throughput logic to make it reusable.
0a5de8e [Geoff Anderson] Fixed checkstyle errors. Changed name to VerifiableProducer. Added synchronization for thread safety on println statements.
475423b [Geoff Anderson] Convert class to string before adding to json object.
bc009f2 [Geoff Anderson] Got rid of VerboseProducer in core (moved to clients)
c0526fe [Geoff Anderson] Updates per review comments.
8b4b1f2 [Geoff Anderson] Minor updates to VerboseProducer
2777712 [Geoff Anderson] Added some metadata to producer output.
da94b8c [Geoff Anderson] Added number of messages option.
07cd1c6 [Geoff Anderson] Added simple producer which prints status of produced messages to stdout.
a278988 [Geoff Anderson] fixed typos
f1914c3 [Liquan Pei] Merge pull request #2 from confluentinc/system_tests
81e4156 [Liquan Pei] Bootstrap Kafka system tests

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e43c9aff
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e43c9aff
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e43c9aff

Branch: refs/heads/trunk
Commit: e43c9aff92c57da6abb0c1d0af3431a550110a89
Parents: f4101ab
Author: Geoff Anderson <[email protected]>
Authored: Tue Jul 28 17:22:14 2015 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Tue Jul 28 17:22:14 2015 -0700

----------------------------------------------------------------------
 .gitignore                                      |   5 +
 Vagrantfile                                     |  39 ++-
 bin/kafka-run-class.sh                          |  10 +
 bin/kafka-verifiable-producer.sh                |  20 ++
 build.gradle                                    |  60 +++-
 checkstyle/import-control.xml                   |   2 +
 .../clients/tools/ProducerPerformance.java      | 219 -------------
 .../scala/kafka/tools/EndToEndLatency.scala     |  92 ++++++
 .../scala/kafka/tools/ProducerPerformance.scala |   4 +-
 .../scala/kafka/tools/TestEndToEndLatency.scala |  91 ------
 settings.gradle                                 |   3 +-
 tests/.gitignore                                |  11 +
 tests/README.md                                 | 150 +++++++++
 tests/kafkatest/__init__.py                     |  16 +
 tests/kafkatest/services/__init__.py            |  15 +
 tests/kafkatest/services/console_consumer.py    | 146 +++++++++
 tests/kafkatest/services/kafka.py               | 227 ++++++++++++++
 tests/kafkatest/services/performance.py         | 163 ++++++++++
 .../templates/console_consumer.properties       |  19 ++
 .../services/templates/kafka.properties         | 121 ++++++++
 .../services/templates/zookeeper.properties     |  25 ++
 tests/kafkatest/services/verifiable_producer.py | 107 +++++++
 tests/kafkatest/services/zookeeper.py           |  64 ++++
 tests/kafkatest/tests/__init__.py               |  15 +
 tests/kafkatest/tests/benchmark_test.py         | 274 +++++++++++++++++
 tests/kafkatest/tests/kafka_test.py             |  45 +++
 tests/kafkatest/tests/replication_test.py       | 165 ++++++++++
 tests/setup.py                                  |  27 ++
 .../clients/tools/ProducerPerformance.java      | 201 ++++++++++++
 .../clients/tools/ThroughputThrottler.java      | 118 +++++++
 .../kafka/clients/tools/VerifiableProducer.java | 307 +++++++++++++++++++
 vagrant/aws/aws-access-keys-commands            |  29 ++
 vagrant/aws/aws-example-Vagrantfile.local       |  28 ++
 vagrant/aws/aws-init.sh                         |  73 +++++
 vagrant/base.sh                                 |   9 +
 vagrant/system-test-Vagrantfile.local           |  22 ++
 36 files changed, 2603 insertions(+), 319 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 1f3ba7d..4aae6e7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -29,3 +29,8 @@ config/server-*
 config/zookeeper-*
 core/data/*
 gradle/wrapper/*
+
+results
+tests/results
+.ducktape
+tests/.ducktape

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/Vagrantfile
----------------------------------------------------------------------
diff --git a/Vagrantfile b/Vagrantfile
index 1d7cc01..ee8b352 100644
--- a/Vagrantfile
+++ b/Vagrantfile
@@ -31,6 +31,7
@@ ram_megabytes = 1280 # EC2 ec2_access_key = ENV['AWS_ACCESS_KEY'] ec2_secret_key = ENV['AWS_SECRET_KEY'] +ec2_session_token = ENV['AWS_SESSION_TOKEN'] ec2_keypair_name = nil ec2_keypair_file = nil @@ -50,6 +51,24 @@ if File.exists?(local_config_file) then eval(File.read(local_config_file), binding, "Vagrantfile.local") end +# This is a horrible hack to work around bad interactions between +# vagrant-hostmanager and vagrant-aws/vagrant's implementation. Hostmanager +# wants to update the /etc/hosts entries, but tries to do so even on nodes that +# aren't up (e.g. even when all nodes are stopped and you run vagrant +# destroy). Because of the way the underlying code in vagrant works, it still +# tries to communicate with the node and has to wait for a very long +# timeout. This modifies the update to check for hosts that are not created or +# stopped, skipping the update in that case since it's impossible to update +# nodes in that state. +Object.const_get("VagrantPlugins").const_get("HostManager").const_get("HostsFile").class_eval do + alias_method :old_update_guest, :update_guest + def update_guest(machine) + state_id = machine.state.id + return if state_id == :not_created || state_id == :stopped + old_update_guest(machine) + end +end + # TODO(ksweeney): RAM requirements are not empirical and can probably be significantly lowered. Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| config.hostmanager.enabled = true @@ -85,13 +104,31 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| override.vm.box = "dummy" override.vm.box_url = "https://github.com/mitchellh/vagrant-aws/raw/master/dummy.box" - override.hostmanager.ignore_private_ip = true + cached_addresses = {} + # Use a custom resolver that SSH's into the machine and finds the IP address + # directly. This lets us get at the private IP address directly, avoiding + # some issues with using the default IP resolver, which uses the public IP + # address. + override.hostmanager.ip_resolver = proc do |vm, resolving_vm| + if !cached_addresses.has_key?(vm.name) + state_id = vm.state.id + if state_id != :not_created && state_id != :stopped && vm.communicate.ready? 
+ vm.communicate.execute("/sbin/ifconfig eth0 | grep 'inet addr' | tail -n 1 | egrep -o '[0-9\.]+' | head -n 1 2>&1") do |type, contents| + cached_addresses[vm.name] = contents.split("\n").first[/(\d+\.\d+\.\d+\.\d+)/, 1] + end + else + cached_addresses[vm.name] = nil + end + end + cached_addresses[vm.name] + end override.ssh.username = ec2_user override.ssh.private_key_path = ec2_keypair_file aws.access_key_id = ec2_access_key aws.secret_access_key = ec2_secret_key + aws.session_token = ec2_session_token aws.keypair_name = ec2_keypair_name aws.region = ec2_region http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/bin/kafka-run-class.sh ---------------------------------------------------------------------- diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 8c3fa28..ebe7409 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -65,6 +65,16 @@ do CLASSPATH=$CLASSPATH:$file done +for file in $base_dir/tools/build/libs/kafka-tools*.jar; +do + CLASSPATH=$CLASSPATH:$file +done + +for file in $base_dir/tools/build/dependant-libs-${SCALA_VERSION}*/*.jar; +do + CLASSPATH=$CLASSPATH:$file +done + # classpath addition for release for file in $base_dir/libs/*.jar; do http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/bin/kafka-verifiable-producer.sh ---------------------------------------------------------------------- diff --git a/bin/kafka-verifiable-producer.sh b/bin/kafka-verifiable-producer.sh new file mode 100755 index 0000000..d0aa6c5 --- /dev/null +++ b/bin/kafka-verifiable-producer.sh @@ -0,0 +1,20 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then + export KAFKA_HEAP_OPTS="-Xmx512M" +fi +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.clients.tools.VerifiableProducer $@ http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 9b6eb51..1b67e62 100644 --- a/build.gradle +++ b/build.gradle @@ -204,20 +204,20 @@ for ( sv in ['2_10_5', '2_11_7'] ) { } } -tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar']) { +tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar', 'tools:jar']) { } -tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar']) { } +tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar', 'tools:srcJar']) { } -tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar']) { } +tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar', 'tools:docsJar']) { } -tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7', 'clients:test', 'log4j-appender:test']) { +tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7', 'clients:test', 'log4j-appender:test', 'tools:test']) { } tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_10_5', 'releaseTarGz_2_11_7']) { } -tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives']) { +tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives', 'tools:uploadArchives']) { } project(':core') { @@ -413,6 +413,56 @@ project(':clients') { test.dependsOn('checkstyleMain', 'checkstyleTest') } +project(':tools') { + apply plugin: 'checkstyle' + archivesBaseName = "kafka-tools" + + dependencies { + compile project(':clients') + compile 'net.sourceforge.argparse4j:argparse4j:0.5.0' + compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4' + compile "$slf4jlog4j" + + testCompile 'junit:junit:4.6' + testCompile project(path: ':clients', configuration: 'archives') + } + + task testJar(type: Jar) { + classifier = 'test' + from sourceSets.test.output + } + + test { + testLogging { + events "passed", "skipped", "failed" + exceptionFormat = 'full' + } + } + + javadoc { + include "**/org/apache/kafka/tools/*" + } + + tasks.create(name: "copyDependantLibs", 
type: Copy) { + from (configurations.testRuntime) { + include('slf4j-log4j12*') + } + from (configurations.runtime) { + exclude('kafka-clients*') + } + into "$buildDir/dependant-libs-${scalaVersion}" + } + + jar { + dependsOn 'copyDependantLibs' + } + + checkstyle { + configFile = new File(rootDir, "checkstyle/checkstyle.xml") + } + test.dependsOn('checkstyleMain', 'checkstyleTest') +} + project(':log4j-appender') { apply plugin: 'checkstyle' archivesBaseName = "kafka-log4j-appender" http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 19e0659..18be1bb 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -92,6 +92,8 @@ <subpackage name="tools"> <allow pkg="org.apache.kafka.clients.producer" /> <allow pkg="org.apache.kafka.clients.consumer" /> + <allow pkg="com.fasterxml.jackson" /> + <allow pkg="net.sourceforge.argparse4j" /> </subpackage> </subpackage> http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java deleted file mode 100644 index 13f4d59..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java +++ /dev/null @@ -1,219 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ -package org.apache.kafka.clients.tools; - -import java.util.Arrays; -import java.util.Properties; - -import org.apache.kafka.clients.producer.*; - -public class ProducerPerformance { - - private static final long NS_PER_MS = 1000000L; - private static final long NS_PER_SEC = 1000 * NS_PER_MS; - private static final long MIN_SLEEP_NS = 2 * NS_PER_MS; - - public static void main(String[] args) throws Exception { - if (args.length < 4) { - System.err.println("USAGE: java " + ProducerPerformance.class.getName() + - " topic_name num_records record_size target_records_sec [prop_name=prop_value]*"); - System.exit(1); - } - - /* parse args */ - String topicName = args[0]; - long numRecords = Long.parseLong(args[1]); - int recordSize = Integer.parseInt(args[2]); - int throughput = Integer.parseInt(args[3]); - - Properties props = new Properties(); - for (int i = 4; i < args.length; i++) { - String[] pieces = args[i].split("="); - if (pieces.length != 2) - throw new IllegalArgumentException("Invalid property: " + args[i]); - props.put(pieces[0], pieces[1]); - } - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props); - - /* setup perf test */ - byte[] payload = new byte[recordSize]; - Arrays.fill(payload, (byte) 1); - ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload); - long sleepTime = NS_PER_SEC / throughput; - long sleepDeficitNs = 0; - Stats stats = new Stats(numRecords, 5000); - long start = System.currentTimeMillis(); - for (int i = 0; i < numRecords; i++) { - long sendStart = System.currentTimeMillis(); - Callback cb = stats.nextCompletion(sendStart, payload.length, stats); - producer.send(record, cb); - - /* - * Maybe sleep a little to control throughput. Sleep time can be a bit inaccurate for times < 1 ms so - * instead of sleeping each time instead wait until a minimum sleep time accumulates (the "sleep deficit") - * and then make up the whole deficit in one longer sleep. 
- */ - if (throughput > 0) { - float elapsed = (sendStart - start) / 1000.f; - if (elapsed > 0 && i / elapsed > throughput) { - sleepDeficitNs += sleepTime; - if (sleepDeficitNs >= MIN_SLEEP_NS) { - long sleepMs = sleepDeficitNs / 1000000; - long sleepNs = sleepDeficitNs - sleepMs * 1000000; - Thread.sleep(sleepMs, (int) sleepNs); - sleepDeficitNs = 0; - } - } - } - } - - /* print final results */ - producer.close(); - stats.printTotal(); - } - - private static class Stats { - private long start; - private long windowStart; - private int[] latencies; - private int sampling; - private int iteration; - private int index; - private long count; - private long bytes; - private int maxLatency; - private long totalLatency; - private long windowCount; - private int windowMaxLatency; - private long windowTotalLatency; - private long windowBytes; - private long reportingInterval; - - public Stats(long numRecords, int reportingInterval) { - this.start = System.currentTimeMillis(); - this.windowStart = System.currentTimeMillis(); - this.index = 0; - this.iteration = 0; - this.sampling = (int) (numRecords / Math.min(numRecords, 500000)); - this.latencies = new int[(int) (numRecords / this.sampling) + 1]; - this.index = 0; - this.maxLatency = 0; - this.totalLatency = 0; - this.windowCount = 0; - this.windowMaxLatency = 0; - this.windowTotalLatency = 0; - this.windowBytes = 0; - this.totalLatency = 0; - this.reportingInterval = reportingInterval; - } - - public void record(int iter, int latency, int bytes, long time) { - this.count++; - this.bytes += bytes; - this.totalLatency += latency; - this.maxLatency = Math.max(this.maxLatency, latency); - this.windowCount++; - this.windowBytes += bytes; - this.windowTotalLatency += latency; - this.windowMaxLatency = Math.max(windowMaxLatency, latency); - if (iter % this.sampling == 0) { - this.latencies[index] = latency; - this.index++; - } - /* maybe report the recent perf */ - if (time - windowStart >= reportingInterval) { - printWindow(); - newWindow(); - } - } - - public Callback nextCompletion(long start, int bytes, Stats stats) { - Callback cb = new PerfCallback(this.iteration, start, bytes, stats); - this.iteration++; - return cb; - } - - public void printWindow() { - long ellapsed = System.currentTimeMillis() - windowStart; - double recsPerSec = 1000.0 * windowCount / (double) ellapsed; - double mbPerSec = 1000.0 * this.windowBytes / (double) ellapsed / (1024.0 * 1024.0); - System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f max latency.\n", - windowCount, - recsPerSec, - mbPerSec, - windowTotalLatency / (double) windowCount, - (double) windowMaxLatency); - } - - public void newWindow() { - this.windowStart = System.currentTimeMillis(); - this.windowCount = 0; - this.windowMaxLatency = 0; - this.windowTotalLatency = 0; - this.windowBytes = 0; - } - - public void printTotal() { - long ellapsed = System.currentTimeMillis() - start; - double recsPerSec = 1000.0 * count / (double) ellapsed; - double mbPerSec = 1000.0 * this.bytes / (double) ellapsed / (1024.0 * 1024.0); - int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999); - System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n", - count, - recsPerSec, - mbPerSec, - totalLatency / (double) count, - (double) maxLatency, - percs[0], - percs[1], - percs[2], - percs[3]); - } - - private static int[] percentiles(int[] latencies, int count, 
double... percentiles) { - int size = Math.min(count, latencies.length); - Arrays.sort(latencies, 0, size); - int[] values = new int[percentiles.length]; - for (int i = 0; i < percentiles.length; i++) { - int index = (int) (percentiles[i] * size); - values[i] = latencies[index]; - } - return values; - } - } - - private static final class PerfCallback implements Callback { - private final long start; - private final int iteration; - private final int bytes; - private final Stats stats; - - public PerfCallback(int iter, long start, int bytes, Stats stats) { - this.start = start; - this.stats = stats; - this.iteration = iter; - this.bytes = bytes; - } - - public void onCompletion(RecordMetadata metadata, Exception exception) { - long now = System.currentTimeMillis(); - int latency = (int) (now - start); - this.stats.record(iteration, latency, bytes, now); - if (exception != null) - exception.printStackTrace(); - } - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/core/src/main/scala/kafka/tools/EndToEndLatency.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala new file mode 100755 index 0000000..7bb69b7 --- /dev/null +++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.tools + +import java.util.{Arrays, Properties} + +import kafka.consumer._ +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} + +import scala.Option.option2Iterable + +object EndToEndLatency { + def main(args: Array[String]) { + if (args.length != 6) { + System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks") + System.exit(1) + } + + val brokerList = args(0) + val zkConnect = args(1) + val topic = args(2) + val numMessages = args(3).toInt + val consumerFetchMaxWait = args(4).toInt + val producerAcks = args(5).toInt + + val consumerProps = new Properties() + consumerProps.put("group.id", topic) + consumerProps.put("auto.commit.enable", "false") + consumerProps.put("auto.offset.reset", "largest") + consumerProps.put("zookeeper.connect", zkConnect) + consumerProps.put("fetch.wait.max.ms", consumerFetchMaxWait.toString) + consumerProps.put("socket.timeout.ms", 1201000.toString) + + val config = new ConsumerConfig(consumerProps) + val connector = Consumer.create(config) + val stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head + val iter = stream.iterator + + val producerProps = new Properties() + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") + producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true") + producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString) + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps) + + // make sure the consumer fetcher has started before sending data since otherwise + // the consumption from the tail will skip the first message and hence be blocked + Thread.sleep(5000) + + val message = "hello there beautiful".getBytes + var totalTime = 0.0 + val latencies = new Array[Long](numMessages) + for (i <- 0 until numMessages) { + val begin = System.nanoTime + producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message)) + val received = iter.next + val elapsed = System.nanoTime - begin + // poor man's progress bar + if (i % 1000 == 0) + println(i + "\t" + elapsed / 1000.0 / 1000.0) + totalTime += elapsed + latencies(i) = (elapsed / 1000 / 1000) + } + println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 1000.0)) + Arrays.sort(latencies) + val p50 = latencies((latencies.length * 0.5).toInt) + val p99 = latencies((latencies.length * 0.99).toInt) + val p999 = latencies((latencies.length * 0.999).toInt) + println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999)) + producer.close() + connector.commitOffsets(true) + connector.shutdown() + System.exit(0) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/core/src/main/scala/kafka/tools/ProducerPerformance.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala index 0ebfa59..0335cc6 100644 --- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -115,9 +115,9 
@@ object ProducerPerformance extends Logging { .defaultsTo(0) val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + - "set, the csv metrics will be outputed here") + "set, the csv metrics will be output here") .withRequiredArg - .describedAs("metrics dictory") + .describedAs("metrics directory") .ofType(classOf[java.lang.String]) val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/core/src/test/scala/kafka/tools/TestEndToEndLatency.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/test/scala/kafka/tools/TestEndToEndLatency.scala deleted file mode 100755 index 99b77a1..0000000 --- a/core/src/test/scala/kafka/tools/TestEndToEndLatency.scala +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.tools - -import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer} -import kafka.consumer._ -import java.util.Properties -import java.util.Arrays -import scala.Option.option2Iterable - -object TestEndToEndLatency { - def main(args: Array[String]) { - if (args.length != 6) { - System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks") - System.exit(1) - } - - val brokerList = args(0) - val zkConnect = args(1) - val topic = args(2) - val numMessages = args(3).toInt - val consumerFetchMaxWait = args(4).toInt - val producerAcks = args(5).toInt - - val consumerProps = new Properties() - consumerProps.put("group.id", topic) - consumerProps.put("auto.commit.enable", "false") - consumerProps.put("auto.offset.reset", "largest") - consumerProps.put("zookeeper.connect", zkConnect) - consumerProps.put("fetch.wait.max.ms", consumerFetchMaxWait.toString) - consumerProps.put("socket.timeout.ms", 1201000.toString) - - val config = new ConsumerConfig(consumerProps) - val connector = Consumer.create(config) - val stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head - val iter = stream.iterator - - val producerProps = new Properties() - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") - producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true") - producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString) - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") - producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") - val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps) - - // make sure the consumer fetcher has started before sending data since otherwise - // the consumption from the tail will skip the first message and hence be blocked - Thread.sleep(5000) - - val message = "hello there beautiful".getBytes - var totalTime = 0.0 - val latencies = new Array[Long](numMessages) - for (i <- 0 until numMessages) { - val begin = System.nanoTime - producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message)) - val received = iter.next - val elapsed = System.nanoTime - begin - // poor man's progress bar - if (i % 1000 == 0) - println(i + "\t" + elapsed / 1000.0 / 1000.0) - totalTime += elapsed - latencies(i) = (elapsed / 1000 / 1000) - } - println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 1000.0)) - Arrays.sort(latencies) - val p50 = latencies((latencies.length * 0.5).toInt) - val p99 = latencies((latencies.length * 0.99).toInt) - val p999 = latencies((latencies.length * 0.999).toInt) - println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999)) - producer.close() - connector.commitOffsets(true) - connector.shutdown() - System.exit(0) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/settings.gradle ---------------------------------------------------------------------- diff --git a/settings.gradle b/settings.gradle index 3b6a952..1944917 100644 --- a/settings.gradle +++ b/settings.gradle @@ -14,4 +14,5 @@ // limitations under the License. 
 apply from: file('scala.gradle')
-include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'log4j-appender'
+include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'log4j-appender'
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/.gitignore
----------------------------------------------------------------------
diff --git a/tests/.gitignore b/tests/.gitignore
new file mode 100644
index 0000000..b218b83
--- /dev/null
+++ b/tests/.gitignore
@@ -0,0 +1,11 @@
+Vagrantfile.local
+
+.idea/
+
+*.pyc
+*.ipynb
+
+.DS_Store
+
+.ducktape
+results/

http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/README.md
----------------------------------------------------------------------
diff --git a/tests/README.md b/tests/README.md
new file mode 100644
index 0000000..ffbc0d5
--- /dev/null
+++ b/tests/README.md
@@ -0,0 +1,150 @@
+System Integration & Performance Testing
+========================================
+
+This directory contains Kafka system integration and performance tests.
+[ducktape](https://github.com/confluentinc/ducktape) is used to run the tests.
+(ducktape is a distributed testing framework which provides a test runner,
+a result reporter, and utilities to bring up and tear down services.)
+
+Local Quickstart
+----------------
+This quickstart will help you run the Kafka system tests on your local machine.
+
+* Install VirtualBox from [https://www.virtualbox.org/](https://www.virtualbox.org/) (run `$ vboxmanage --version` to check if it's installed).
+* Install Vagrant >= 1.6.4 from [http://www.vagrantup.com/](http://www.vagrantup.com/) (run `vagrant --version` to check if it's installed).
+* Install Vagrant Plugins:
+
+        # Required
+        $ vagrant plugin install vagrant-hostmanager vagrant-cachier
+
+* Build a specific branch of Kafka
+
+        $ cd kafka
+        $ git checkout $BRANCH
+        $ gradle
+        $ ./gradlew jar
+
+* Set up a testing cluster with Vagrant. Configure your Vagrant setup by creating the file
+  `Vagrantfile.local` in the directory of your Kafka checkout. For testing purposes,
+  `num_zookeepers` and `num_kafka` should be 0, and `num_workers` should be set high enough
+  to run all of your tests. An example resides in kafka/vagrant/system-test-Vagrantfile.local
+
+        # Example Vagrantfile.local for use on local machine
+        # Vagrantfile.local should reside in the base Kafka directory
+        num_zookeepers = 0
+        num_kafka = 0
+        num_workers = 9
+
+* Bring up the cluster (note that the initial provisioning process can be slow since it involves
+  installing dependencies and updates on every VM):
+
+        $ vagrant up
+
+* Install ducktape:
+
+        $ pip install ducktape
+
+* Run the system tests using ducktape:
+
+        $ cd tests
+        $ ducktape kafkatest/tests
+
+* If you make changes to your Kafka checkout, you'll need to rebuild and resync to your Vagrant cluster:
+
+        $ cd kafka
+        $ ./gradlew jar
+        $ vagrant rsync # Re-syncs build output to cluster
+
+EC2 Quickstart
+--------------
+This quickstart will help you run the Kafka system tests on EC2. In this setup, all logic is run
+on EC2 and none on your local machine.
+
+There are a lot of steps here, but the basic goals are to create one distinguished EC2 instance that
+will be our "test driver", and to set up the security groups and IAM role so that the test driver
+can create, destroy, and run ssh commands on any number of "workers".
+
+As a convention, we'll use "kafkatest" in most names, but you can use whatever name you want.
+
+Preparation
+-----------
+In these steps, we will create an IAM role which has permission to create and destroy EC2 instances,
+set up a keypair used for ssh access to the test driver and worker machines, and create a security
+group to allow the test driver and workers to all communicate via TCP.
+
+* [Create an IAM role](http://docs.aws.amazon.com/IAM/latest/UserGuide/Using_SettingUpUser.html#Using_CreateUser_console). We'll give this role the ability to launch or kill additional EC2 machines.
+  - Create role "kafkatest-master"
+  - Role type: Amazon EC2
+  - Attach policy: AmazonEC2FullAccess (this will allow our test driver to create and destroy EC2 instances)
+
+* If you haven't already, [set up a keypair to use for SSH access](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html). For the purpose
+  of this quickstart, let's say the keypair name is kafkatest, and you've saved the private key in kafkatest.pem
+
+* Next, create a security group called "kafkatest".
+  - After creating the group, add inbound rules: allow SSH on port 22 from anywhere; also, allow access on all ports (0-65535) from other machines in the kafkatest group.
+
+Create the Test Driver
+----------------------
+* Launch a new test driver machine
+  - OS: Ubuntu server is recommended
+  - Instance type: t2.medium is easily enough since this machine is just a driver
+  - Instance details: Most defaults are fine.
+  - IAM role -> kafkatest-master
+  - Tagging the instance with a useful name is recommended.
+  - Security group -> 'kafkatest'
+
+* Once the machine is started, upload the SSH key to your test driver:
+
+        $ scp -i /path/to/kafkatest.pem \
+            /path/to/kafkatest.pem [email protected]:kafkatest.pem
+
+* Grab the public hostname/IP (available for example by navigating to your EC2 dashboard and viewing running instances) of your test driver and SSH into it:
+
+        $ ssh -i /path/to/kafkatest.pem [email protected]
+
+Set Up the Test Driver
+----------------------
+The following steps assume you have ssh'd into
+the test driver machine.
+
+* Start by making sure you're up to date, and install git and ducktape:
+
+        $ sudo apt-get update && sudo apt-get -y upgrade && sudo apt-get install -y git
+        $ pip install ducktape
+
+* Get Kafka:
+
+        $ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
+
+* Install some dependencies:
+
+        $ cd kafka
+        $ kafka/vagrant/aws/aws-init.sh
+        $ . ~/.bashrc
+
+* An example Vagrantfile.local has been created by aws-init.sh which looks something like:
+
+        # Vagrantfile.local
+        ec2_instance_type = "..." # Pick something appropriate for your
+                                  # test. Note that the default m3.medium has
+                                  # a small disk.
+        num_zookeepers = 0
+        num_kafka = 0
+        num_workers = 9
+        ec2_keypair_name = 'kafkatest'
+        ec2_keypair_file = '/home/ubuntu/kafkatest.pem'
+        ec2_security_groups = ['kafkatest']
+        ec2_region = 'us-west-2'
+        ec2_ami = "ami-29ebb519"
+
+* Start up the instances (note: we have found that bringing up machines in parallel can cause errors on AWS):
+
+        $ vagrant up --provider=aws --no-provision --no-parallel && vagrant provision
+
+* Now you should be able to run tests:
+
+        $ cd kafka/tests
+        $ ducktape kafkatest/tests
+
+* To halt your workers without destroying persistent state, run `vagrant halt`. Run `vagrant destroy -f` to destroy all traces of your workers.
+ http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/__init__.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py new file mode 100644 index 0000000..28d269b --- /dev/null +++ b/tests/kafkatest/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/__init__.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/__init__.py b/tests/kafkatest/services/__init__.py new file mode 100644 index 0000000..ebc9bb3 --- /dev/null +++ b/tests/kafkatest/services/__init__.py @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/console_consumer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py new file mode 100644 index 0000000..33ef4ea --- /dev/null +++ b/tests/kafkatest/services/console_consumer.py @@ -0,0 +1,146 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.services.background_thread import BackgroundThreadService + + +def is_int(msg): + """Default method used to check whether text pulled from console consumer is a message. + + return int or None + """ + try: + return int(msg) + except: + return None + + +""" +0.8.2.1 ConsoleConsumer options + +The console consumer is a tool that reads data from Kafka and outputs it to standard output. +Option Description +------ ----------- +--blacklist <blacklist> Blacklist of topics to exclude from + consumption. +--consumer.config <config file> Consumer config properties file. +--csv-reporter-enabled If set, the CSV metrics reporter will + be enabled +--delete-consumer-offsets If specified, the consumer path in + zookeeper is deleted when starting up +--formatter <class> The name of a class to use for + formatting kafka messages for + display. (default: kafka.tools. + DefaultMessageFormatter) +--from-beginning If the consumer does not already have + an established offset to consume + from, start with the earliest + message present in the log rather + than the latest message. +--max-messages <Integer: num_messages> The maximum number of messages to + consume before exiting. If not set, + consumption is continual. +--metrics-dir <metrics dictory> If csv-reporter-enable is set, and + this parameter isset, the csv + metrics will be outputed here +--property <prop> +--skip-message-on-error If there is an error when processing a + message, skip it instead of halt. +--topic <topic> The topic id to consume on. +--whitelist <whitelist> Whitelist of topics to include for + consumption. +--zookeeper <urls> REQUIRED: The connection string for + the zookeeper connection in the form + host:port. Multiple URLS can be + given to allow fail-over. +""" + + +class ConsoleConsumer(BackgroundThreadService): + logs = { + "consumer_log": { + "path": "/mnt/consumer.log", + "collect_default": True} + } + + def __init__(self, context, num_nodes, kafka, topic, message_validator=is_int, from_beginning=True, consumer_timeout_ms=None): + """ + Args: + context: standard context + num_nodes: number of nodes to use (this should be 1) + kafka: kafka service + topic: consume from this topic + message_validator: function which returns message or None + from_beginning: consume from beginning if True, else from the end + consumer_timeout_ms: corresponds to consumer.timeout.ms. consumer process ends if time between + successively consumed messages exceeds this timeout. Setting this and + waiting for the consumer to stop is a pretty good way to consume all messages + in a topic. 
+ """ + super(ConsoleConsumer, self).__init__(context, num_nodes) + self.kafka = kafka + self.args = { + 'topic': topic, + } + + self.consumer_timeout_ms = consumer_timeout_ms + + self.from_beginning = from_beginning + self.message_validator = message_validator + self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)} + + @property + def start_cmd(self): + args = self.args.copy() + args.update({'zk_connect': self.kafka.zk.connect_setting()}) + cmd = "/opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s --zookeeper %(zk_connect)s" \ + " --consumer.config /mnt/console_consumer.properties" % args + + if self.from_beginning: + cmd += " --from-beginning" + + cmd += " 2>> /mnt/consumer.log | tee -a /mnt/consumer.log &" + return cmd + + def _worker(self, idx, node): + # form config file + if self.consumer_timeout_ms is not None: + prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms) + else: + prop_file = self.render('console_consumer.properties') + + self.logger.info("console_consumer.properties:") + self.logger.info(prop_file) + node.account.create_file("/mnt/console_consumer.properties", prop_file) + + # Run and capture output + cmd = self.start_cmd + self.logger.debug("Console consumer %d command: %s", idx, cmd) + for line in node.account.ssh_capture(cmd): + msg = line.strip() + msg = self.message_validator(msg) + if msg is not None: + self.logger.debug("consumed a message: " + str(msg)) + self.messages_consumed[idx].append(msg) + + def start_node(self, node): + super(ConsoleConsumer, self).start_node(node) + + def stop_node(self, node): + node.account.kill_process("java", allow_fail=False) + + def clean_node(self, node): + node.account.ssh("rm -rf /mnt/console_consumer.properties /mnt/consumer.log", allow_fail=False) + http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/kafka.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka.py b/tests/kafkatest/services/kafka.py new file mode 100644 index 0000000..34ec5ef --- /dev/null +++ b/tests/kafkatest/services/kafka.py @@ -0,0 +1,227 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from ducktape.services.service import Service + +import json +import re +import signal +import time + + +class KafkaService(Service): + + logs = { + "kafka_log": { + "path": "/mnt/kafka.log", + "collect_default": True}, + "kafka_data": { + "path": "/mnt/kafka-logs", + "collect_default": False} + } + + def __init__(self, context, num_nodes, zk, topics=None): + """ + :type context + :type zk: ZookeeperService + :type topics: dict + """ + super(KafkaService, self).__init__(context, num_nodes) + self.zk = zk + self.topics = topics + + def start(self): + super(KafkaService, self).start() + + # Create topics if necessary + if self.topics is not None: + for topic, topic_cfg in self.topics.items(): + if topic_cfg is None: + topic_cfg = {} + + topic_cfg["topic"] = topic + self.create_topic(topic_cfg) + + def start_node(self, node): + props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node)) + self.logger.info("kafka.properties:") + self.logger.info(props_file) + node.account.create_file("/mnt/kafka.properties", props_file) + + cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid" + self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd)) + node.account.ssh(cmd) + time.sleep(5) + if len(self.pids(node)) == 0: + raise Exception("No process ids recorded on node %s" % str(node)) + + def pids(self, node): + """Return process ids associated with running processes on the given node.""" + try: + return [pid for pid in node.account.ssh_capture("cat /mnt/kafka.pid", callback=int)] + except: + return [] + + def signal_node(self, node, sig=signal.SIGTERM): + pids = self.pids(node) + for pid in pids: + node.account.signal(pid, sig) + + def signal_leader(self, topic, partition=0, sig=signal.SIGTERM): + leader = self.leader(topic, partition) + self.signal_node(leader, sig) + + def stop_node(self, node, clean_shutdown=True): + pids = self.pids(node) + sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL + + for pid in pids: + node.account.signal(pid, sig, allow_fail=False) + + node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False) + + def clean_node(self, node): + node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False) + + def create_topic(self, topic_cfg): + node = self.nodes[0] # any node is fine here + self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg) + + cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %(zk_connect)s --create "\ + "--topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % { + 'zk_connect': self.zk.connect_setting(), + 'topic': topic_cfg.get("topic"), + 'partitions': topic_cfg.get('partitions', 1), + 'replication': topic_cfg.get('replication-factor', 1) + } + + if "configs" in topic_cfg.keys() and topic_cfg["configs"] is not None: + for config_name, config_value in topic_cfg["configs"].items(): + cmd += " --config %s=%s" % (config_name, str(config_value)) + + self.logger.info("Running topic creation command...\n%s" % cmd) + node.account.ssh(cmd) + + time.sleep(1) + self.logger.info("Checking to see if topic was properly created...\n%s" % cmd) + for line in self.describe_topic(topic_cfg["topic"]).split("\n"): + self.logger.info(line) + + def describe_topic(self, topic): + node = self.nodes[0] + cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %s --topic %s --describe" % \ + (self.zk.connect_setting(), topic) + 
output = "" + for line in node.account.ssh_capture(cmd): + output += line + return output + + def verify_reassign_partitions(self, reassignment): + """Run the reassign partitions admin tool in "verify" mode + """ + node = self.nodes[0] + json_file = "/tmp/" + str(time.time()) + "_reassign.json" + + # reassignment to json + json_str = json.dumps(reassignment) + json_str = json.dumps(json_str) + + # create command + cmd = "echo %s > %s && " % (json_str, json_file) + cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\ + "--zookeeper %(zk_connect)s "\ + "--reassignment-json-file %(reassignment_file)s "\ + "--verify" % {'zk_connect': self.zk.connect_setting(), + 'reassignment_file': json_file} + cmd += " && sleep 1 && rm -f %s" % json_file + + # send command + self.logger.info("Verifying parition reassignment...") + self.logger.debug(cmd) + output = "" + for line in node.account.ssh_capture(cmd): + output += line + + self.logger.debug(output) + + if re.match(".*is in progress.*", output) is not None: + return False + + return True + + def execute_reassign_partitions(self, reassignment): + """Run the reassign partitions admin tool in "verify" mode + """ + node = self.nodes[0] + json_file = "/tmp/" + str(time.time()) + "_reassign.json" + + # reassignment to json + json_str = json.dumps(reassignment) + json_str = json.dumps(json_str) + + # create command + cmd = "echo %s > %s && " % (json_str, json_file) + cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\ + "--zookeeper %(zk_connect)s "\ + "--reassignment-json-file %(reassignment_file)s "\ + "--execute" % {'zk_connect': self.zk.connect_setting(), + 'reassignment_file': json_file} + cmd += " && sleep 1 && rm -f %s" % json_file + + # send command + self.logger.info("Executing parition reassignment...") + self.logger.debug(cmd) + output = "" + for line in node.account.ssh_capture(cmd): + output += line + + self.logger.debug("Verify partition reassignment:") + self.logger.debug(output) + + def restart_node(self, node, wait_sec=0, clean_shutdown=True): + """Restart the given node, waiting wait_sec in between stopping and starting up again.""" + self.stop_node(node, clean_shutdown) + time.sleep(wait_sec) + self.start_node(node) + + def leader(self, topic, partition=0): + """ Get the leader replica for the given topic and partition. + """ + cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " \ + % self.zk.connect_setting() + cmd += "get /brokers/topics/%s/partitions/%d/state" % (topic, partition) + self.logger.debug(cmd) + + node = self.nodes[0] + self.logger.debug("Querying zookeeper to find leader replica for topic %s: \n%s" % (cmd, topic)) + partition_state = None + for line in node.account.ssh_capture(cmd): + match = re.match("^({.+})$", line) + if match is not None: + partition_state = match.groups()[0] + break + + if partition_state is None: + raise Exception("Error finding partition state for topic %s and partition %d." 
% (topic, partition)) + + partition_state = json.loads(partition_state) + self.logger.info(partition_state) + + leader_idx = int(partition_state["leader"]) + self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx)) + return self.get_node(leader_idx) + + def bootstrap_servers(self): + return ','.join([node.account.hostname + ":9092" for node in self.nodes]) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/performance.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/performance.py b/tests/kafkatest/services/performance.py new file mode 100644 index 0000000..65c1a4d --- /dev/null +++ b/tests/kafkatest/services/performance.py @@ -0,0 +1,163 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.services.background_thread import BackgroundThreadService + + +class PerformanceService(BackgroundThreadService): + def __init__(self, context, num_nodes): + super(PerformanceService, self).__init__(context, num_nodes) + self.results = [None] * self.num_nodes + self.stats = [[] for x in range(self.num_nodes)] + + +class ProducerPerformanceService(PerformanceService): + def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False): + super(ProducerPerformanceService, self).__init__(context, num_nodes) + self.kafka = kafka + self.args = { + 'topic': topic, + 'num_records': num_records, + 'record_size': record_size, + 'throughput': throughput + } + self.settings = settings + self.intermediate_stats = intermediate_stats + + def _worker(self, idx, node): + args = self.args.copy() + args.update({'bootstrap_servers': self.kafka.bootstrap_servers()}) + cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\ + "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s" % args + + for key,value in self.settings.items(): + cmd += " %s=%s" % (str(key), str(value)) + self.logger.debug("Producer performance %d command: %s", idx, cmd) + + def parse_stats(line): + parts = line.split(',') + return { + 'records': int(parts[0].split()[0]), + 'records_per_sec': float(parts[1].split()[0]), + 'mbps': float(parts[1].split('(')[1].split()[0]), + 'latency_avg_ms': float(parts[2].split()[0]), + 'latency_max_ms': float(parts[3].split()[0]), + 'latency_50th_ms': float(parts[4].split()[0]), + 'latency_95th_ms': float(parts[5].split()[0]), + 'latency_99th_ms': float(parts[6].split()[0]), + 'latency_999th_ms': float(parts[7].split()[0]), + } + last = None + for line in node.account.ssh_capture(cmd): + self.logger.debug("Producer performance %d: %s", idx, line.strip()) + if 
self.intermediate_stats: + try: + self.stats[idx-1].append(parse_stats(line)) + except Exception: + # Sometimes there are extraneous log messages + pass + last = line + try: + self.results[idx-1] = parse_stats(last) + except Exception: + self.logger.error("Bad last line: %s", last) + + +class ConsumerPerformanceService(PerformanceService): + def __init__(self, context, num_nodes, kafka, topic, num_records, throughput, threads=1, settings={}): + super(ConsumerPerformanceService, self).__init__(context, num_nodes) + self.kafka = kafka + self.args = { + 'topic': topic, + 'num_records': num_records, + 'throughput': throughput, + 'threads': threads, + } + self.settings = settings + + def _worker(self, idx, node): + args = self.args.copy() + args.update({'zk_connect': self.kafka.zk.connect_setting()}) + cmd = "/opt/kafka/bin/kafka-consumer-perf-test.sh "\ + "--topic %(topic)s --messages %(num_records)d --zookeeper %(zk_connect)s" % args + for key,value in self.settings.items(): + cmd += " %s=%s" % (str(key), str(value)) + self.logger.debug("Consumer performance %d command: %s", idx, cmd) + last = None + for line in node.account.ssh_capture(cmd): + self.logger.debug("Consumer performance %d: %s", idx, line.strip()) + last = line + # Parse and save the last line's information + parts = last.split(',') + + self.results[idx-1] = { + 'total_mb': float(parts[2]), + 'mbps': float(parts[3]), + 'records_per_sec': float(parts[5]), + } + + +class EndToEndLatencyService(PerformanceService): + def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1): + super(EndToEndLatencyService, self).__init__(context, num_nodes) + self.kafka = kafka + self.args = { + 'topic': topic, + 'num_records': num_records, + 'consumer_fetch_max_wait': consumer_fetch_max_wait, + 'acks': acks + } + + def _worker(self, idx, node): + args = self.args.copy() + args.update({ + 'zk_connect': self.kafka.zk.connect_setting(), + 'bootstrap_servers': self.kafka.bootstrap_servers(), + }) + cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\ + "%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d "\ + "%(consumer_fetch_max_wait)d %(acks)d" % args + self.logger.debug("End-to-end latency %d command: %s", idx, cmd) + results = {} + for line in node.account.ssh_capture(cmd): + self.logger.debug("End-to-end latency %d: %s", idx, line.strip()) + if line.startswith("Avg latency:"): + results['latency_avg_ms'] = float(line.split()[2]) + if line.startswith("Percentiles"): + results['latency_50th_ms'] = float(line.split()[3][:-1]) + results['latency_99th_ms'] = float(line.split()[6][:-1]) + results['latency_999th_ms'] = float(line.split()[9]) + self.results[idx-1] = results + + +def parse_performance_output(summary): + parts = summary.split(',') + results = { + 'records': int(parts[0].split()[0]), + 'records_per_sec': float(parts[1].split()[0]), + 'mbps': float(parts[1].split('(')[1].split()[0]), + 'latency_avg_ms': float(parts[2].split()[0]), + 'latency_max_ms': float(parts[3].split()[0]), + 'latency_50th_ms': float(parts[4].split()[0]), + 'latency_95th_ms': float(parts[5].split()[0]), + 'latency_99th_ms': float(parts[6].split()[0]), + 'latency_999th_ms': float(parts[7].split()[0]), + } + # To provide compatibility with ConsumerPerformanceService + results['total_mb'] = results['mbps'] * (results['records'] / results['records_per_sec']) + results['rate_mbps'] = results['mbps'] + results['rate_mps'] = results['records_per_sec'] + + return results
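For reference, `parse_performance_output` consumes the single comma-separated summary line that the producer performance tool prints when it finishes. Below is a minimal sketch of how it is used; the summary line is illustrative sample data, not captured output:

```python
from kafkatest.services.performance import parse_performance_output

# Illustrative summary line in the shape the parser expects:
# "<n> records sent, <r> records/sec (<m> MB/sec), <avg> ms avg latency, ..."
summary = ("10000 records sent, 5000.0 records/sec (4.77 MB/sec), "
           "1.25 ms avg latency, 10.50 ms max latency, "
           "1 ms 50th, 2 ms 95th, 5 ms 99th, 9 ms 99.9th.")

results = parse_performance_output(summary)
assert results['records'] == 10000
assert results['records_per_sec'] == 5000.0
assert results['mbps'] == 4.77
# total_mb is derived: rate in MB/sec times elapsed seconds (records / records_per_sec)
assert abs(results['total_mb'] - 4.77 * 2.0) < 1e-9
```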
http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/templates/console_consumer.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/templates/console_consumer.properties b/tests/kafkatest/services/templates/console_consumer.properties new file mode 100644 index 0000000..63782fc --- /dev/null +++ b/tests/kafkatest/services/templates/console_consumer.properties @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +{% if consumer_timeout_ms is defined %} +consumer.timeout.ms={{ consumer_timeout_ms }} +{% endif %} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/templates/kafka.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/templates/kafka.properties b/tests/kafkatest/services/templates/kafka.properties new file mode 100644 index 0000000..db1077a --- /dev/null +++ b/tests/kafkatest/services/templates/kafka.properties @@ -0,0 +1,121 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id={{ broker_id }} + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=9092 + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +#host.name=localhost + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +advertised.host.name={{ node.account.hostname }} + +# The port to publish to ZooKeeper for clients to use. 
If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port=<port accessible by clients> + +# The number of threads handling network requests +num.network.threads=3 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=102400 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=65536 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma separated list of directories under which to store log files +log.dirs=/mnt/kafka-logs + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions=1 + +# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. +# This value is recommended to be increased for installations with data dirs located in a RAID array. +num.recovery.threads.per.data.dir=1 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=1073741824 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=300000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated list of host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect={{ zk.connect_setting() }} + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=2000 http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/templates/zookeeper.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/templates/zookeeper.properties b/tests/kafkatest/services/templates/zookeeper.properties new file mode 100644 index 0000000..e66c53f --- /dev/null +++ b/tests/kafkatest/services/templates/zookeeper.properties @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +dataDir=/mnt/zookeeper +clientPort=2181 +maxClientCnxns=0 +initLimit=5 +syncLimit=2 +quorumListenOnAllIPs=true +{% for node in nodes %} +server.{{ loop.index }}={{ node.account.hostname }}:2888:3888 +{% endfor %} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/verifiable_producer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py new file mode 100644 index 0000000..cca8227 --- /dev/null +++ b/tests/kafkatest/services/verifiable_producer.py @@ -0,0 +1,107 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +from ducktape.services.background_thread import BackgroundThreadService + +import json + + +class VerifiableProducer(BackgroundThreadService): + + logs = { + "producer_log": { + "path": "/mnt/producer.log", + "collect_default": False} + } + + def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000): + super(VerifiableProducer, self).__init__(context, num_nodes) + + self.kafka = kafka + self.topic = topic + self.max_messages = max_messages + self.throughput = throughput + + self.acked_values = [] + self.not_acked_values = [] + + def _worker(self, idx, node): + cmd = self.start_cmd + self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd)) + + for line in node.account.ssh_capture(cmd): + line = line.strip() + + data = self.try_parse_json(line) + if data is not None: + + with self.lock: + if data["name"] == "producer_send_error": + data["node"] = idx + self.not_acked_values.append(int(data["value"])) + + elif data["name"] == "producer_send_success": + self.acked_values.append(int(data["value"])) + + @property + def start_cmd(self): + cmd = "/opt/kafka/bin/kafka-verifiable-producer.sh" \ + " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers()) + if self.max_messages > 0: + cmd += " --max-messages %s" % str(self.max_messages) + if self.throughput > 0: + cmd += " --throughput %s" % str(self.throughput) + + cmd += " 2>> /mnt/producer.log | tee -a /mnt/producer.log &" + return cmd + + @property + def acked(self): + with self.lock: + return self.acked_values + + @property + def not_acked(self): + with self.lock: + return self.not_acked_values + + @property + def num_acked(self): + with self.lock: + return len(self.acked_values) + + @property + def num_not_acked(self): + with self.lock: + return len(self.not_acked_values) + + def stop_node(self, node): + node.account.kill_process("VerifiableProducer", allow_fail=False) + # block until the corresponding thread exits + if len(self.worker_threads) >= self.idx(node): + # Need to guard this because stop is preemptively called before the worker threads are added and started + self.worker_threads[self.idx(node) - 1].join() + + def clean_node(self, node): + node.account.ssh("rm -rf /mnt/producer.log", allow_fail=False) + + def try_parse_json(self, string): + """Try to parse a string as json. Return None if not parseable.""" + try: + record = json.loads(string) + return record + except ValueError: + self.logger.debug("Could not parse as json: %s" % str(string)) + return None http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/services/zookeeper.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py new file mode 100644 index 0000000..56f4606 --- /dev/null +++ b/tests/kafkatest/services/zookeeper.py @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ducktape.services.service import Service + +import time + + +class ZookeeperService(Service): + + logs = { + "zk_log": { + "path": "/mnt/zk.log", + "collect_default": True} + } + + def __init__(self, context, num_nodes): + """ + :type context + """ + super(ZookeeperService, self).__init__(context, num_nodes) + + def start_node(self, node): + idx = self.idx(node) + self.logger.info("Starting ZK node %d on %s", idx, node.account.hostname) + + node.account.ssh("mkdir -p /mnt/zookeeper") + node.account.ssh("echo %d > /mnt/zookeeper/myid" % idx) + + config_file = self.render('zookeeper.properties') + self.logger.info("zookeeper.properties:") + self.logger.info(config_file) + node.account.create_file("/mnt/zookeeper.properties", config_file) + + node.account.ssh( + "/opt/kafka/bin/zookeeper-server-start.sh /mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &" + % self.logs["zk_log"]) + + time.sleep(5) # give it some time to start + + def stop_node(self, node): + idx = self.idx(node) + self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname)) + node.account.kill_process("zookeeper", allow_fail=False) + + def clean_node(self, node): + self.logger.info("Cleaning ZK node %d on %s", self.idx(node), node.account.hostname) + node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=False) + + def connect_setting(self): + return ','.join([node.account.hostname + ':2181' for node in self.nodes]) http://git-wip-us.apache.org/repos/asf/kafka/blob/e43c9aff/tests/kafkatest/tests/__init__.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/__init__.py b/tests/kafkatest/tests/__init__.py new file mode 100644 index 0000000..ebc9bb3 --- /dev/null +++ b/tests/kafkatest/tests/__init__.py @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults
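Taken together, the services above are meant to be composed inside a ducktape test. The following is a minimal illustrative sketch only; the test class, topic name, and message count are hypothetical, and it assumes the ducktape `Test` base class and `BackgroundThreadService.wait()` behave as in the ducktape version this patch targets:

```python
from ducktape.tests.test import Test

from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.verifiable_producer import VerifiableProducer


class ExampleProduceTest(Test):
    """Hypothetical example: start ZK and one broker, produce, then count acks."""

    def __init__(self, test_context):
        super(ExampleProduceTest, self).__init__(test_context=test_context)
        self.zk = ZookeeperService(test_context, num_nodes=1)
        # KafkaService pre-creates any topics passed here during start()
        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
                                  topics={"example-topic": {"partitions": 1,
                                                            "replication-factor": 1}})

    def test_produce(self):
        self.zk.start()
        self.kafka.start()

        producer = VerifiableProducer(self.test_context, num_nodes=1,
                                      kafka=self.kafka, topic="example-topic",
                                      max_messages=1000)
        producer.start()
        producer.wait()  # block until the background producer thread finishes

        # With max_messages set, a clean run should ack every message
        assert producer.num_acked == 1000
```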
