METRON-1483: Create a tool to monitor performance of the topologies closes apache/incubator-metron#958
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/46ad9d93 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/46ad9d93 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/46ad9d93 Branch: refs/heads/master Commit: 46ad9d93b4385da0f8668f2ba84212d54d00ba4b Parents: e3eeec3 Author: cstella <ceste...@gmail.com> Authored: Tue Mar 20 09:36:32 2018 -0400 Committer: cstella <ceste...@gmail.com> Committed: Tue Mar 20 09:36:32 2018 -0400 ---------------------------------------------------------------------- metron-contrib/metron-performance/README.md | 205 ++++++++ .../performance_measurement.png | Bin 0 -> 5790 bytes metron-contrib/metron-performance/pom.xml | 134 +++++ .../src/main/assembly/assembly.xml | 42 ++ .../metron/performance/load/LoadGenerator.java | 175 +++++++ .../metron/performance/load/LoadOptions.java | 499 +++++++++++++++++++ .../performance/load/MessageGenerator.java | 48 ++ .../metron/performance/load/SendToKafka.java | 107 ++++ .../load/monitor/AbstractMonitor.java | 49 ++ .../load/monitor/EPSGeneratedMonitor.java | 53 ++ .../monitor/EPSThroughputWrittenMonitor.java | 77 +++ .../performance/load/monitor/MonitorNaming.java | 23 + .../performance/load/monitor/MonitorTask.java | 44 ++ .../performance/load/monitor/Results.java | 51 ++ .../load/monitor/writers/CSVWriter.java | 67 +++ .../load/monitor/writers/ConsoleWriter.java | 65 +++ .../load/monitor/writers/Writable.java | 40 ++ .../load/monitor/writers/Writer.java | 86 ++++ .../performance/sampler/BiasedSampler.java | 113 +++++ .../metron/performance/sampler/Sampler.java | 24 + .../performance/sampler/UnbiasedSampler.java | 28 ++ .../metron/performance/util/KafkaUtil.java | 56 +++ .../src/main/scripts/load_tool.sh | 36 ++ .../performance/load/LoadOptionsTest.java | 93 ++++ .../performance/load/SendToKafkaTest.java | 49 ++ .../metron/performance/sampler/SamplerTest.java | 145 ++++++ metron-contrib/pom.xml | 15 + 
.../common-services/METRON/CURRENT/metainfo.xml | 4 + .../packaging/docker/deb-docker/pom.xml | 6 + .../docker/rpm-docker/SPECS/metron.spec | 21 + .../packaging/docker/rpm-docker/pom.xml | 6 + 31 files changed, 2361 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/README.md ---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/README.md b/metron-contrib/metron-performance/README.md new file mode 100644 index 0000000..8981349 --- /dev/null +++ b/metron-contrib/metron-performance/README.md @@ -0,0 +1,205 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +# Performance Utilities + +This project provides some useful performance monitoring and measurement +utilities. + +## `load_tool.sh` + +The Load tool is intended to do the following: +* Generate load into kafka at a specific rate (events per second) + * The messages are taken from a template file, where there is a message template per line + * The load can be biased (e.g.
80% of the load can be comprised of 20% of the templates) +* Monitor the kafka offsets for a topic to determine the events per second written + * This could be the topic that you are generating load on + * This could be another topic that represents the output of some topology (e.g. generate load on `enrichments` and monitor `indexing` to determine the throughput of the enrichment topology). + +``` +usage: Generator + -bs,--sample_bias <BIAS_FILE> The discrete distribution to bias + the sampling. This is a CSV of 2 + columns. The first column is the % + of the templates and the 2nd column + is the probability (0-100) that + it's chosen. For instance: + 20,80 + 80,20 + implies that 20% of the templates + will comprise 80% of the output and + the remaining 80% of the templates + will comprise 20% of the output. + -c,--csv <CSV_FILE> A CSV file to emit monitoring data + to. The format is a CSV with the + following schema: timestamp, (name, + eps, historical_mean, + historical_stddev)+ + -cg,--consumer_group <GROUP_ID> Consumer Group. The default is + load.group + -e,--eps <EPS> The target events per second + -h,--help Generate Help screen + -k,--kafka_config <CONFIG_FILE> The kafka config. This is a file + containing a JSON map with the + kafka config. + -l,--lookback <LOOKBACK> When summarizing, how many + monitoring periods should we + summarize over? If 0, then no + summary. Default: 5 + -md,--monitor_delta_ms <TIME_IN_MS> The time (in ms) between monitoring + output. Default is 10000 + -mt,--monitor_topic <TOPIC> The kafka topic to monitor. + -ot,--output_topic <TOPIC> The kafka topic to write to + -p,--threads <NUM_THREADS> The number of threads to use when + extracting data. The default is + the number of cores of your + machine. + -sd,--send_delta_ms <TIME_IN_MS> The time (in ms) between sending a + batch of messages. Default is 100 + -t,--template <TEMPLATE_FILE> The template file to use for + generation. 
This should be a file + with a template per line with + $METRON_TS and $METRON_GUID in the + spots for timestamp and guid, if + you so desire them. + -tl,--time_limit_ms <MS> The total amount of time to run + this in milliseconds. By default, + it never stops. + -z,--zk_quorum <QUORUM> zookeeper quorum +``` + +## Templates +Messages are drawn from a template file. A template file has a message template per line. +For instance, let's say we want to generate JSON maps with fields: `source.type`, `ip_src_addr` +and `ip_dst_addr`. We can generate a template file with a template like the following per line: +``` +{ "source.type" : "asa", "ip_src_addr" : "127.0.0.1", "ip_dst_addr" : "191.168.1.1" } +``` + +When messages are generated, there are some special replacements that can be used: `$METRON_TS` and `$METRON_GUID`. +We can adjust our previous template to use these like so: +``` +{ "source.type" : "asa", "ip_src_addr" : "127.0.0.1", "ip_dst_addr" : "191.168.1.1", "timestamp" : $METRON_TS, "guid" : "$METRON_GUID" } +``` +One note about the generated GUIDs: they are not globally unique; they are unique only within the context of a given generator run. + +## Biased Sampling + +This load tool can be configured to use biased sampling. This is useful if, for instance, you are trying to model data which is not distributed +uniformly, like many types of network data. Generating synthetic data with a similar distribution to your regular data will enable the caches +to be exercised in the same way, for instance, and yield a more realistic scenario. + +You specify the biases in a csv file with 2 columns: +* The first column represents the % of the templates +* The second column represents the % of the generated output. + +A simple example would be to generate samples based on the Pareto principle: +``` +20,80 +80,20 +``` +This means that the first 20% of the templates in the template file would comprise 80% of the output.
+ +A more complex example might be: +``` +20,80 +20,5 +50,1 +10,14 +``` +This would imply: +* The first 20% of the templates would comprise 80% of the output +* The next 20% of the templates would comprise 5% of the output +* The next 50% of the templates would comprise 1% of the output +* The next 10% of the templates would comprise 14% of the output. + +## CSV Output + +For those who would prefer a different visualization or wish to incorporate the output of this tool into an automated test, +you can specify a file to which monitoring data is emitted in CSV format via the `-c` or `--csv` option. + +The CSV columns are as follows: +* timestamp in epoch millis + +If you are generating synthetic data, then: +* "generated" +* The events per second generated +* The mean of the events per second generated for the last `k` monitoring periods, where `k` is the lookback (set via `-l` and defaulted to `5`) +* The standard deviation of the events per second generated for the last `k` monitoring periods, where `k` is the lookback (set via `-l` and defaulted to `5`) + +If you are monitoring a topic, then: +* "throughput measured" +* The events per second measured +* The mean of the events per second measured for the last `k` monitoring periods, where `k` is the lookback (set via `-l` and defaulted to `5`) +* The standard deviation of the events per second measured for the last `k` monitoring periods, where `k` is the lookback (set via `-l` and defaulted to `5`) + +If you are both generating data and monitoring the throughput of a topic, then all of the columns are emitted.
+ +An example of CSV output is: +``` +1520506955047,generated,,,,throughput measured,,, +1520506964896,generated,1045,1045,0,throughput measured,,, +1520506974896,generated,1000,1022,31,throughput measured,1002,1002,0 +1520506984904,generated,999,1014,26,throughput measured,999,1000,2 +1520506994896,generated,1000,1011,22,throughput measured,1000,1000,1 +1520507004896,generated,1000,1008,20,throughput measured,1000,1000,1 +``` + +## Use-cases for the Load Tool + +### Measure Throughput of a Topology + +One can use the load tool to monitor performance of a kafka-to-kafka topology. +For instance, we could monitor the throughput of the enrichment topology by monitoring the `enrichments` kafka topic: +``` +$METRON_HOME/bin/load_tool.sh -mt enrichments -z $ZOOKEEPER +``` + +### Generate Synthetic Load and Measure Performance + +One can use the load tool to generate synthetic load and monitor performance of a kafka-to-kafka topology. For instance, we could +monitor the performance of the enrichment topology. It is advised to start the enrichment topology against a new topic and write +to a new topic so as to not pollute your downstream indices. So, for instance we could create a kafka topic called +`enrichments_load` by generating load on it. We could also create a new kafka topic called `indexing_load` and configure the enrichment +topology to output to it. We would then generate load on `enrichments_load` and monitor `indexing_load`. 
+``` +#Threadpool of size 5, you want somewhere between 5 and 10 depending on the throughput numbers you're trying to drive +#Messages drawn from ~/dummy.templates, which is a message template per line +#Generate at a rate of 9000 messages per second +#Emit the data to a CSV file ~/measurements.csv +$METRON_HOME/bin/load_tool.sh -p 5 -ot enrichments_load -mt indexing_load -t ~/dummy.templates -eps 9000 -z $ZOOKEEPER -c ~/measurements.csv +``` + +Now, with the help of a bash function and gnuplot we can generate a plot +of the historical throughput measurements for `indexing_load`: +``` +# Ensure that you have installed gnuplot and the liberation font package +# via yum install -y gnuplot liberation-sans-fonts +# We will define a plot function that will generate a png plot. It takes +# one arg, the output file. It expects to have a 2 column CSV streamed +# with the first dimension being the timestamp and the second dimension +# being what you want plotted. +plot() { + awk -F, '{printf "%d %d\n", $1/1000, $2} END { print "e" }' | gnuplot -e "reset;clear;set style fill solid 1.0 border -1; set nokey;set title 'Throughput Measured'; set xlabel 'Time'; set boxwidth 0.5; set xtics rotate; set ylabel 'events/sec';set xdata time; set timefmt '%s';set format x '%H:%M:%S';set term png enhanced font '/usr/share/fonts/liberation/LiberationSans-Regular.ttf' 12 size 900,400; set output '$1';plot '< cat -' using 1:2 with line lt -1 lw 2;" +} + +# Transform the measurements CSV into a 2 column CSV: the timestamp followed by +# column 8, which is the mean of the measured throughput over the lookback window.
+cat ~/measurements.csv | awk -F, '{printf "%d,%d\n", $1, $8 }' | plot performance_measurement.png +``` +This generates a plot like so to `performance_measurement.png`: +![Performance Measurement](performance_measurement.png) http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/performance_measurement.png ---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/performance_measurement.png b/metron-contrib/metron-performance/performance_measurement.png new file mode 100644 index 0000000..c4dcfb1 Binary files /dev/null and b/metron-contrib/metron-performance/performance_measurement.png differ http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/pom.xml ---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/pom.xml b/metron-contrib/metron-performance/pom.xml new file mode 100644 index 0000000..4242110 --- /dev/null +++ b/metron-contrib/metron-performance/pom.xml @@ -0,0 +1,134 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software + Foundation (ASF) under one or more contributor license agreements. See the + NOTICE file distributed with this work for additional information regarding + copyright ownership. The ASF licenses this file to You under the Apache License, + Version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. 
+ --> + + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <name>metron-performance</name> + <groupId>org.apache.metron</groupId> + <artifactId>metron-performance</artifactId> + <packaging>jar</packaging> + <parent> + <groupId>org.apache.metron</groupId> + <artifactId>metron-contrib</artifactId> + <version>0.4.3</version> + </parent> + <description>Performance Testing Utilities</description> + <url>https://metron.apache.org/</url> + + <dependencies> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${global_guava_version}</version> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-common</artifactId> + <version>${project.parent.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>${global_kafka_version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>${global_shade_version}</version> + <configuration> + <createDependencyReducedPom>true</createDependencyReducedPom> + <artifactSet> + <excludes> + <exclude>*slf4j*</exclude> + </excludes> + </artifactSet> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> 
+ <relocations> + <relocation> + <pattern>com.google.common</pattern> + <shadedPattern>org.apache.metron.perf.guava</shadedPattern> + </relocation> + </relocations> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer"> + <resources> + <resource>.yaml</resource> + <resource>LICENSE.txt</resource> + <resource>ASL2.0</resource> + <resource>NOTICE.txt</resource> + </resources> + </transformer> + <!-- UNCOMMENT THIS IF YOU NEED TO REGENERATE THE BEST GUESS NOTICES FILE WHICH REQUIRES PRUNING EVERY RELEASE --> + <!--transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> + <addHeader>false</addHeader> + <projectName>${project.name}</projectName> + </transformer--> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass></mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptor>src/main/assembly/assembly.xml</descriptor> + </configuration> + <executions> + <execution> + <id>make-assembly</id> <!-- this is used for inheritance merges --> + <phase>package</phase> <!-- bind to the packaging phase --> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/assembly/assembly.xml ---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/src/main/assembly/assembly.xml b/metron-contrib/metron-performance/src/main/assembly/assembly.xml new file mode 100644 index 0000000..3595284 --- /dev/null +++ 
b/metron-contrib/metron-performance/src/main/assembly/assembly.xml @@ -0,0 +1,42 @@ +<!-- + Licensed to the Apache Software + Foundation (ASF) under one or more contributor license agreements. See the + NOTICE file distributed with this work for additional information regarding + copyright ownership. The ASF licenses this file to You under the Apache License, + Version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. + --> + +<assembly> + <id>archive</id> + <formats> + <format>tar.gz</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${project.basedir}/src/main/scripts</directory> + <outputDirectory>bin</outputDirectory> + <useDefaultExcludes>true</useDefaultExcludes> + <excludes> + <exclude>**/*.formatted</exclude> + <exclude>**/*.filtered</exclude> + </excludes> + <fileMode>0755</fileMode> + <lineEnding>unix</lineEnding> + <filtered>true</filtered> + </fileSet> + <fileSet> + <directory>${project.basedir}/target</directory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + <outputDirectory>lib</outputDirectory> + <useDefaultExcludes>true</useDefaultExcludes> + </fileSet> + </fileSets> +</assembly> http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java ---------------------------------------------------------------------- diff --git 
a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java new file mode 100644 index 0000000..33f777b --- /dev/null +++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.performance.load; + + +import com.google.common.base.Joiner; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.PosixParser; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.metron.common.utils.KafkaUtils; +import org.apache.metron.performance.load.monitor.AbstractMonitor; +import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor; +import org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor; +import org.apache.metron.performance.load.monitor.MonitorTask; +import org.apache.metron.performance.load.monitor.writers.CSVWriter; +import org.apache.metron.performance.load.monitor.writers.ConsoleWriter; +import org.apache.metron.performance.load.monitor.writers.Writable; +import org.apache.metron.performance.load.monitor.writers.Writer; +import org.apache.metron.performance.sampler.BiasedSampler; +import org.apache.metron.performance.sampler.Sampler; +import org.apache.metron.performance.sampler.UnbiasedSampler; + +import java.io.File; +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +public class LoadGenerator +{ + public static String CONSUMER_GROUP = "metron.load.group"; + public static long SEND_PERIOD_MS = 100; + public static long MONITOR_PERIOD_MS = 1000*10; + private static ExecutorService pool; + private static ThreadLocal<KafkaProducer<String, String>> kafkaProducer; + public 
static AtomicLong numSent = new AtomicLong(0); + + public static void main( String[] args ) throws Exception { + CommandLine cli = LoadOptions.parse(new PosixParser(), args); + EnumMap<LoadOptions, Optional<Object>> evaluatedArgs = LoadOptions.createConfig(cli); + Map<String, Object> kafkaConfig = new HashMap<>(); + kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + kafkaConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + if(LoadOptions.ZK.has(cli)) { + String zkQuorum = (String) evaluatedArgs.get(LoadOptions.ZK).get(); + kafkaConfig.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + , Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum)) + ); + } + String groupId = evaluatedArgs.get(LoadOptions.CONSUMER_GROUP).get().toString(); + System.out.println("Consumer Group: " + groupId); + kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + if(LoadOptions.KAFKA_CONFIG.has(cli)) { + kafkaConfig.putAll((Map<String, Object>) evaluatedArgs.get(LoadOptions.KAFKA_CONFIG).get()); + } + kafkaProducer = ThreadLocal.withInitial(() -> new KafkaProducer<>(kafkaConfig)); + int numThreads = (int)evaluatedArgs.get(LoadOptions.NUM_THREADS).get(); + System.out.println("Thread pool size: " + numThreads); + pool = Executors.newFixedThreadPool(numThreads); + Optional<Object> eps = evaluatedArgs.get(LoadOptions.EPS); + + Optional<Object> outputTopic = evaluatedArgs.get(LoadOptions.OUTPUT_TOPIC); + Optional<Object> monitorTopic = evaluatedArgs.get(LoadOptions.MONITOR_TOPIC); + long sendDelta = (long) evaluatedArgs.get(LoadOptions.SEND_DELTA).get(); + long monitorDelta = (long) evaluatedArgs.get(LoadOptions.MONITOR_DELTA).get(); + if((eps.isPresent() && outputTopic.isPresent()) || 
monitorTopic.isPresent()) { + Timer timer = new Timer(false); + long startTimeMs = System.currentTimeMillis(); + if(outputTopic.isPresent() && eps.isPresent()) { + List<String> templates = (List<String>)evaluatedArgs.get(LoadOptions.TEMPLATE).get(); + if(templates.isEmpty()) { + System.out.println("Empty templates, so nothing to do."); + return; + } + Optional<Object> biases = evaluatedArgs.get(LoadOptions.BIASED_SAMPLE); + Sampler sampler = new UnbiasedSampler(); + if(biases.isPresent()){ + sampler = new BiasedSampler((List<Map.Entry<Integer, Integer>>) biases.get(), templates.size()); + } + MessageGenerator generator = new MessageGenerator(templates, sampler); + Long targetLoad = (Long)eps.get(); + int periodsPerSecond = (int)(1000/sendDelta); + long messagesPerPeriod = targetLoad/periodsPerSecond; + String outputTopicStr = (String)outputTopic.get(); + System.out.println("Generating data to " + outputTopicStr + " at " + targetLoad + " events per second"); + System.out.println("Sending " + messagesPerPeriod + " messages to " + outputTopicStr + " every " + sendDelta + "ms"); + timer.scheduleAtFixedRate(new SendToKafka( outputTopicStr + , messagesPerPeriod + , numThreads + , generator + , pool + , numSent + , kafkaProducer + ) + , 0, sendDelta); + } + List<AbstractMonitor> monitors = new ArrayList<>(); + if(outputTopic.isPresent() && monitorTopic.isPresent()) { + System.out.println("Monitoring " + monitorTopic.get() + " every " + monitorDelta + " ms"); + monitors.add(new EPSGeneratedMonitor(outputTopic, numSent)); + monitors.add(new EPSThroughputWrittenMonitor(monitorTopic, kafkaConfig)); + } + else if(outputTopic.isPresent() && !monitorTopic.isPresent()) { + System.out.println("Monitoring " + outputTopic.get() + " every " + monitorDelta + " ms"); + monitors.add(new EPSGeneratedMonitor(outputTopic, numSent)); + monitors.add(new EPSThroughputWrittenMonitor(outputTopic, kafkaConfig)); + } + else if(!outputTopic.isPresent() && monitorTopic.isPresent()) { + 
System.out.println("Monitoring " + monitorTopic.get() + " every " + monitorDelta + " ms"); + monitors.add(new EPSThroughputWrittenMonitor(monitorTopic, kafkaConfig)); + } + else if(!outputTopic.isPresent() && !monitorTopic.isPresent()) { + System.out.println("You have not specified an output topic or a monitoring topic, so I have nothing to do here."); + } + int lookback = (int) evaluatedArgs.get(LoadOptions.SUMMARY_LOOKBACK).get(); + if(lookback > 0) { + System.out.println("Summarizing over the last " + lookback + " monitoring periods (" + lookback*monitorDelta + "ms)"); + } + else { + System.out.println("Turning off summarization."); + } + final CSVWriter csvWriter = new CSVWriter((File) evaluatedArgs.get(LoadOptions.CSV).orElse(null)); + Writer writer = new Writer(monitors, lookback, new ArrayList<Consumer<Writable>>() {{ + add(new ConsoleWriter()); + add(csvWriter); + }}); + timer.scheduleAtFixedRate(new MonitorTask(writer), 0, monitorDelta); + Optional<Object> timeLimit = evaluatedArgs.get(LoadOptions.TIME_LIMIT); + if(timeLimit.isPresent()) { + System.out.println("Ending in " + timeLimit.get() + " ms."); + timer.schedule(new TimerTask() { + @Override + public void run() { + timer.cancel(); + long durationS = (System.currentTimeMillis() - startTimeMs)/1000; + System.out.println("\nGenerated " + numSent.get() + " in " + durationS + " seconds." 
); + csvWriter.close(); + System.exit(0); + } + } + + , (Long) timeLimit.get()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java ---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java new file mode 100644 index 0000000..b4d217d --- /dev/null +++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java @@ -0,0 +1,499 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.performance.load; + +import com.google.common.base.Joiner; +import org.apache.commons.cli.*; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.common.utils.cli.CLIOptions; +import org.apache.metron.performance.sampler.BiasedSampler; +import org.apache.metron.stellar.common.utils.ConversionUtils; +import org.apache.metron.common.utils.cli.OptionHandler; + +import javax.annotation.Nullable; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.Optional; + +public enum LoadOptions implements CLIOptions<LoadOptions> { + HELP(new OptionHandler<LoadOptions>() { + + @Override + public String getShortCode() { + return "h"; + } + + @Nullable + @Override + public Option apply(@Nullable String s) { + return new Option(s, "help", false, "Generate Help screen"); + } + }), + ZK(new OptionHandler<LoadOptions>() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "zk_quorum", true, "zookeeper quorum"); + o.setArgName("QUORUM"); + o.setRequired(false); + return o; + } + + @Override + public Optional<Object> getValue(LoadOptions option, CommandLine cli) { + if(option.has(cli)) { + return Optional.ofNullable(option.get(cli)); + } + else { + return Optional.empty(); + } + } + + @Override + public String getShortCode() { + return "z"; + } + }), + CONSUMER_GROUP(new OptionHandler<LoadOptions>() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "consumer_group", true, "Consumer Group. 
The default is " + LoadGenerator.CONSUMER_GROUP); + o.setArgName("GROUP_ID"); + o.setRequired(false); + return o; + } + + @Override + public Optional<Object> getValue(LoadOptions option, CommandLine cli) { + if(option.has(cli)) { + return Optional.ofNullable(option.get(cli)); + } + else { + return Optional.of(LoadGenerator.CONSUMER_GROUP); + } + } + + @Override + public String getShortCode() { + return "cg"; + } + }), + BIASED_SAMPLE(new OptionHandler<LoadOptions>() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "sample_bias", true, "The discrete distribution to bias the sampling. " + + "This is a CSV of 2 columns. The first column is the % of the templates " + + "and the 2nd column is the probability (0-100) that it's chosen. For instance:\n" + + " 20,80\n" + + " 80,20\n" + + "implies that 20% of the templates will comprise 80% of the output and the remaining 80% of the templates will comprise 20% of the output."); + o.setArgName("BIAS_FILE"); + o.setRequired(false); + return o; + } + + @Override + public Optional<Object> getValue(LoadOptions option, CommandLine cli) { + if(!option.has(cli)) { + return Optional.empty(); + } + File discreteDistributionFile = new File(option.get(cli)); + if(discreteDistributionFile.exists()) { + try (BufferedReader br = new BufferedReader(new FileReader(discreteDistributionFile))){ + return Optional.ofNullable(BiasedSampler.readDistribution(br)); + } catch (IOException e) { + throw new IllegalStateException("Unable to read distribution file: " + option.get(cli), e); + } + } + else { + throw new IllegalStateException("Unable to read distribution file: " + option.get(cli) + " file doesn't exist."); + } + } + + @Override + public String getShortCode() { + return "bs"; + } + }) + ,CSV(new OptionHandler<LoadOptions>() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "csv", true, "A CSV file to emit monitoring data to. 
" + + "The format is a CSV with the following schema: timestamp, (name, eps, historical_mean, historical_stddev)+"); + o.setArgName("CSV_FILE"); + o.setRequired(false); + return o; + } + + @Override + public Optional<Object> getValue(LoadOptions option, CommandLine cli) { + if(!option.has(cli)) { + return Optional.empty(); + } + return Optional.of(new File(option.get(cli))); + } + + @Override + public String getShortCode() { + return "c"; + } + }) + ,TEMPLATE(new OptionHandler<LoadOptions>() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "template", true, "The template file to use for generation. This should be a file with a template per line with $METRON_TS and $METRON_GUID in the spots for timestamp and guid, if you so desire them."); + o.setArgName("TEMPLATE_FILE"); + o.setRequired(false); + return o; + } + + @Override + public Optional<Object> getValue(LoadOptions option, CommandLine cli) { + if(!option.has(cli)) { + return Optional.empty(); + } + File templateFile = new File(option.get(cli)); + if(templateFile.exists()) { + List<String> templates = new ArrayList<>(); + try(BufferedReader br = new BufferedReader(new FileReader(templateFile))) { + for(String line = null;(line = br.readLine()) != null;) { + templates.add(line); + } + return Optional.of(templates); + } catch (IOException e) { + throw new IllegalStateException("Unable to read template file: " + option.get(cli), e); + } + } + else { + throw new IllegalStateException("Unable to read template file: " + option.get(cli) + " file doesn't exist."); + } + } + + @Override + public String getShortCode() { + return "t"; + } + }) + ,SUMMARY_LOOKBACK(new OptionHandler<LoadOptions>() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "lookback", true, "When summarizing, how many monitoring periods should we summarize over? If 0, then no summary. 
Default: 5"); + o.setArgName("LOOKBACK"); + o.setRequired(false); + return o; + } + + @Override + public Optional<Object> getValue(LoadOptions option, CommandLine cli) { + if(option.has(cli)) { + return Optional.of(ConversionUtils.convert(option.get(cli), Integer.class)); + } + else { + return Optional.of(5); + } + } + + @Override + public String getShortCode() { + return "l"; + } + }) + ,EPS(new OptionHandler<LoadOptions>() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "eps", true, "The target events per second"); + o.setArgName("EPS"); + o.setRequired(false); + return o; + } + + @Override + public Optional<Object> getValue(LoadOptions option, CommandLine cli) { + if(option.has(cli)) { + return Optional.of(ConversionUtils.convert(option.get(cli), Long.class)); + } + else { + return Optional.empty(); + } + } + + @Override + public String getShortCode() { + return "e"; + } + }) + ,KAFKA_CONFIG(new OptionHandler<LoadOptions>() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "kafka_config", true, "The kafka config. 
This is a file containing a JSON map with the kafka config."); + o.setArgName("CONFIG_FILE"); + o.setRequired(false); + return o; + } + + @Override + public Optional<Object> getValue(LoadOptions option, CommandLine cli) { + if(!option.has(cli)) { + return Optional.empty(); + } + File configFile = new File(option.get(cli)); + if(configFile.exists()) { + try { + return Optional.ofNullable(JSONUtils.INSTANCE.load(configFile, JSONUtils.MAP_SUPPLIER)); + } catch (FileNotFoundException e) { + throw new IllegalStateException("Unable to read file: " + option.get(cli), e); + } catch (IOException e) { + throw new IllegalStateException("Unable to read file: " + option.get(cli), e); + } + } + else { + throw new IllegalStateException("Unable to read file: " + option.get(cli) + " file doesn't exist."); + } + } + + @Override + public String getShortCode() { + return "k"; + } + }), + SEND_DELTA(new OptionHandler<LoadOptions>() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "send_delta_ms", true, "The time (in ms) between sending a batch of messages. Default is " + LoadGenerator.SEND_PERIOD_MS); + o.setArgName("TIME_IN_MS"); + o.setRequired(false); + return o; + } + + @Override + public Optional<Object> getValue(LoadOptions option, CommandLine cli) { + if(option.has(cli)) { + Object res = option.get(cli); + return Optional.ofNullable(ConversionUtils.convert(res, Long.class)); + } + return Optional.of(LoadGenerator.SEND_PERIOD_MS); + + } + + @Override + public String getShortCode() { + return "sd"; + } + }), + MONITOR_DELTA(new OptionHandler<LoadOptions>() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "monitor_delta_ms", true, "The time (in ms) between monitoring output. 
Default is " + LoadGenerator.MONITOR_PERIOD_MS); + o.setArgName("TIME_IN_MS"); + o.setRequired(false); + return o; + } + + @Override + public Optional<Object> getValue(LoadOptions option, CommandLine cli) { + if(option.has(cli)) { + Object res = option.get(cli); + return Optional.ofNullable(ConversionUtils.convert(res, Long.class)); + } + return Optional.of(LoadGenerator.MONITOR_PERIOD_MS); + + } + + @Override + public String getShortCode() { + return "md"; + } + }) + ,TIME_LIMIT(new OptionHandler<LoadOptions>() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "time_limit_ms", true, "The total amount of time to run this in milliseconds. By default, it never stops."); + o.setArgName("MS"); + o.setRequired(false); + return o; + } + + @Override + public Optional<Object> getValue(LoadOptions option, CommandLine cli) { + if(option.has(cli)) { + Object res = option.get(cli); + Long timeMs = ConversionUtils.convert(res, Long.class); + return Optional.ofNullable(timeMs); + } + return Optional.empty(); + + } + + @Override + public String getShortCode() { + return "tl"; + } + }) + ,NUM_THREADS(new OptionHandler<LoadOptions>() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "threads", true, "The number of threads to use when extracting data. 
The default is the number of cores of your machine."); + o.setArgName("NUM_THREADS"); + o.setRequired(false); + return o; + } + + @Override + public Optional<Object> getValue(LoadOptions option, CommandLine cli) { + int numThreads = Runtime.getRuntime().availableProcessors(); + if(option.has(cli)) { + Object res = option.get(cli); + if(res instanceof String && res.toString().toUpperCase().endsWith("C")) { + numThreads *= ConversionUtils.convert(res.toString().trim().replace("C", ""), Integer.class); + } + else { + numThreads = ConversionUtils.convert(res, Integer.class); + } + } + return Optional.of(numThreads); + + } + + @Override + public String getShortCode() { + return "p"; + } + }) + ,OUTPUT_TOPIC(new OptionHandler<LoadOptions>() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "output_topic", true, "The kafka topic to write to"); + o.setArgName("TOPIC"); + o.setRequired(false); + return o; + } + + @Override + public Optional<Object> getValue(LoadOptions option, CommandLine cli) { + return Optional.ofNullable(option.get(cli)); + } + + @Override + public String getShortCode() { + return "ot"; + } + }), + MONITOR_TOPIC(new OptionHandler<LoadOptions>() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "monitor_topic", true, "The kafka topic to monitor."); + o.setArgName("TOPIC"); + o.setRequired(false); + return o; + } + + @Override + public Optional<Object> getValue(LoadOptions option, CommandLine cli) { + return Optional.ofNullable(option.get(cli)); + } + + @Override + public String getShortCode() { + return "mt"; + } + }), + ; + Option option; + String shortCode; + OptionHandler<LoadOptions> handler; + LoadOptions(OptionHandler<LoadOptions> optionHandler) { + this.shortCode = optionHandler.getShortCode(); + this.handler = optionHandler; + this.option = optionHandler.apply(shortCode); + } + + @Override + public Option getOption() { + return option; + } + + public 
boolean has(CommandLine cli) { + return cli.hasOption(shortCode); + } + + public String get(CommandLine cli) { + return cli.getOptionValue(shortCode); + } + + @Override + public OptionHandler<LoadOptions> getHandler() { + return null; + } + + public static CommandLine parse(CommandLineParser parser, String[] args) { + try { + CommandLine cli = parser.parse(getOptions(), args); + if(HELP.has(cli)) { + printHelp(); + System.exit(0); + } + return cli; + } catch (ParseException e) { + System.err.println("Unable to parse args: " + Joiner.on(' ').join(args)); + e.printStackTrace(System.err); + printHelp(); + System.exit(-1); + return null; + } + } + + public static EnumMap<LoadOptions, Optional<Object> > createConfig(CommandLine cli) { + EnumMap<LoadOptions, Optional<Object> > ret = new EnumMap<>(LoadOptions.class); + for(LoadOptions option : values()) { + ret.put(option, option.handler.getValue(option, cli)); + } + return ret; + } + + public static void printHelp() { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp( "Generator", getOptions()); + } + + public static Options getOptions() { + Options ret = new Options(); + for(LoadOptions o : LoadOptions.values()) { + ret.addOption(o.option); + } + return ret; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/MessageGenerator.java ---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/MessageGenerator.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/MessageGenerator.java new file mode 100644 index 0000000..572d438 --- /dev/null +++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/MessageGenerator.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.performance.load; + +import org.apache.metron.performance.sampler.Sampler; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +public class MessageGenerator implements Supplier<String> { + private static ThreadLocal<Random> rng = ThreadLocal.withInitial(() -> new Random()); + private static AtomicLong guidOffset = new AtomicLong(0); + private static String guidPrefix = "00000000-0000-0000-0000-"; + private List<String> patterns; + private Sampler sampler; + public MessageGenerator(List<String> patterns, Sampler sampler) { + this.patterns = patterns; + this.sampler = sampler; + } + + @Override + public String get() { + int sample = sampler.sample(rng.get(), patterns.size()); + String pattern = patterns.get(sample); + long guidId = guidOffset.getAndIncrement(); + String guid = guidPrefix + guidId; + String ts = "" + System.currentTimeMillis(); + return pattern.replace("$METRON_TS", ts) + .replace("$METRON_GUID", guid); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java 
---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java new file mode 100644 index 0000000..67bf469 --- /dev/null +++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.performance.load; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.TimerTask; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +public class SendToKafka extends TimerTask { + private long numMessagesSent; + private long numSentLast = 0; + private long batchSize; + private int numBatches; + private Supplier<String> messageSupplier; + private String kafkaTopic; + private ExecutorService pool; + protected AtomicLong numSent; + private ThreadLocal<KafkaProducer<String, String>> kafkaProducer; + public SendToKafka( String kafkaTopic + , long numMessagesSent + , int numBatches + , Supplier<String> messageSupplier + , ExecutorService pool + , AtomicLong numSent + , ThreadLocal<KafkaProducer<String, String>> kafkaProducer + ) + { + this.numSent = numSent; + this.kafkaProducer = kafkaProducer; + this.pool = pool; + this.numMessagesSent = numMessagesSent; + this.messageSupplier = messageSupplier; + this.numBatches = numBatches; + this.batchSize = numMessagesSent/numBatches; + this.kafkaTopic = kafkaTopic; + } + + @Override + public void run() { + long numSentCurrent = numSent.get(); + long numSentSince = numSentCurrent - numSentLast; + boolean sendMessages = numSentLast == 0 || numSentSince >= numMessagesSent; + if(sendMessages) { + Collection<Future<Long>> futures = Collections.synchronizedList(new ArrayList<>()); + for(int batch = 0;batch < numBatches;++batch) { + try { + futures.add(pool.submit(() -> { + KafkaProducer<String, String> producer = kafkaProducer.get(); + Collection<Future<?>> b = Collections.synchronizedCollection(new ArrayList<>()); + for (int i = 0; i < batchSize; ++i) { + b.add(sendToKafka(producer, kafkaTopic, 
messageSupplier.get())); + } + for(Future<?> f : b) { + f.get(); + } + return batchSize; + })); + + } catch (Exception e) { + e.printStackTrace(System.err); + } + } + for(Future<Long> f : futures) { + try { + f.get(); + } catch (Exception e) { + e.printStackTrace(System.err); + } + } + numSentLast = numSentCurrent; + } + } + + protected Future<?> sendToKafka(KafkaProducer<String, String> producer, String kafkaTopic, String message) { + return producer.send(new ProducerRecord<>(kafkaTopic, message), + (recordMetadata, e) -> { + if(e != null) { + e.printStackTrace(System.err); + } + numSent.incrementAndGet(); + } + ); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/AbstractMonitor.java ---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/AbstractMonitor.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/AbstractMonitor.java new file mode 100644 index 0000000..80cb5cc --- /dev/null +++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/AbstractMonitor.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.performance.load.monitor; + +import java.util.Optional; +import java.util.function.Supplier; + +public abstract class AbstractMonitor implements Supplier<Long>, MonitorNaming { + private static final double EPSILON = 1e-6; + protected Optional<?> kafkaTopic; + protected long timestampPrevious = 0; + public AbstractMonitor(Optional<?> kafkaTopic) { + this.kafkaTopic = kafkaTopic; + } + + protected abstract Long monitor(double deltaTs); + + @Override + public Long get() { + long timeStarted = System.currentTimeMillis(); + Long ret = null; + if(timestampPrevious > 0) { + double deltaTs = (timeStarted - timestampPrevious) / 1000.0; + if (Math.abs(deltaTs) > EPSILON) { + ret = monitor(deltaTs); + } + } + timestampPrevious = timeStarted; + return ret; + } + + public abstract String format(); + +} http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSGeneratedMonitor.java ---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSGeneratedMonitor.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSGeneratedMonitor.java new file mode 100644 index 0000000..3e380bb --- /dev/null +++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSGeneratedMonitor.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.performance.load.monitor; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +public class EPSGeneratedMonitor extends AbstractMonitor { + private AtomicLong numSent; + private long numSentPrevious = 0; + public EPSGeneratedMonitor(Optional<?> kafkaTopic, AtomicLong numSent) { + super(kafkaTopic); + this.numSent = numSent; + } + + @Override + protected Long monitor(double deltaTs) { + if(kafkaTopic.isPresent()) { + long totalProcessed = numSent.get(); + long written = (totalProcessed - numSentPrevious); + long epsWritten = (long) (written / deltaTs); + numSentPrevious = totalProcessed; + return epsWritten; + } + return null; + } + + @Override + public String format() { + return "%d eps generated to " + kafkaTopic.get(); + } + + @Override + public String name() { + return "generated"; + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSThroughputWrittenMonitor.java ---------------------------------------------------------------------- diff --git 
a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSThroughputWrittenMonitor.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSThroughputWrittenMonitor.java new file mode 100644 index 0000000..96efd1d --- /dev/null +++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSThroughputWrittenMonitor.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.performance.load.monitor; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.metron.performance.util.KafkaUtil; + +import java.util.Map; +import java.util.Optional; + +public class EPSThroughputWrittenMonitor extends AbstractMonitor { + Map<Integer, Long> lastOffsetMap = null; + KafkaConsumer<String, String> consumer; + public EPSThroughputWrittenMonitor(Optional<?> kafkaTopic, Map<String, Object> kafkaProps) { + super(kafkaTopic); + consumer = new KafkaConsumer<>(kafkaProps); + } + + private Long writtenSince(Map<Integer, Long> partitionOffsets, Map<Integer, Long> lastOffsetMap) { + if(partitionOffsets == null) { + return null; + } + long sum = 0; + for(Map.Entry<Integer, Long> partitionOffset : partitionOffsets.entrySet()) { + sum += partitionOffset.getValue() - lastOffsetMap.get(partitionOffset.getKey()); + } + return sum; + } + + @Override + protected Long monitor(double deltaTs) { + Optional<Long> epsWritten = Optional.empty(); + if(kafkaTopic.isPresent()) { + if(lastOffsetMap != null) { + Map<Integer, Long> currentOffsets = KafkaUtil.INSTANCE.getKafkaOffsetMap(consumer, (String) kafkaTopic.get()); + Long eventsWrittenSince = writtenSince(currentOffsets, lastOffsetMap); + if (eventsWrittenSince != null) { + epsWritten = Optional.of((long) (eventsWrittenSince / deltaTs)); + } + lastOffsetMap = currentOffsets == null ? 
lastOffsetMap : currentOffsets; + if (epsWritten.isPresent()) { + return epsWritten.get(); + } + } + else { + lastOffsetMap = KafkaUtil.INSTANCE.getKafkaOffsetMap(consumer, (String)kafkaTopic.get()); + } + } + return null; + } + + @Override + public String format() { + return "%d eps throughput measured for " + kafkaTopic.get(); + } + + @Override + public String name() { + return "throughput measured"; + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorNaming.java ---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorNaming.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorNaming.java new file mode 100644 index 0000000..4833c17 --- /dev/null +++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorNaming.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.performance.load.monitor; + +public interface MonitorNaming { + String format(); + String name(); +} http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorTask.java ---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorTask.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorTask.java new file mode 100644 index 0000000..1e02a00 --- /dev/null +++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorTask.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.performance.load.monitor; + +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; +import org.apache.metron.performance.load.monitor.writers.Writer; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; +import java.util.TimerTask; + +public class MonitorTask extends TimerTask { + private Writer writer; + public MonitorTask(Writer writer) { + this.writer = writer; + } + + /** + * The action to be performed by this timer task. + */ + @Override + public void run() { + writer.writeAll(); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/Results.java ---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/Results.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/Results.java new file mode 100644 index 0000000..e094b74 --- /dev/null +++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/Results.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.performance.load.monitor; + +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; + +import java.util.Optional; + +public class Results { + private String format; + private String name; + private Optional<DescriptiveStatistics> history; + private Long eps; + public Results(String format, String name, Long eps, Optional<DescriptiveStatistics> history) { + this.format = format; + this.name = name; + this.history = history; + this.eps = eps; + } + + public String getName() { + return name; + } + + public Long getEps() { + return eps; + } + + public String getFormat() { + return format; + } + + public Optional<DescriptiveStatistics> getHistory() { + return history; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/CSVWriter.java ---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/CSVWriter.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/CSVWriter.java new file mode 100644 index 0000000..112206d --- /dev/null +++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/CSVWriter.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.performance.load.monitor.writers; + +import com.google.common.base.Joiner; +import org.apache.metron.performance.load.monitor.Results; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; + +public class CSVWriter implements Consumer<Writable> { + private Optional<PrintWriter> pw = Optional.empty(); + + public CSVWriter(File outFile) throws IOException { + if(outFile != null) { + pw = Optional.of(new PrintWriter(new FileWriter(outFile))); + } + } + + @Override + public void accept(Writable writable) { + if(pw.isPresent()) { + List<String> parts = new ArrayList<>(); + parts.add("" + writable.getDate().getTime()); + for (Results r : writable.getResults()) { + parts.add(r.getName()); + parts.add(r.getEps() == null?"":(r.getEps() + "")); + if (r.getHistory().isPresent()) { + parts.add("" + (int) r.getHistory().get().getMean()); + parts.add("" + (int) Math.sqrt(r.getHistory().get().getVariance())); + } else { + parts.add(""); + parts.add(""); + } + } + pw.get().println(Joiner.on(",").join(parts)); + pw.get().flush(); + } + } + + public void close() { + if(pw.isPresent()) { + 
pw.get().close(); + } + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/ConsoleWriter.java ---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/ConsoleWriter.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/ConsoleWriter.java new file mode 100644 index 0000000..efb2ad3 --- /dev/null +++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/ConsoleWriter.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.performance.load.monitor.writers; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; +import org.apache.metron.performance.load.monitor.Results; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.function.Consumer; + +public class ConsoleWriter implements Consumer<Writable> { + + private String getSummary(DescriptiveStatistics stats) { + return String.format("Mean: %d, Std Dev: %d", (int)stats.getMean(), (int)Math.sqrt(stats.getVariance())); + } + + @Override + public void accept(Writable writable) { + List<String> parts = new ArrayList<>(); + Date date = writable.getDate(); + for(Results r : writable.getResults()) { + Long eps = r.getEps(); + if(eps != null) { + String part = String.format(r.getFormat(), eps); + if (r.getHistory().isPresent()) { + part += " (" + getSummary(r.getHistory().get()) + ")"; + } + parts.add(part); + } + } + if(date != null) { + DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); + String header = dateFormat.format(date) + " - "; + String emptyHeader = StringUtils.repeat(" ", header.length()); + for (int i = 0; i < parts.size(); ++i) { + String part = parts.get(i); + if (i == 0) { + System.out.println(header + (part == null ? "" : part)); + } else { + System.out.println(emptyHeader + (part == null ? 
"" : part)); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writable.java ---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writable.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writable.java new file mode 100644 index 0000000..3ed62bf --- /dev/null +++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writable.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.performance.load.monitor.writers; + +import org.apache.metron.performance.load.monitor.Results; + +import java.util.Date; +import java.util.List; + +public class Writable { + private Date date; + private List<Results> results; + public Writable(Date date, List<Results> results) { + this.date = date; + this.results = results; + } + + public Date getDate() { + return date; + } + + public List<Results> getResults() { + return results; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java ---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java new file mode 100644 index 0000000..a9d915b --- /dev/null +++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.performance.load.monitor.writers; + +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; +import org.apache.metron.performance.load.monitor.AbstractMonitor; +import org.apache.metron.performance.load.monitor.Results; + +import java.util.ArrayList; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; + +public class Writer { + + private int summaryLookback; + private List<LinkedList<Double>> summaries = new ArrayList<>(); + private List<Consumer<Writable>> writers; + private List<AbstractMonitor> monitors; + + public Writer(List<AbstractMonitor> monitors, int summaryLookback, List<Consumer<Writable>> writers) { + this.summaryLookback = summaryLookback; + this.writers = writers; + this.monitors = monitors; + for(AbstractMonitor m : monitors) { + this.summaries.add(new LinkedList<>()); + } + } + + public void writeAll() { + int i = 0; + Date dateOf = new Date(); + List<Results> results = new ArrayList<>(); + for(AbstractMonitor m : monitors) { + Long eps = m.get(); + if(eps != null && summaryLookback > 0) { + LinkedList<Double> summary = summaries.get(i); + addToLookback(eps.doubleValue(), summary); + results.add(new Results(m.format(), m.name(), eps, Optional.of(getStats(summary)))); + } + else { + results.add(new Results(m.format(), m.name(), eps, Optional.empty())); + } + i++; + } + Writable writable = new Writable(dateOf, results); + for(Consumer<Writable> writer : writers) { + writer.accept(writable); + } + } + + private void addToLookback(Double d, LinkedList<Double> lookback) { + if(lookback.size() >= summaryLookback) { + lookback.removeFirst(); + } + lookback.addLast(d); + } + + public DescriptiveStatistics getStats(List<Double> avg) { + DescriptiveStatistics stats = new DescriptiveStatistics(); + for(Double d : avg) { + if(d == null || Double.isNaN(d)) { + continue; + } + stats.addValue(d); + } + return stats; + } +} 
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java ---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java new file mode 100644 index 0000000..f0a5b2c --- /dev/null +++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.performance.sampler; + +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; + +public class BiasedSampler implements Sampler { + TreeMap<Double, Map.Entry<Integer, Integer>> discreteDistribution; + public BiasedSampler(List<Map.Entry<Integer, Integer>> discreteDistribution, int max) { + this.discreteDistribution = createDistribution(discreteDistribution, max); + } + + public static List<Map.Entry<Integer, Integer>> readDistribution(BufferedReader distrFile) throws IOException { + return readDistribution(distrFile, false); + } + + public static List<Map.Entry<Integer, Integer>> readDistribution(BufferedReader distrFile, boolean quiet) throws IOException { + List<Map.Entry<Integer, Integer>> ret = new ArrayList<>(); + if(!quiet) { + System.out.println("Using biased sampler with the following biases:"); + } + int sumLeft = 0; + int sumRight = 0; + for(String line = null;(line = distrFile.readLine()) != null;) { + if(line.startsWith("#")) { + continue; + } + Iterable<String> it = Splitter.on(",").split(line.trim()); + if(Iterables.size(it) != 2) { + throw new IllegalArgumentException(line + " should be a comma separated pair of integers, but was not."); + } + int left = Integer.parseInt(Iterables.getFirst(it, null)); + int right = Integer.parseInt(Iterables.getLast(it, null)); + if(left <= 0 || left > 100) { + throw new IllegalArgumentException(line + ": " + (left < 0?left:right) + " must a positive integer in (0, 100]"); + } + if(right <= 0 || right > 100) { + throw new IllegalArgumentException(line + ": " + right + " must a positive integer in (0, 100]"); + } + if(!quiet) { + System.out.println("\t" + left + "% of templates will 
comprise roughly " + right + "% of sample output"); + } + ret.add(new AbstractMap.SimpleEntry<>(left, right)); + sumLeft += left; + sumRight += right; + } + if(sumLeft > 100 || sumRight > 100 ) { + throw new IllegalStateException("Neither columns must sum to beyond 100. " + + "The first column is the % of templates. " + + "The second column is the % of the sample that % of template occupies."); + } + else if(sumLeft < 100 && sumRight < 100) { + int left = 100 - sumLeft; + int right = 100 - sumRight; + if(!quiet) { + System.out.println("\t" + left + "% of templates will comprise roughly " + right + "% of sample output"); + } + ret.add(new AbstractMap.SimpleEntry<>(left, right)); + } + return ret; + + } + + private static TreeMap<Double, Map.Entry<Integer, Integer>> + createDistribution(List<Map.Entry<Integer, Integer>> discreteDistribution, int max) { + TreeMap<Double, Map.Entry<Integer, Integer>> ret = new TreeMap<>(); + int from = 0; + double weight = 0.0d; + for(Map.Entry<Integer, Integer> kv : discreteDistribution) { + double pctVals = kv.getKey()/100.0; + int to = from + (int)(max*pctVals); + double pctWeight = kv.getValue()/100.0; + ret.put(weight, new AbstractMap.SimpleEntry<>(from, to)); + weight += pctWeight; + from = to; + } + return ret; + } + + @Override + public int sample(Random rng, int limit) { + double weight = rng.nextDouble(); + Map.Entry<Integer, Integer> range = discreteDistribution.floorEntry(weight).getValue(); + return rng.nextInt(range.getValue() - range.getKey()) + range.getKey(); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/Sampler.java ---------------------------------------------------------------------- diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/Sampler.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/Sampler.java new file mode 
100644 index 0000000..e5f03c8 --- /dev/null +++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/Sampler.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.performance.sampler; + +import java.util.Random; + +public interface Sampler { + int sample(Random rng, int limit); +}