Repository: kafka Updated Branches: refs/heads/0.10.0 4726adba5 -> 9ef243310
MINOR: a simple benchmark for Streams guozhangwang miguno Author: Yasuhiro Matsuda <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1164 from ymatsuda/perf (cherry picked from commit 2788f2dc73dd62c35c87842d02d23a0ed6a62e97) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9ef24331 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9ef24331 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9ef24331 Branch: refs/heads/0.10.0 Commit: 9ef243310d6d16a5d508286f8a1c5b5f70d87680 Parents: 4726adb Author: Yasuhiro Matsuda <[email protected]> Authored: Wed Mar 30 14:26:01 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed Mar 30 14:26:14 2016 -0700 ---------------------------------------------------------------------- .../streams/processor/internals/StreamTask.java | 4 +- .../kafka/streams/perf/SimpleBenchmark.java | 413 +++++++++++++++++++ 2 files changed, 415 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9ef24331/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index c4cc2ba..afa303c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -167,11 +167,11 @@ public class StreamTask extends AbstractTask implements Punctuator { this.currNode = recordInfo.node(); TopicPartition partition = recordInfo.partition(); - log.debug("Start processing one record [" + currRecord + "]"); + 
log.debug("Start processing one record [{}]", currRecord); this.currNode.process(currRecord.key(), currRecord.value()); - log.debug("Completed processing one record [" + currRecord + "]"); + log.debug("Completed processing one record [{}]", currRecord); // update the consumed offset map after processing is done consumedOffsets.put(partition, currRecord.offset()); http://git-wip-us.apache.org/repos/asf/kafka/blob/9ef24331/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java new file mode 100644 index 0000000..c883090 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java @@ -0,0 +1,413 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.kafka.streams.perf;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.Properties;
+
+/**
+ * A simple throughput benchmark for the Kafka producer, the Kafka consumer and Kafka Streams.
+ *
+ * <p>Every phase moves {@link #NUM_RECORDS} records of {@link #RECORD_SIZE} bytes (an 8-byte
+ * {@code Long} key plus a {@link #VALUE_SIZE}-byte value) and prints its throughput in MB/sec.
+ * The read phases stop once they have seen the key {@link #END_KEY}, which is the last key
+ * written by {@link #produce()}, so the source topic must be populated before they run.
+ */
+public class SimpleBenchmark {
+
+    private final String kafka;
+    private final String zookeeper;
+    private final File stateDir;
+
+    private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
+    private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
+
+    private static final long NUM_RECORDS = 10000000L;
+    // produce() writes keys 0..NUM_RECORDS-1, so this is the key of the last record written.
+    private static final Long END_KEY = NUM_RECORDS - 1;
+    private static final int KEY_SIZE = 8;
+    private static final int VALUE_SIZE = 100;
+    private static final int RECORD_SIZE = KEY_SIZE + VALUE_SIZE;
+
+    private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
+    private static final Serde<Long> LONG_SERDE = Serdes.Long();
+
+    /**
+     * @param stateDir  directory handed to Kafka Streams as its state directory
+     * @param kafka     Kafka bootstrap servers, e.g. {@code localhost:9092}
+     * @param zookeeper ZooKeeper connect string, e.g. {@code localhost:2181}
+     */
+    public SimpleBenchmark(File stateDir, String kafka, String zookeeper) {
+        this.stateDir = stateDir;
+        this.kafka = kafka;
+        this.zookeeper = zookeeper;
+    }
+
+    /**
+     * Runs every benchmark phase in order against a broker at {@code localhost:9092} and
+     * ZooKeeper at {@code localhost:2181}, keeping Streams state under
+     * {@code /tmp/kafka-streams-simple-benchmark}.
+     */
+    public static void main(String[] args) throws Exception {
+        final File stateDir = new File("/tmp/kafka-streams-simple-benchmark");
+        stateDir.mkdir();
+
+        final File rocksdbDir = new File(stateDir, "rocksdb-test");
+        rocksdbDir.mkdir();
+
+        final String kafka = "localhost:9092";
+        final String zookeeper = "localhost:2181";
+
+        SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, zookeeper);
+
+        // producer performance
+        benchmark.produce();
+        // consumer performance
+        benchmark.consume();
+        // simple stream performance source->process
+        benchmark.processStream();
+        // simple stream performance source->sink
+        benchmark.processStreamWithSink();
+        // simple stream performance source->store
+        benchmark.processStreamWithStateStore();
+    }
+
+    /** Measures a source->process topology whose processor only inspects each record's key. */
+    public void processStream() {
+        CountDownLatch latch = new CountDownLatch(1);
+        KafkaStreams streams = createKafkaStreams(stateDir, kafka, zookeeper, latch);
+        runUntilLatchReleased(streams, latch, "Streams Performance [MB/sec read]: ");
+    }
+
+    /** Measures a topology that copies every source record to the sink topic. */
+    public void processStreamWithSink() {
+        CountDownLatch latch = new CountDownLatch(1);
+        KafkaStreams streams = createKafkaStreamsWithSink(stateDir, kafka, zookeeper, latch);
+        runUntilLatchReleased(streams, latch, "Streams Performance [MB/sec read+write]: ");
+    }
+
+    /** Measures a topology that puts every source record into a persistent key-value store. */
+    public void processStreamWithStateStore() {
+        CountDownLatch latch = new CountDownLatch(1);
+        KafkaStreams streams = createKafkaStreamsWithStateStore(stateDir, kafka, zookeeper, latch);
+        runUntilLatchReleased(streams, latch, "Streams Performance [MB/sec read+store]: ");
+    }
+
+    /**
+     * Starts {@code streams} from a separate thread, waits until {@code latch} is released,
+     * prints the throughput prefixed with {@code label}, then closes the instance and joins the
+     * starter thread.
+     *
+     * <p>Interrupts received while waiting do not abort the measurement. The interrupt status is
+     * restored before returning, so it is not lost.
+     */
+    private void runUntilLatchReleased(final KafkaStreams streams, CountDownLatch latch, String label) {
+        Thread thread = new Thread() {
+            @Override
+            public void run() {
+                streams.start();
+            }
+        };
+        thread.start();
+
+        boolean interrupted = false;
+        long startTime = System.currentTimeMillis();
+
+        while (latch.getCount() > 0) {
+            try {
+                latch.await();
+            } catch (InterruptedException ex) {
+                // keep waiting for the measurement to finish; the status is restored below
+                interrupted = true;
+            }
+        }
+
+        long endTime = System.currentTimeMillis();
+
+        System.out.println(label + megaBytePerSec(endTime - startTime));
+
+        streams.close();
+        try {
+            thread.join();
+        } catch (InterruptedException ex) {
+            interrupted = true;
+        }
+
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /** Writes {@link #NUM_RECORDS} records with keys {@code 0..END_KEY} to the source topic. */
+    public void produce() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "simple-benchmark-produce");
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+        KafkaProducer<Long, byte[]> producer = new KafkaProducer<>(props);
+
+        byte[] value = new byte[VALUE_SIZE];
+        long startTime = System.currentTimeMillis();
+
+        try {
+            for (long key = 0; key < NUM_RECORDS; key++) {
+                producer.send(new ProducerRecord<>(SOURCE_TOPIC, key, value));
+            }
+        } finally {
+            // close() waits for outstanding sends to complete, so it is deliberately inside the
+            // timed region; the finally also releases the producer if send() throws
+            producer.close();
+        }
+
+        long endTime = System.currentTimeMillis();
+
+        System.out.println("Producer Performance [MB/sec write]: " + megaBytePerSec(endTime - startTime));
+    }
+
+    /**
+     * Reads the source topic from the beginning of every partition until {@link #END_KEY} has
+     * been seen and a subsequent poll returns no records.
+     */
+    public void consume() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple-benchmark-consumer");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+
+        KafkaConsumer<Long, byte[]> consumer = new KafkaConsumer<>(props);
+
+        try {
+            List<TopicPartition> partitions = getAllPartitions(consumer, SOURCE_TOPIC);
+            consumer.assign(partitions);
+            consumer.seekToBeginning(partitions);
+
+            // Track whether END_KEY was seen rather than whether it was the last key seen: with
+            // several partitions the records interleave, and END_KEY need not arrive last.
+            boolean endKeySeen = false;
+
+            long startTime = System.currentTimeMillis();
+
+            while (true) {
+                ConsumerRecords<Long, byte[]> records = consumer.poll(500);
+                if (records.isEmpty()) {
+                    if (endKeySeen) {
+                        break;
+                    }
+                    continue;
+                }
+                for (ConsumerRecord<Long, byte[]> record : records) {
+                    if (END_KEY.equals(record.key())) {
+                        endKeySeen = true;
+                    }
+                }
+            }
+
+            long endTime = System.currentTimeMillis();
+
+            System.out.println("Consumer Performance [MB/sec read]: " + megaBytePerSec(endTime - startTime));
+        } finally {
+            consumer.close();
+        }
+    }
+
+    /** Builds a source->process topology that releases {@code latch} on {@link #END_KEY}. */
+    private KafkaStreams createKafkaStreams(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
+        Properties props = streamsProperties("simple-benchmark-streams", stateDir, kafka, zookeeper);
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
+
+        source.process(endKeyDetector(latch));
+
+        return new KafkaStreams(builder, props);
+    }
+
+    /**
+     * Builds a topology that forwards every source record to {@link #SINK_TOPIC} and releases
+     * {@code latch} on {@link #END_KEY}.
+     */
+    private KafkaStreams createKafkaStreamsWithSink(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
+        Properties props = streamsProperties("simple-benchmark-streams-with-sink", stateDir, kafka, zookeeper);
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
+
+        source.to(LONG_SERDE, BYTE_SERDE, SINK_TOPIC);
+        source.process(endKeyDetector(latch));
+
+        return new KafkaStreams(builder, props);
+    }
+
+    /**
+     * Builds a topology that puts every source record into the persistent store {@code "store"}
+     * and releases {@code latch} on {@link #END_KEY}.
+     */
+    private KafkaStreams createKafkaStreamsWithStateStore(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
+        Properties props = streamsProperties("simple-benchmark-streams-with-store", stateDir, kafka, zookeeper);
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        builder.addStateStore(Stores.create("store").withLongKeys().withByteArrayValues().persistent().build());
+
+        KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
+
+        source.process(new ProcessorSupplier<Long, byte[]>() {
+            @Override
+            public Processor<Long, byte[]> get() {
+                return new Processor<Long, byte[]>() {
+
+                    private KeyValueStore<Long, byte[]> store;
+
+                    @SuppressWarnings("unchecked")
+                    @Override
+                    public void init(ProcessorContext context) {
+                        store = (KeyValueStore<Long, byte[]>) context.getStateStore("store");
+                    }
+
+                    @Override
+                    public void process(Long key, byte[] value) {
+                        store.put(key, value);
+
+                        if (END_KEY.equals(key)) {
+                            latch.countDown();
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(long timestamp) {
+                    }
+
+                    @Override
+                    public void close() {
+                    }
+                };
+            }
+        }, "store");
+
+        return new KafkaStreams(builder, props);
+    }
+
+    /**
+     * Builds the configuration shared by every Streams phase: a single stream thread, with the
+     * offset reset policy set to {@code earliest}.
+     */
+    private static Properties streamsProperties(String applicationId, File stateDir, String kafka, String zookeeper) {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return props;
+    }
+
+    /** Returns a supplier of no-op processors that release {@code latch} upon seeing {@link #END_KEY}. */
+    private static ProcessorSupplier<Long, byte[]> endKeyDetector(final CountDownLatch latch) {
+        return new ProcessorSupplier<Long, byte[]>() {
+            @Override
+            public Processor<Long, byte[]> get() {
+                return new Processor<Long, byte[]>() {
+
+                    @Override
+                    public void init(ProcessorContext context) {
+                    }
+
+                    @Override
+                    public void process(Long key, byte[] value) {
+                        if (END_KEY.equals(key)) {
+                            latch.countDown();
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(long timestamp) {
+                    }
+
+                    @Override
+                    public void close() {
+                    }
+                };
+            }
+        };
+    }
+
+    /**
+     * Converts the duration of a phase that moved all {@link #NUM_RECORDS} records into MB/sec,
+     * where 1 MB = 1024 * 1024 bytes.
+     *
+     * @param time elapsed wall-clock time in milliseconds
+     */
+    private double megaBytePerSec(long time) {
+        // compute in double so the megabyte count is not truncated by integer division
+        double megaBytes = (double) RECORD_SIZE * NUM_RECORDS / (1024 * 1024);
+        return megaBytes / ((double) time / 1000);
+    }
+
+    /** Lists every partition of the given topics as reported by {@code consumer.partitionsFor}. */
+    private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String... topics) {
+        List<TopicPartition> partitions = new ArrayList<>();
+
+        for (String topic : topics) {
+            for (PartitionInfo info : consumer.partitionsFor(topic)) {
+                partitions.add(new TopicPartition(info.topic(), info.partition()));
+            }
+        }
+        return partitions;
+    }
+
+}
