Repository: kafka Updated Branches: refs/heads/trunk cae5977ed -> 55a90938a
MINOR: add Yahoo benchmark to nightly runs Author: Eno Thereska <[email protected]> Reviewers: Damian Guy <[email protected]> Closes #3289 from enothereska/yahoo-benchmark Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/55a90938 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/55a90938 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/55a90938 Branch: refs/heads/trunk Commit: 55a90938a12d8928289a30588bbad6c959c48674 Parents: cae5977 Author: Eno Thereska <[email protected]> Authored: Wed Jun 21 11:46:59 2017 +0100 Committer: Damian Guy <[email protected]> Committed: Wed Jun 21 11:46:59 2017 +0100 ---------------------------------------------------------------------- checkstyle/import-control.xml | 4 + .../kafka/streams/perf/SimpleBenchmark.java | 92 +++-- .../kafka/streams/perf/YahooBenchmark.java | 361 +++++++++++++++++++ .../streams/streams_simple_benchmark_test.py | 14 +- .../services/performance/streams_performance.py | 5 +- tests/kafkatest/services/streams.py | 7 +- 6 files changed, 438 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index c0c92af..1579a1c 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -186,6 +186,10 @@ <allow pkg="com.fasterxml.jackson.databind" /> <allow pkg="org.apache.kafka.connect.json" /> </subpackage> + + <subpackage name="perf"> + <allow pkg="com.fasterxml.jackson.databind" /> + </subpackage> <subpackage name="integration"> <allow pkg="kafka.admin" /> http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java 
---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java index 2983df5..ec3ea0e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java @@ -55,6 +55,7 @@ import java.util.concurrent.CountDownLatch; import java.util.Properties; import java.util.Random; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * Class that provides support for a series of benchmarks. It is usually driven by @@ -73,17 +74,20 @@ import java.util.concurrent.TimeUnit; */ public class SimpleBenchmark { - private final String kafka; + final String kafka; private final File stateDir; - private final Boolean loadPhase; - private final String testName; - private static final String ALL_TESTS = "all"; + final Boolean loadPhase; + final String testName; + final int numThreads; + static final String ALL_TESTS = "all"; private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic"; private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic"; private static final String COUNT_TOPIC = "countTopic"; private static final String JOIN_TOPIC_1_PREFIX = "joinSourceTopic1"; private static final String JOIN_TOPIC_2_PREFIX = "joinSourceTopic2"; + private static final String YAHOO_CAMPAIGNS_TOPIC = "yahooCampaigns"; + private static final String YAHOO_EVENTS_TOPIC = "yahooEvents"; private static final ValueJoiner VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>() { @Override public byte[] apply(final byte[] value1, final byte[] value2) { @@ -101,9 +105,9 @@ public class SimpleBenchmark { } }; - private static int numRecords; - private static int processedRecords = 0; - private static long processedBytes = 0; + int numRecords; + final AtomicInteger processedRecords = new 
AtomicInteger(0); + long processedBytes = 0; private static final int VALUE_SIZE = 100; private static final long POLL_MS = 500L; private static final long COMMIT_INTERVAL_MS = 30000L; @@ -113,12 +117,15 @@ public class SimpleBenchmark { private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray(); private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer(); - public SimpleBenchmark(final File stateDir, final String kafka, final Boolean loadPhase, final String testName) { + public SimpleBenchmark(final File stateDir, final String kafka, final Boolean loadPhase, + final String testName, final int numRecords, final int numThreads) { super(); this.stateDir = stateDir; this.kafka = kafka; this.loadPhase = loadPhase; this.testName = testName; + this.numRecords = numRecords; + this.numThreads = numThreads; } private void run() throws Exception { @@ -175,6 +182,9 @@ public class SimpleBenchmark { case "ktablektablejoin": kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable"); break; + case "yahoo": + yahooBenchmark(YAHOO_CAMPAIGNS_TOPIC, YAHOO_EVENTS_TOPIC); + break; default: throw new Exception("Unknown test name " + testName); @@ -184,14 +194,13 @@ public class SimpleBenchmark { public static void main(String[] args) throws Exception { String kafka = args.length > 0 ? args[0] : "localhost:9092"; String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath(); - numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 10000000; + int numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 10000000; boolean loadPhase = args.length > 3 ? Boolean.parseBoolean(args[3]) : false; String testName = args.length > 4 ? args[4].toLowerCase(Locale.ROOT) : ALL_TESTS; + int numThreads = args.length > 5 ? 
Integer.parseInt(args[5]) : 1; final File stateDir = new File(stateDirStr); stateDir.mkdir(); - final File rocksdbDir = new File(stateDir, "rocksdb-test"); - rocksdbDir.mkdir(); // Note: this output is needed for automated tests and must not be removed System.out.println("StreamsTest instance started"); @@ -200,17 +209,18 @@ public class SimpleBenchmark { System.out.println("numRecords=" + numRecords); System.out.println("loadPhase=" + loadPhase); System.out.println("testName=" + testName); + System.out.println("numThreads=" + numThreads); - SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, loadPhase, testName); + SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, loadPhase, testName, numRecords, numThreads); benchmark.run(); } - private Properties setStreamProperties(final String applicationId) { + public Properties setStreamProperties(final String applicationId) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // the socket buffer needs to be large, especially when running in AWS with // high latency. if running locally the default is fine. 
@@ -244,8 +254,7 @@ public class SimpleBenchmark { private boolean maybeSetupPhase(final String topic, final String clientId, final boolean skipIfAllTests) throws Exception { - processedRecords = 0; - processedBytes = 0; + resetStats(); // initialize topics if (loadPhase) { if (skipIfAllTests) { @@ -264,6 +273,12 @@ public class SimpleBenchmark { return false; } + void resetStats() { + processedRecords.set(0); + processedBytes = 0; + } + + private KafkaStreams createCountStreams(Properties streamConfig, String topic, final CountDownLatch latch) { final KStreamBuilder builder = new KStreamBuilder(); final KStream<Integer, byte[]> input = builder.stream(topic); @@ -274,6 +289,13 @@ public class SimpleBenchmark { return new KafkaStreams(builder, streamConfig); } + + private void yahooBenchmark(final String campaignsTopic, final String eventsTopic) throws Exception { + YahooBenchmark benchmark = new YahooBenchmark(this, campaignsTopic, eventsTopic); + + benchmark.run(); + } + /** * Measure the performance of a simple aggregate like count. 
* Counts the occurrence of numbers (note that normally people count words, this @@ -351,15 +373,15 @@ public class SimpleBenchmark { runGenericBenchmark(streams, "Streams KTableKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch); } - private void printResults(final String nameOfBenchmark, final long latency) { + void printResults(final String nameOfBenchmark, final long latency) { System.out.println(nameOfBenchmark + - processedRecords + "/" + + processedRecords.get() + "/" + latency + "/" + - recordsPerSec(latency, processedRecords) + "/" + + recordsPerSec(latency, processedRecords.get()) + "/" + megabytesPerSec(latency, processedBytes)); } - private void runGenericBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) { + void runGenericBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) { streams.start(); long startTime = System.currentTimeMillis(); @@ -459,6 +481,7 @@ public class SimpleBenchmark { public void produce(String topic) throws Exception { // loading phase does not make sense for producer if (loadPhase) { + resetStats(); return; } produce(topic, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true); @@ -477,8 +500,7 @@ public class SimpleBenchmark { private void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential, int upperRange, boolean printStats) throws Exception { - processedRecords = 0; - processedBytes = 0; + if (sequential) { if (upperRange < numRecords) throw new Exception("UpperRange must be >= numRecords"); } @@ -504,7 +526,7 @@ public class SimpleBenchmark { producer.send(new ProducerRecord<>(topic, key, value)); if (sequential) key++; else key = rand.nextInt(upperRange); - processedRecords++; + processedRecords.getAndIncrement(); processedBytes += value.length + Integer.SIZE; } producer.close(); @@ -536,20 +558,20 @@ public class SimpleBenchmark { while (true) { 
ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS); if (records.isEmpty()) { - if (processedRecords == numRecords) + if (processedRecords.get() == numRecords) break; } else { for (ConsumerRecord<Integer, byte[]> record : records) { - processedRecords++; + processedRecords.getAndIncrement(); processedBytes += record.value().length + Integer.SIZE; Integer recKey = record.key(); if (key == null || key < recKey) key = recKey; - if (processedRecords == numRecords) + if (processedRecords.get() == numRecords) break; } } - if (processedRecords == numRecords) + if (processedRecords.get() == numRecords) break; } @@ -577,9 +599,9 @@ public class SimpleBenchmark { @Override public void process(Integer key, byte[] value) { - processedRecords++; + processedRecords.getAndIncrement(); processedBytes += value.length + Integer.SIZE; - if (processedRecords == numRecords) { + if (processedRecords.get() == numRecords) { latch.countDown(); } } @@ -616,9 +638,9 @@ public class SimpleBenchmark { @Override public void process(Integer key, byte[] value) { - processedRecords++; + processedRecords.getAndIncrement(); processedBytes += value.length + Integer.SIZE; - if (processedRecords == numRecords) { + if (processedRecords.get() == numRecords) { latch.countDown(); } } @@ -644,7 +666,7 @@ public class SimpleBenchmark { } @Override public void apply(Integer key, V value) { - processedRecords++; + processedRecords.getAndIncrement(); if (value instanceof byte[]) { processedBytes += ((byte[]) value).length + Integer.SIZE; } else if (value instanceof Long) { @@ -652,7 +674,7 @@ public class SimpleBenchmark { } else { System.err.println("Unknown value type in CountDownAction"); } - if (processedRecords == numRecords) { + if (processedRecords.get() == numRecords) { this.latch.countDown(); } } @@ -724,9 +746,9 @@ public class SimpleBenchmark { @Override public void process(Integer key, byte[] value) { store.put(key, value); - processedRecords++; + processedRecords.getAndIncrement(); 
processedBytes += value.length + Integer.SIZE; - if (processedRecords == numRecords) { + if (processedRecords.get() == numRecords) { latch.countDown(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java new file mode 100644 index 0000000..56c578f --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.kafka.streams.perf;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;


/**
 * A basic DSL and data generation that emulates the behavior of the Yahoo Benchmark
 * https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
 * Thanks to Michael Armbrust for providing the initial code for this benchmark in his blog:
 * https://databricks.com/blog/2017/06/06/simple-super-fast-streaming-engine-apache-spark.html
 *
 * In the load phase it writes campaign/ad mappings and randomly generated ad events to Kafka;
 * otherwise it runs a Streams topology that filters "view" events, joins them with their
 * campaign and counts them per campaign in 10-second windows.
 */
public class YahooBenchmark {
    /** Event types emitted by the generator; every type is picked with equal probability. */
    private static final String[] EVENT_TYPES = new String[]{"view", "click", "purchase"};
    /** Config key telling the JSON (de)serializers which POJO class to bind to. */
    private static final String JSON_POJO_CLASS_CONFIG = "JsonPOJOClass";
    /** Size of the tumbling window used for the per-campaign counts. */
    private static final long WINDOW_SIZE_MS = 10 * 1000;

    private final SimpleBenchmark parent;
    private final String campaignsTopic;
    private final String eventsTopic;

    /** An ad event as written to, and read from, the events topic (serialized as JSON). */
    static class ProjectedEvent {
        /* attributes need to be public for serializer to work */
        /* main attributes */
        public String eventType;
        public String adID;

        /* other attributes */
        public long eventTime;
        public String userID = UUID.randomUUID().toString(); // not used
        public String pageID = UUID.randomUUID().toString(); // not used
        public String addType = "banner78"; // not used
        public String ipAddress = "1.2.3.4"; // not used
    }

    /** An ad-to-campaign mapping, decoded from the "adId:campaignId" value of the campaigns topic. */
    static class CampaignAd {
        /* attributes need to be public for serializer to work */
        public String adID;
        public String campaignID;
    }

    /**
     * @param parent         benchmark driver providing configuration, counters and result printing
     * @param campaignsTopic topic holding the ad-to-campaign mappings
     * @param eventsTopic    topic holding the generated ad events
     */
    public YahooBenchmark(final SimpleBenchmark parent, final String campaignsTopic, final String eventsTopic) {
        this.parent = parent;
        this.campaignsTopic = campaignsTopic;
        this.eventsTopic = eventsTopic;
    }

    /**
     * Builds producer properties with String keys and the given value serializer.
     */
    private Properties producerProperties(final String clientId, final Class<?> valueSerializerClass) {
        final Properties props = new Properties();
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.kafka);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
        return props;
    }

    /**
     * Builds the config map that tells the JSON (de)serializers which class to bind.
     */
    private static Map<String, Object> pojoClassConfig(final Class<?> pojoClass) {
        final Map<String, Object> serdeProps = new HashMap<>();
        serdeProps.put(JSON_POJO_CLASS_CONFIG, pojoClass);
        return serdeProps;
    }

    /**
     * In the load phase, writes {@code numCampaigns * adsPerCampaign} ad-to-campaign records
     * (key = ad id, value = "adId:campaignId") to {@code topic} and appends every ad id to {@code ads}.
     *
     * @return {@code true} if this is the load phase (data loaded or deliberately skipped),
     *         {@code false} if the caller should run the streams phase instead
     */
    // just for Yahoo benchmark
    private boolean maybeSetupPhaseCampaigns(final String topic, final String clientId,
                                             final boolean skipIfAllTests,
                                             final int numCampaigns, final int adsPerCampaign,
                                             final List<String> ads) throws Exception {
        parent.resetStats();
        if (!parent.loadPhase) {
            return false;
        }
        // if we run all tests, the produce test will have already loaded the data
        if (skipIfAllTests && parent.testName.equals(SimpleBenchmark.ALL_TESTS)) {
            // Skipping loading phase since previously loaded
            return true;
        }
        System.out.println("Initializing topic " + topic);

        // try-with-resources flushes buffered records on close; without it the campaigns
        // may never reach the broker before the JVM exits
        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(producerProperties(clientId, StringSerializer.class))) {
            for (int c = 0; c < numCampaigns; c++) {
                final String campaignID = UUID.randomUUID().toString();
                for (int a = 0; a < adsPerCampaign; a++) {
                    final String adId = UUID.randomUUID().toString();
                    final String concat = adId + ":" + campaignID;
                    producer.send(new ProducerRecord<>(topic, adId, concat));
                    ads.add(adId);
                    parent.processedRecords.getAndIncrement();
                    parent.processedBytes += concat.length() + adId.length();
                }
            }
        }
        return true;
    }

    /**
     * In the load phase, writes {@code numRecords} JSON-serialized {@link ProjectedEvent}s to
     * {@code topic}, each referring to a uniformly chosen ad from {@code ads} and a uniformly
     * chosen event type, then prints producer throughput.
     *
     * @return {@code true} if this is the load phase, {@code false} otherwise
     * @throws IllegalArgumentException if events must be generated but {@code ads} is empty
     */
    // just for Yahoo benchmark
    private boolean maybeSetupPhaseEvents(final String topic, final String clientId,
                                          final boolean skipIfAllTests, final int numRecords,
                                          final List<String> ads) throws Exception {
        parent.resetStats();
        if (!parent.loadPhase) {
            return false;
        }
        // if we run all tests, the produce test will have already loaded the data
        if (skipIfAllTests && parent.testName.equals(SimpleBenchmark.ALL_TESTS)) {
            // Skipping loading phase since previously loaded
            return true;
        }
        if (ads.isEmpty()) {
            throw new IllegalArgumentException("Cannot generate events for topic " + topic + ": no ads available");
        }
        System.out.println("Initializing topic " + topic);

        final Random rand = new Random();
        final Serializer<ProjectedEvent> projectedEventSerializer = new JsonPOJOSerializer<>();
        projectedEventSerializer.configure(pojoClassConfig(ProjectedEvent.class), false);

        final KafkaProducer<String, byte[]> producer =
            new KafkaProducer<>(producerProperties(clientId, ByteArraySerializer.class));
        final long startTime = System.currentTimeMillis();
        try {
            // a single event object is reused; it is serialized before each send
            final ProjectedEvent event = new ProjectedEvent();
            for (int i = 0; i < numRecords; i++) {
                // nextInt(bound) is exclusive of bound, so use the full length to reach the last entry
                event.eventType = EVENT_TYPES[rand.nextInt(EVENT_TYPES.length)];
                event.adID = ads.get(rand.nextInt(ads.size()));
                event.eventTime = System.currentTimeMillis();
                final byte[] value = projectedEventSerializer.serialize(topic, event);
                producer.send(new ProducerRecord<>(topic, event.adID, value));
                parent.processedRecords.getAndIncrement();
                parent.processedBytes += value.length + event.adID.length();
            }
        } finally {
            producer.close();
        }
        final long endTime = System.currentTimeMillis();

        parent.printResults("Producer Performance [records/latency/rec-sec/MB-sec write]: ", endTime - startTime);
        return true;
    }

    /**
     * Runs the benchmark. In the load phase it populates the campaigns and events topics.
     * Otherwise it runs the streams topology until {@code parent.numRecords} events were seen.
     */
    public void run() throws Exception {
        final int numCampaigns = 100;
        final int adsPerCampaign = 10;

        final List<String> ads = new ArrayList<>(numCampaigns * adsPerCampaign);
        if (maybeSetupPhaseCampaigns(campaignsTopic, "simple-benchmark-produce-campaigns", false,
                numCampaigns, adsPerCampaign, ads)) {
            maybeSetupPhaseEvents(eventsTopic, "simple-benchmark-produce-events", false,
                parent.numRecords, ads);
            return;
        }

        final CountDownLatch latch = new CountDownLatch(1);
        final Properties props = parent.setStreamProperties("simple-benchmark-yahoo" + new Random().nextInt());

        final KafkaStreams streams = createYahooBenchmarkStreams(props, campaignsTopic, eventsTopic, latch, parent.numRecords);
        parent.runGenericBenchmark(streams, "Streams Yahoo Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
    }

    /**
     * Serializes any POJO with public fields to JSON bytes via Jackson.
     */
    // Note: these are also in the streams example package, eventually use 1 file
    private static class JsonPOJOSerializer<T> implements Serializer<T> {
        private final ObjectMapper objectMapper = new ObjectMapper();

        /**
         * Default constructor needed by Kafka
         */
        public JsonPOJOSerializer() {
        }

        @Override
        public void configure(final Map<String, ?> props, final boolean isKey) {
        }

        @Override
        public byte[] serialize(final String topic, final T data) {
            if (data == null) {
                return null;
            }
            try {
                return objectMapper.writeValueAsBytes(data);
            } catch (final Exception e) {
                throw new SerializationException("Error serializing JSON message", e);
            }
        }

        @Override
        public void close() {
        }
    }

    /**
     * Deserializes JSON bytes into the class given under {@code JSON_POJO_CLASS_CONFIG} at configure time.
     */
    // Note: these are also in the streams example package, eventually use 1 file
    private static class JsonPOJODeserializer<T> implements Deserializer<T> {
        private final ObjectMapper objectMapper = new ObjectMapper();

        private Class<T> tClass;

        /**
         * Default constructor needed by Kafka
         */
        public JsonPOJODeserializer() {
        }

        @SuppressWarnings("unchecked") // the config map is built by pojoClassConfig with a Class<T> value
        @Override
        public void configure(final Map<String, ?> props, final boolean isKey) {
            tClass = (Class<T>) props.get(JSON_POJO_CLASS_CONFIG);
        }

        @Override
        public T deserialize(final String topic, final byte[] bytes) {
            if (bytes == null) {
                return null;
            }
            try {
                return objectMapper.readValue(bytes, tClass);
            } catch (final Exception e) {
                throw new SerializationException(e);
            }
        }

        @Override
        public void close() {
        }
    }

    /**
     * Builds the benchmark topology:
     * events -> progress tracking -> keep "view" events -> project columns -> join with campaigns
     * -> re-key by campaign -> windowed count.
     * The latch is released once {@code numRecords} events have passed the progress tracker.
     */
    private KafkaStreams createYahooBenchmarkStreams(final Properties streamConfig, final String campaignsTopic, final String eventsTopic,
                                                     final CountDownLatch latch, final int numRecords) {
        final Map<String, Object> serdeProps = pojoClassConfig(ProjectedEvent.class);
        final Serializer<ProjectedEvent> projectedEventSerializer = new JsonPOJOSerializer<>();
        projectedEventSerializer.configure(serdeProps, false);
        final Deserializer<ProjectedEvent> projectedEventDeserializer = new JsonPOJODeserializer<>();
        projectedEventDeserializer.configure(serdeProps, false);
        final Serde<ProjectedEvent> projectedEventSerde = Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer);

        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, ProjectedEvent> kEvents = builder.stream(Serdes.String(), projectedEventSerde, eventsTopic);
        final KTable<String, String> kCampaigns = builder.table(Serdes.String(), Serdes.String(),
            campaignsTopic, "campaign-state");

        final KStream<String, ProjectedEvent> filteredEvents = kEvents
            // use peek to track progress and to quit once the last expected element is processed
            .peek(new ForeachAction<String, ProjectedEvent>() {
                @Override
                public void apply(final String key, final ProjectedEvent value) {
                    // use the value returned by the increment so the progress print and the
                    // completion check see a consistent count across concurrent stream threads
                    final int processed = parent.processedRecords.incrementAndGet();
                    if (processed % 1000000 == 0) {
                        System.out.println("Processed " + processed);
                    }
                    if (processed >= numRecords) {
                        latch.countDown();
                    }
                }
            })
            // only keep "view" events
            .filter(new Predicate<String, ProjectedEvent>() {
                @Override
                public boolean test(final String key, final ProjectedEvent value) {
                    return "view".equals(value.eventType);
                }
            })
            // select just a few of the columns
            .mapValues(new ValueMapper<ProjectedEvent, ProjectedEvent>() {
                @Override
                public ProjectedEvent apply(final ProjectedEvent value) {
                    final ProjectedEvent event = new ProjectedEvent();
                    event.adID = value.adID;
                    event.eventTime = value.eventTime;
                    event.eventType = value.eventType;
                    return event;
                }
            });

        // deserialize the ad ID and campaign ID from the stored "adId:campaignId" value in Kafka
        final KTable<String, CampaignAd> deserCampaigns = kCampaigns.mapValues(new ValueMapper<String, CampaignAd>() {
            @Override
            public CampaignAd apply(final String value) {
                final String[] parts = value.split(":");
                final CampaignAd campaignAd = new CampaignAd();
                campaignAd.adID = parts[0];
                campaignAd.campaignID = parts[1];
                return campaignAd;
            }
        });

        // join the events with the campaigns, keeping only the campaign id
        final KStream<String, String> joined = filteredEvents.join(deserCampaigns,
            new ValueJoiner<ProjectedEvent, CampaignAd, String>() {
                @Override
                public String apply(final ProjectedEvent value1, final CampaignAd value2) {
                    return value2.campaignID;
                }
            }, Serdes.String(), projectedEventSerde);

        // key by campaign rather than by ad as original
        final KStream<String, String> keyedByCampaign = joined
            .selectKey(new KeyValueMapper<String, String, String>() {
                @Override
                public String apply(final String key, final String value) {
                    return value;
                }
            });

        // calculate windowed counts
        keyedByCampaign
            .groupByKey(Serdes.String(), Serdes.String())
            .count(TimeWindows.of(WINDOW_SIZE_MS), "time-windows");

        return new KafkaStreams(builder, streamConfig);
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py index 6df3cab..258b7c0 100644 --- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py +++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py @@ -32,10 +32,10 @@ class StreamsSimpleBenchmarkTest(Test): super(StreamsSimpleBenchmarkTest, self).__init__(test_context) self.num_records = 20000000L self.replication = 1 - + self.num_threads = 1 @cluster(num_nodes=9) - @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 3]) + @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin", "yahoo"], scale=[1, 3]) def test_simple_benchmark(self, test, scale): """ Run simple Kafka Streams benchmark @@ -58,7 +58,9 @@ class StreamsSimpleBenchmarkTest(Test): 'joinSourceTopic1KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication }, 'joinSourceTopic2KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication }, 'joinSourceTopic1KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication }, - 'joinSourceTopic2KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication } + 'joinSourceTopic2KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication }, + 'yahooCampaigns' : { 'partitions': 20, 'replication-factor': self.replication }, + 'yahooEvents' : { 'partitions': 20, 
'replication-factor': self.replication } }) self.kafka.log_level = "INFO" self.kafka.start() @@ -67,7 +69,8 @@ class StreamsSimpleBenchmarkTest(Test): # LOAD PHASE ################ self.load_driver = StreamsSimpleBenchmarkService(self.test_context, self.kafka, - self.num_records * scale, "true", test) + self.num_records * scale, "true", test, + self.num_threads) self.load_driver.start() self.load_driver.wait() self.load_driver.stop() @@ -77,7 +80,8 @@ class StreamsSimpleBenchmarkTest(Test): ################ for num in range(0, scale): self.driver[num] = StreamsSimpleBenchmarkService(self.test_context, self.kafka, - self.num_records/(scale), "false", test) + self.num_records/(scale), "false", test, + self.num_threads) self.driver[num].start() ####################### http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/tests/kafkatest/services/performance/streams_performance.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py index 8cedb51..94f7249 100644 --- a/tests/kafkatest/services/performance/streams_performance.py +++ b/tests/kafkatest/services/performance/streams_performance.py @@ -22,13 +22,14 @@ from kafkatest.services.streams import StreamsTestBaseService class StreamsSimpleBenchmarkService(StreamsTestBaseService): """Base class for simple Kafka Streams benchmark""" - def __init__(self, test_context, kafka, numrecs, load_phase, test_name): + def __init__(self, test_context, kafka, numrecs, load_phase, test_name, num_threads): super(StreamsSimpleBenchmarkService, self).__init__(test_context, kafka, "org.apache.kafka.streams.perf.SimpleBenchmark", numrecs, load_phase, - test_name) + test_name, + num_threads) def collect_data(self, node, tag = None): # Collect the data and return it to the framework http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/tests/kafkatest/services/streams.py 
---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 867e3f5..f6b6dd4 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -45,13 +45,14 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service): "collect_default": True}, } - def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1=None, user_test_args2=None): + def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1=None, user_test_args2=None, user_test_args3=None): super(StreamsTestBaseService, self).__init__(test_context, 1) self.kafka = kafka self.args = {'streams_class_name': streams_class_name, 'user_test_args': user_test_args, 'user_test_args1': user_test_args1, - 'user_test_args2': user_test_args2} + 'user_test_args2': user_test_args2, + 'user_test_args3': user_test_args3} self.log_level = "DEBUG" @property @@ -122,7 +123,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service): cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \ "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \ " %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \ - " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args + " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args return cmd
