[GitHub] metron issue #942: METRON-1461: Modify the MIN, MAX Stellar methods to take ...
Github user cestella commented on the issue:

    https://github.com/apache/metron/pull/942

    So, I would suggest rather than accepting a list of stats objects, `MAX` and `MIN` accept one of the following:
    * A StatisticsProvider object
    * A list of comparables

    Essentially, these two would both function:
    * `MAX(STATS_ADD(null, 1, 2, 3))`
    * `MAX([1, 2, 3])`

---
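The suggestion in the comment above (one function that accepts either a stats object or a list of comparables) could be sketched roughly as follows. This is only an illustration of the dispatch idea in plain Java: `MinMaxSketch`, `StatsLike`, and `statsOf` are hypothetical names invented here, and nothing below is Metron's actual `StatisticsProvider` or Stellar function API.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.stream.DoubleStream;

class MinMaxSketch {

  /** Hypothetical stand-in for a stats object; NOT Metron's StatisticsProvider. */
  interface StatsLike {
    double getMin();
    double getMax();
  }

  /** Builds a StatsLike from raw values (assumed helper, loosely playing the role of STATS_ADD). */
  static StatsLike statsOf(double... values) {
    DoubleSummaryStatistics s = DoubleStream.of(values).summaryStatistics();
    return new StatsLike() {
      @Override public double getMin() { return s.getMin(); }
      @Override public double getMax() { return s.getMax(); }
    };
  }

  static Object max(Object arg) { return extreme(arg, true); }

  static Object min(Object arg) { return extreme(arg, false); }

  /** Dispatches on the argument type: a stats object or a list of comparables. */
  @SuppressWarnings({"unchecked", "rawtypes"})
  private static Object extreme(Object arg, boolean wantMax) {
    if (arg instanceof StatsLike) {
      StatsLike stats = (StatsLike) arg;
      return wantMax ? stats.getMax() : stats.getMin();
    }
    if (arg instanceof List) {
      Comparable best = null;
      for (Object o : (List<?>) arg) {
        if (o == null) {
          continue; // null elements are skipped in this sketch
        }
        Comparable c = (Comparable) o;
        if (best == null) {
          best = c;
          continue;
        }
        int cmp = c.compareTo(best);
        if (wantMax ? cmp > 0 : cmp < 0) {
          best = c;
        }
      }
      return best; // null for an empty list
    }
    throw new IllegalArgumentException("Expected a stats object or a list of comparables");
  }
}
```

With this shape, `MinMaxSketch.max(Arrays.asList(1, 2, 3))` and `MinMaxSketch.max(MinMaxSketch.statsOf(1, 2, 3))` both go through the same entry point, which mirrors the two Stellar calls listed in the comment.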
[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/958#discussion_r174186826

    --- Diff: metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java ---
    @@ -0,0 +1,95 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.performance.sampler;
    +
    +import com.google.common.base.Splitter;
    +import com.google.common.collect.Iterables;
    +
    +import java.io.BufferedReader;
    +import java.io.File;
    +import java.io.FileReader;
    +import java.io.IOException;
    +import java.util.AbstractMap;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Random;
    +import java.util.TreeMap;
    +
    +public class BiasedSampler implements Sampler {
    +  TreeMap<Double, Map.Entry<Integer, Integer>> discreteDistribution;
    +  public BiasedSampler(List<Map.Entry<Integer, Integer>> discreteDistribution, int max) {
    +    this.discreteDistribution = createDistribution(discreteDistribution, max);
    +  }
    +
    +  public static List<Map.Entry<Integer, Integer>> readDistribution(File distrFile) throws IOException {
    +    List<Map.Entry<Integer, Integer>> ret = new ArrayList<>();
    +    System.out.println("Using biased sampler with the following biases:");
    +    try(BufferedReader br = new BufferedReader(new FileReader(distrFile))) {
    +      int sumLeft = 0;
    +      int sumRight = 0;
    +      for(String line = null;(line = br.readLine()) != null;) {
    +        if(line.startsWith("#")) {
    +          continue;
    +        }
    +        Iterable<String> it = Splitter.on(",").split(line.trim());
    +        int left = Integer.parseInt(Iterables.getFirst(it, null));
    +        int right = Integer.parseInt(Iterables.getLast(it, null));
    +        System.out.println("\t" + left + "% of templates will comprise roughly " + right + "% of sample output");
    +        ret.add(new AbstractMap.SimpleEntry<>(left, right));
    +        sumLeft += left;
    +        sumRight += right;
    +      }
    +      if(sumLeft > 100 || sumRight > 100 ) {
    +        throw new IllegalStateException("Neither columns must sum to beyond 100.  " +
    +                "The first column is the % of templates. " +
    +                "The second column is the % of the sample that % of template occupies.");
    +      }
    +      else if(sumLeft < 100 && sumRight < 100) {
    +        int left = 100 - sumLeft;
    +        int right = 100 - sumRight;
    +        System.out.println("\t" + left + "% of templates will comprise roughly " + right + "% of sample output");
    +        ret.add(new AbstractMap.SimpleEntry<>(left, right));
    +      }
    +      return ret;
    +    }
    +  }
    +
    +  private static TreeMap<Double, Map.Entry<Integer, Integer>>
    +                 createDistribution(List<Map.Entry<Integer, Integer>> discreteDistribution, int max) {
    +    TreeMap<Double, Map.Entry<Integer, Integer>> ret = new TreeMap<>();
    +    int from = 0;
    +    double weight = 0.0d;
    +    for(Map.Entry<Integer, Integer> kv : discreteDistribution) {
    +      double pctVals = kv.getKey()/100.0;
    +      int to = from + (int)(max*pctVals);
    +      double pctWeight = kv.getValue()/100.0;
    +      ret.put(weight, new AbstractMap.SimpleEntry<>(from, to));
    +      weight += pctWeight;
    +      from = to;
    +    }
    +    return ret;
    +  }
    +
    +  @Override
    +  public int sample(Random rng, int limit) {
    +    double weight = rng.nextDouble();
    +    Map.Entry<Integer, Integer> range = discreteDistribution.floorEntry(weight).getValue();
    +    return rng.nextInt(range.getValue() - range.getKey()) + range.getKey();
    --- End diff --

    done

---
[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/958#discussion_r174186723

    --- Diff: metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java ---
    @@ -0,0 +1,95 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.performance.sampler;
    +
    +import com.google.common.base.Splitter;
    +import com.google.common.collect.Iterables;
    +
    +import java.io.BufferedReader;
    +import java.io.File;
    +import java.io.FileReader;
    +import java.io.IOException;
    +import java.util.AbstractMap;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Random;
    +import java.util.TreeMap;
    +
    +public class BiasedSampler implements Sampler {
    +  TreeMap<Double, Map.Entry<Integer, Integer>> discreteDistribution;
    +  public BiasedSampler(List<Map.Entry<Integer, Integer>> discreteDistribution, int max) {
    +    this.discreteDistribution = createDistribution(discreteDistribution, max);
    +  }
    +
    +  public static List<Map.Entry<Integer, Integer>> readDistribution(File distrFile) throws IOException {
    +    List<Map.Entry<Integer, Integer>> ret = new ArrayList<>();
    +    System.out.println("Using biased sampler with the following biases:");
    +    try(BufferedReader br = new BufferedReader(new FileReader(distrFile))) {
    +      int sumLeft = 0;
    +      int sumRight = 0;
    +      for(String line = null;(line = br.readLine()) != null;) {
    +        if(line.startsWith("#")) {
    +          continue;
    +        }
    +        Iterable<String> it = Splitter.on(",").split(line.trim());
    +        int left = Integer.parseInt(Iterables.getFirst(it, null));
    --- End diff --

    done

---
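The `BiasedSampler` diff quoted above maps cumulative weights to index ranges in a `TreeMap` and uses `floorEntry` to pick a range. A minimal standalone sketch of that idea follows; the class name `BiasedSamplingSketch` and the `int[]` range representation are choices made here for brevity, not the PR's code. Note the assumption that every row yields a non-empty range: because of the `(int)` truncation, a small `max` could produce an empty range, and `Random.nextInt(0)` would then throw.

```java
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

class BiasedSamplingSketch {
  // Key: cumulative weight at which a range starts. Value: {from, to}, a half-open index range.
  private final TreeMap<Double, int[]> ranges = new TreeMap<>();

  /**
   * @param dist rows of (percent of templates, percent of output), e.g. (20,80) and (80,20)
   * @param max  total number of templates being sampled from
   */
  public BiasedSamplingSketch(List<Map.Entry<Integer, Integer>> dist, int max) {
    int from = 0;
    double weight = 0.0;
    for (Map.Entry<Integer, Integer> kv : dist) {
      int to = from + (int) (max * (kv.getKey() / 100.0));
      ranges.put(weight, new int[] {from, to});
      weight += kv.getValue() / 100.0;
      from = to;
    }
  }

  /** Picks a range by weight, then a uniform index within that range. */
  public int sample(Random rng) {
    // floorEntry finds the range whose starting weight is the largest one <= the draw.
    int[] range = ranges.floorEntry(rng.nextDouble()).getValue();
    return range[0] + rng.nextInt(range[1] - range[0]);
  }
}
```

For the rows `20,80` and `80,20` with `max = 100`, the map holds `0.0 -> [0, 20)` and `0.8 -> [20, 100)`, so a draw below 0.8 lands in the first 20 templates and roughly 80% of samples come from them.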
[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/958#discussion_r174186700

    --- Diff: metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.performance.load.monitor.writers;
    +
    +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
    +import org.apache.metron.performance.load.monitor.AbstractMonitor;
    +import org.apache.metron.performance.load.monitor.Results;
    +
    +import java.util.ArrayList;
    +import java.util.Date;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Optional;
    +import java.util.function.Consumer;
    +
    +public class Writer {
    +
    +  private int summaryLookback;
    +  private List<LinkedList<Double>> summaries = new ArrayList<>();
    +  private List<Consumer<Writable>> writers;
    +  private List<AbstractMonitor> monitors;
    +
    +  public Writer(List<AbstractMonitor> monitors, int summaryLookback, List<Consumer<Writable>> writers) {
    +    this.summaryLookback = summaryLookback;
    +    this.writers = writers;
    +    this.monitors = monitors;
    +    for(AbstractMonitor m : monitors) {
    +      this.summaries.add(new LinkedList<>());
    +    }
    +  }
    +
    +  public void writeAll() {
    +    int i = 0;
    +    Date dateOf = new Date();
    +    List<Results> results = new ArrayList<>();
    +    for(AbstractMonitor m : monitors) {
    +      Long eps = m.get();
    +      if(eps != null) {
    +        if (summaryLookback > 0) {
    +          LinkedList<Double> summary = summaries.get(i);
    +          addToLookback(eps == null ? Double.NaN : eps.doubleValue(), summary);
    +          results.add(new Results(m.format(), m.name(), eps, Optional.of(getStats(summary))));
    +        }
    +        else {
    +          results.add(new Results(m.format(),m.name(), eps, Optional.empty()));
    +        }
    +      }
    +      else {
    --- End diff --

    done

---
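The quoted `Writer` diff is cut off before `addToLookback` and `getStats` are shown, so their real bodies are unknown here. As a rough guess at the idea (a bounded window of recent EPS values summarized by mean and standard deviation), a plain-Java sketch might look like the following. The class `LookbackSketch`, the explicit `lookback` parameter, and the hand-rolled statistics are all assumptions for illustration; the PR itself uses Commons Math `DescriptiveStatistics`.

```java
import java.util.LinkedList;

class LookbackSketch {

  /** Appends a value and evicts the oldest entries so at most `lookback` values remain. */
  static void addToLookback(double value, LinkedList<Double> window, int lookback) {
    window.addLast(value);
    while (window.size() > lookback) {
      window.removeFirst();
    }
  }

  /** Arithmetic mean of the window, or NaN when the window is empty. */
  static double mean(LinkedList<Double> window) {
    if (window.isEmpty()) {
      return Double.NaN;
    }
    double sum = 0.0;
    for (double v : window) {
      sum += v;
    }
    return sum / window.size();
  }

  /** Sample standard deviation (n - 1 denominator); 0 for one value, NaN for none. */
  static double stdDev(LinkedList<Double> window) {
    int n = window.size();
    if (n == 0) {
      return Double.NaN;
    }
    if (n == 1) {
      return 0.0;
    }
    double m = mean(window);
    double sumSq = 0.0;
    for (double v : window) {
      sumSq += (v - m) * (v - m);
    }
    return Math.sqrt(sumSq / (n - 1));
  }
}
```

Adding 1, 2, 3, 4 with a lookback of 3 leaves the window at 2, 3, 4, which has mean 3 and standard deviation 1.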
[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/958#discussion_r174186800

    --- Diff: metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/UnbiasedSampler.java ---
    @@ -0,0 +1,28 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.performance.sampler;
    +
    +import java.util.Random;
    +
    +public class UnbiasedSampler implements Sampler {
    +
    +  @Override
    +  public int sample(Random rng, int limit) {
    +    return rng.nextInt(limit);
    --- End diff --

    done

---
[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/958#discussion_r174186641

    --- Diff: metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.performance.load.monitor.writers;
    +
    +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
    +import org.apache.metron.performance.load.monitor.AbstractMonitor;
    +import org.apache.metron.performance.load.monitor.Results;
    +
    +import java.util.ArrayList;
    +import java.util.Date;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Optional;
    +import java.util.function.Consumer;
    +
    +public class Writer {
    +
    +  private int summaryLookback;
    +  private List<LinkedList<Double>> summaries = new ArrayList<>();
    +  private List<Consumer<Writable>> writers;
    +  private List<AbstractMonitor> monitors;
    +
    +  public Writer(List<AbstractMonitor> monitors, int summaryLookback, List<Consumer<Writable>> writers) {
    +    this.summaryLookback = summaryLookback;
    +    this.writers = writers;
    +    this.monitors = monitors;
    +    for(AbstractMonitor m : monitors) {
    +      this.summaries.add(new LinkedList<>());
    +    }
    +  }
    +
    +  public void writeAll() {
    +    int i = 0;
    +    Date dateOf = new Date();
    +    List<Results> results = new ArrayList<>();
    +    for(AbstractMonitor m : monitors) {
    +      Long eps = m.get();
    +      if(eps != null) {
    +        if (summaryLookback > 0) {
    +          LinkedList<Double> summary = summaries.get(i);
    +          addToLookback(eps == null ? Double.NaN : eps.doubleValue(), summary);
    --- End diff --

    done

---
[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/958#discussion_r174186597

    --- Diff: metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/ConsoleWriter.java ---
    @@ -0,0 +1,67 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.performance.load.monitor.writers;
    +
    +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
    +import org.apache.metron.performance.load.monitor.Results;
    +
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Date;
    +import java.util.List;
    +import java.util.function.Consumer;
    +
    +public class ConsoleWriter implements Consumer<Writable> {
    +
    +  private String getSummary(DescriptiveStatistics stats) {
    +    return String.format("Mean: %d, Std Dev: %d", (int)stats.getMean(), (int)Math.sqrt(stats.getVariance()));
    +  }
    +
    +  @Override
    +  public void accept(Writable writable) {
    +    List<String> parts = new ArrayList<>();
    +    Date date = writable.getDate();
    +    for(Results r : writable.getResults()) {
    +      Long eps = r.getEps();
    +      if(eps != null) {
    +        String part = String.format(r.getFormat(), eps);
    +        if (r.getHistory().isPresent()) {
    +          part += " (" + getSummary(r.getHistory().get()) + ")";
    +        }
    +        parts.add(part);
    +      }
    +    }
    +    if(date != null) {
    +      DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    +      String header = dateFormat.format(date) + " - ";
    +      String emptyHeader = "";
    +      for (int i = 0; i < header.length(); ++i) {
    --- End diff --

    done

---
[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/958#discussion_r174186529

    --- Diff: metron-contrib/metron-performance/src/test/java/org/apache/metron/performance/load/SendToKafkaTest.java ---
    @@ -0,0 +1,49 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.performance.load;
    +
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ForkJoinPool;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +public class SendToKafkaTest {
    +
    +  @Test
    +  public void testWritesCorrectNumber() throws InterruptedException {
    --- End diff --

    done

---
[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/958#discussion_r174186458

    --- Diff: metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java ---
    @@ -0,0 +1,504 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.performance.load;
    +
    +import com.google.common.base.Joiner;
    +import org.apache.commons.cli.*;
    +import org.apache.metron.common.utils.JSONUtils;
    +import org.apache.metron.common.utils.cli.CLIOptions;
    +import org.apache.metron.performance.sampler.BiasedSampler;
    +import org.apache.metron.stellar.common.utils.ConversionUtils;
    +import org.apache.metron.common.utils.cli.OptionHandler;
    +
    +import javax.annotation.Nullable;
    +import java.io.BufferedReader;
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.FileReader;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.EnumMap;
    +import java.util.List;
    +import java.util.Optional;
    +
    +public enum LoadOptions implements CLIOptions<LoadOptions> {
    +  HELP(new OptionHandler<LoadOptions>() {
    +
    +    @Override
    +    public String getShortCode() {
    +      return "h";
    +    }
    +
    +    @Nullable
    +    @Override
    +    public Option apply(@Nullable String s) {
    +      return new Option(s, "help", false, "Generate Help screen");
    +    }
    +  }),
    +  ZK(new OptionHandler<LoadOptions>() {
    +    @Nullable
    +    @Override
    +    public Option apply(@Nullable String s) {
    +      Option o = new Option(s, "zk_quorum", true, "zookeeper quorum");
    +      o.setArgName("QUORUM");
    +      o.setRequired(false);
    +      return o;
    +    }
    +
    +    @Override
    +    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
    +      if(option.has(cli)) {
    +        return Optional.ofNullable(option.get(cli));
    +      }
    +      else {
    +        return Optional.empty();
    +      }
    +    }
    +
    +    @Override
    +    public String getShortCode() {
    +      return "z";
    +    }
    +  }),
    +  CONSUMER_GROUP(new OptionHandler<LoadOptions>() {
    +    @Nullable
    +    @Override
    +    public Option apply(@Nullable String s) {
    +      Option o = new Option(s, "consumer_group", true, "Consumer Group.  The default is " + LoadGenerator.CONSUMER_GROUP);
    +      o.setArgName("GROUP_ID");
    +      o.setRequired(false);
    +      return o;
    +    }
    +
    +    @Override
    +    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
    +      if(option.has(cli)) {
    +        return Optional.ofNullable(option.get(cli));
    +      }
    +      else {
    +        return Optional.of(LoadGenerator.CONSUMER_GROUP);
    +      }
    +    }
    +
    +    @Override
    +    public String getShortCode() {
    +      return "cg";
    +    }
    +  }),
    +  BIASED_SAMPLE(new OptionHandler<LoadOptions>() {
    +    @Nullable
    +    @Override
    +    public Option apply(@Nullable String s) {
    +      Option o = new Option(s, "sample_bias", true, "The discrete distribution to bias the sampling. " +
    +              "This is a CSV of 2 columns.  The first column is the % of the templates " +
    +              "and the 2nd column is the probability (0-100) that it's chosen.  For instance:\n" +
    +              "  20,80\n" +
    +              "  80,20\n" +
    +              "implies that 20% of the templates will comprise 80% of the output and the remaining 80% of the templates will comprise 20% of the output.");
    +      o.setArgName("BIAS_FILE");
    +      o.setRequired(false);
    +      return o;
    +    }
    +
    +    @Override
    +    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
    +      if(!option.has(cli)) {
    +        return Optional.empty();
    +      }
    +      File discreteDistributionFile = new File(option.get(cli));
    +      if(discreteDistributionFile.exists()) {
    +        try {
    +
    +          return Optional.ofNullable(BiasedSampler.readDistribution(discreteDistrib
[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/958#discussion_r174186359

    --- Diff: metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java ---
    @@ -0,0 +1,165 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.performance.load;
    +
    +
    +import com.google.common.base.Joiner;
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.PosixParser;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.metron.common.utils.KafkaUtils;
    +import org.apache.metron.performance.load.monitor.AbstractMonitor;
    +import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
    +import org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
    +import org.apache.metron.performance.load.monitor.MonitorTask;
    +import org.apache.metron.performance.load.monitor.writers.CSVWriter;
    +import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
    +import org.apache.metron.performance.load.monitor.writers.Writable;
    +import org.apache.metron.performance.load.monitor.writers.Writer;
    +import org.apache.metron.performance.sampler.BiasedSampler;
    +import org.apache.metron.performance.sampler.Sampler;
    +import org.apache.metron.performance.sampler.UnbiasedSampler;
    +
    +import java.io.File;
    +import java.util.ArrayList;
    +import java.util.EnumMap;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.function.Consumer;
    +
    +public class LoadGenerator
    +{
    +  public static String CONSUMER_GROUP = "load.group";
    +  public static long SEND_PERIOD_MS = 100;
    +  public static long MONITOR_PERIOD_MS = 1000*10;
    +  private static ExecutorService pool;
    +  private static ThreadLocal kafkaProducer;
    +  public static AtomicLong numSent = new AtomicLong(0);
    +
    +  public static void main( String[] args ) throws Exception {
    +    CommandLine cli = LoadOptions.parse(new PosixParser(), args);
    +    EnumMap<LoadOptions, Optional<Object>> evaluatedArgs = LoadOptions.createConfig(cli);
    +    Map kafkaConfig = new HashMap<>();
    +    kafkaConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    +    kafkaConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    +    kafkaConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    +    kafkaConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    +    if(LoadOptions.ZK.has(cli)) {
    +      String zkQuorum = (String) evaluatedArgs.get(LoadOptions.ZK).get();
    +      kafkaConfig.put("bootstrap.servers", Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum)));
    --- End diff --

    done

---
[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/958#discussion_r174186403

    --- Diff: metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java ---
    @@ -0,0 +1,165 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.performance.load;
    +
    +
    +import com.google.common.base.Joiner;
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.PosixParser;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.metron.common.utils.KafkaUtils;
    +import org.apache.metron.performance.load.monitor.AbstractMonitor;
    +import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
    +import org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
    +import org.apache.metron.performance.load.monitor.MonitorTask;
    +import org.apache.metron.performance.load.monitor.writers.CSVWriter;
    +import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
    +import org.apache.metron.performance.load.monitor.writers.Writable;
    +import org.apache.metron.performance.load.monitor.writers.Writer;
    +import org.apache.metron.performance.sampler.BiasedSampler;
    +import org.apache.metron.performance.sampler.Sampler;
    +import org.apache.metron.performance.sampler.UnbiasedSampler;
    +
    +import java.io.File;
    +import java.util.ArrayList;
    +import java.util.EnumMap;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.function.Consumer;
    +
    +public class LoadGenerator
    +{
    +  public static String CONSUMER_GROUP = "load.group";
    +  public static long SEND_PERIOD_MS = 100;
    +  public static long MONITOR_PERIOD_MS = 1000*10;
    +  private static ExecutorService pool;
    +  private static ThreadLocal kafkaProducer;
    +  public static AtomicLong numSent = new AtomicLong(0);
    +
    +  public static void main( String[] args ) throws Exception {
    +    CommandLine cli = LoadOptions.parse(new PosixParser(), args);
    +    EnumMap<LoadOptions, Optional<Object>> evaluatedArgs = LoadOptions.createConfig(cli);
    +    Map kafkaConfig = new HashMap<>();
    +    kafkaConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    +    kafkaConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    +    kafkaConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    +    kafkaConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    +    if(LoadOptions.ZK.has(cli)) {
    +      String zkQuorum = (String) evaluatedArgs.get(LoadOptions.ZK).get();
    +      kafkaConfig.put("bootstrap.servers", Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum)));
    +    }
    +    String groupId = evaluatedArgs.get(LoadOptions.CONSUMER_GROUP).get().toString();
    +    System.out.println("Consumer Group: " + groupId);
    +    kafkaConfig.put("group.id", groupId);
    --- End diff --

    done

---
[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/958#discussion_r174186490

    --- Diff: metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/MessageGenerator.java ---
    @@ -0,0 +1,48 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.performance.load;
    +
    +import org.apache.metron.performance.sampler.Sampler;
    +
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.function.Supplier;
    +
    +public class MessageGenerator implements Supplier {
    +  private static ThreadLocal rng = ThreadLocal.withInitial(() -> new Random());
    +  private static AtomicLong guidOffset = new AtomicLong(0);
    +  private static String guidPrefix = "6141faf6-a8ba-4044-ab80-";
    --- End diff --

    done

---
[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...
Github user cestella commented on a diff in the pull request: https://github.com/apache/metron/pull/958#discussion_r174186422
--- Diff: metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java ---
@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.utils.cli.CLIOptions;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.cli.OptionHandler;
+
+import javax.annotation.Nullable;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+
+public enum LoadOptions implements CLIOptions {
+  HELP(new OptionHandler() {
+
+    @Override
+    public String getShortCode() {
+      return "h";
+    }
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      return new Option(s, "help", false, "Generate Help screen");
+    }
+  }),
+  ZK(new OptionHandler() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "zk_quorum", true, "zookeeper quorum");
+      o.setArgName("QUORUM");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional getValue(LoadOptions option, CommandLine cli) {
+      if(option.has(cli)) {
+        return Optional.ofNullable(option.get(cli));
+      }
+      else {
+        return Optional.empty();
+      }
+    }
+
+    @Override
+    public String getShortCode() {
+      return "z";
+    }
+  }),
+  CONSUMER_GROUP(new OptionHandler() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "consumer_group", true, "Consumer Group. The default is " + LoadGenerator.CONSUMER_GROUP);
+      o.setArgName("GROUP_ID");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional getValue(LoadOptions option, CommandLine cli) {
+      if(option.has(cli)) {
+        return Optional.ofNullable(option.get(cli));
+      }
+      else {
+        return Optional.of(LoadGenerator.CONSUMER_GROUP);
+      }
+    }
+
+    @Override
+    public String getShortCode() {
+      return "cg";
+    }
+  }),
+  BIASED_SAMPLE(new OptionHandler() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "sample_bias", true, "The discrete distribution to bias the sampling. " +
+              "This is a CSV of 2 columns. The first column is the % of the templates " +
+              "and the 2nd column is the probability (0-100) that it's chosen. For instance:\n" +
+              " 20,80\n" +
+              " 80,20\n" +
+              "implies that 20% of the templates will comprise 80% of the output and the remaining 80% of the templates will comprise 20% of the output.");
+      o.setArgName("BIAS_FILE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional getValue(LoadOptions option, CommandLine cli) {
+      if(!option.has(cli)) {
+        return Optional.empty();
+      }
+      File discreteDistributionFile = new File(option.get(cli));
+      if(discreteDistributionFile.exists()) {
+        try {
+
+          return Optional.ofNullable(BiasedSampler.readDistribution(discreteDistrib
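The `sample_bias` help text quoted above describes a two-column CSV in which each row pairs a percentage of the templates with the percentage of output those templates should produce. The following is a minimal sketch of how such a file could be parsed and then used to pick a template index. It is not the `BiasedSampler` from the pull request: the class `BiasSketch` and its `parse` and `sample` methods are hypothetical names, it uses plain `String.split` instead of Guava, and the bucket arithmetic is an assumption about one reasonable way to honor the bias.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical illustration only; not the BiasedSampler class under review.
final class BiasSketch {

  // Parses "20,80"-style rows into (templatePercent, outputPercent) pairs,
  // skipping blank lines and '#' comments.
  static List<Map.Entry<Integer, Integer>> parse(List<String> lines) {
    List<Map.Entry<Integer, Integer>> ret = new ArrayList<>();
    int sumLeft = 0;
    int sumRight = 0;
    for (String line : lines) {
      String trimmed = line.trim();
      if (trimmed.isEmpty() || trimmed.startsWith("#")) {
        continue;
      }
      String[] parts = trimmed.split(",");
      if (parts.length != 2) {
        throw new IllegalArgumentException("Expected 2 columns: " + trimmed);
      }
      int left = Integer.parseInt(parts[0].trim());
      int right = Integer.parseInt(parts[1].trim());
      ret.add(new SimpleEntry<>(left, right));
      sumLeft += left;
      sumRight += right;
    }
    if (sumLeft > 100 || sumRight > 100) {
      throw new IllegalStateException("Neither column may sum to more than 100.");
    }
    return ret;
  }

  // Picks a template index in [0, numTemplates). A roll in [0, 100) selects a
  // bucket by cumulative output percent; the index is then drawn uniformly
  // from that bucket's slice of the templates.
  static int sample(List<Map.Entry<Integer, Integer>> bias, int numTemplates, Random rng) {
    int roll = rng.nextInt(100);
    int cumulativeTemplates = 0;
    int cumulativeOutput = 0;
    for (Map.Entry<Integer, Integer> e : bias) {
      int start = cumulativeTemplates * numTemplates / 100;
      cumulativeTemplates += e.getKey();
      int end = cumulativeTemplates * numTemplates / 100;
      cumulativeOutput += e.getValue();
      // An empty slice (too few templates) is skipped; the next bucket takes the roll.
      if (roll < cumulativeOutput && end > start) {
        return start + rng.nextInt(end - start);
      }
    }
    // Rolls not covered by any bucket fall back to a uniform choice.
    return rng.nextInt(numTemplates);
  }

  public static void main(String[] args) {
    List<Map.Entry<Integer, Integer>> bias =
        parse(Arrays.asList("# templates %,output %", "20,80", "80,20"));
    Random rng = new Random(0);
    int draws = 10_000;
    int hot = 0;
    for (int i = 0; i < draws; i++) {
      if (sample(bias, 10, rng) < 2) {
        hot++;
      }
    }
    System.out.println("Share of draws from the first 20% of templates: "
        + (100.0 * hot / draws) + "%");
  }
}
```

With ten templates and the `20,80` / `80,20` rows from the help text, the first two templates should receive roughly 80% of the draws. The uniform fallback for uncovered rolls is a design choice of this sketch, not something the help text specifies.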
[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...
Github user cestella commented on a diff in the pull request: https://github.com/apache/metron/pull/958#discussion_r174186316
--- Diff: metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java ---
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
+import org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
+import org.apache.metron.performance.load.monitor.MonitorTask;
+import org.apache.metron.performance.load.monitor.writers.CSVWriter;
+import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
+import org.apache.metron.performance.load.monitor.writers.Writable;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.performance.sampler.Sampler;
+import org.apache.metron.performance.sampler.UnbiasedSampler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public class LoadGenerator
+{
+  public static String CONSUMER_GROUP = "load.group";
--- End diff --
done
---
[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...
Github user cestella commented on a diff in the pull request: https://github.com/apache/metron/pull/958#discussion_r174186275
--- Diff: metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java ---
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
+import org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
+import org.apache.metron.performance.load.monitor.MonitorTask;
+import org.apache.metron.performance.load.monitor.writers.CSVWriter;
+import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
+import org.apache.metron.performance.load.monitor.writers.Writable;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.performance.sampler.Sampler;
+import org.apache.metron.performance.sampler.UnbiasedSampler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public class LoadGenerator
+{
+  public static String CONSUMER_GROUP = "load.group";
+  public static long SEND_PERIOD_MS = 100;
+  public static long MONITOR_PERIOD_MS = 1000*10;
+  private static ExecutorService pool;
+  private static ThreadLocal kafkaProducer;
+  public static AtomicLong numSent = new AtomicLong(0);
+
+  public static void main( String[] args ) throws Exception {
+    CommandLine cli = LoadOptions.parse(new PosixParser(), args);
+    EnumMap> evaluatedArgs = LoadOptions.createConfig(cli);
+    Map kafkaConfig = new HashMap<>();
+    kafkaConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
--- End diff --
done
---
[GitHub] metron issue #958: METRON-1483: Create a tool to monitor performance of the ...
Github user cestella commented on the issue: https://github.com/apache/metron/pull/958 @justinleet thanks for the review. I responded to your comments either by fixing them or by explaining why I prefer what is there. I will add a new set of tests in the next commit. ---
[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...
Github user cestella commented on a diff in the pull request: https://github.com/apache/metron/pull/958#discussion_r174181759
--- Diff: metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java ---
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class SendToKafka extends TimerTask {
--- End diff --
well, I mean, I suppose it could be, but I didn't see value in it since all of our inputs currently are strings and the message generator exclusively generates strings, not other messages. It'd be a more serious refactoring to genericize this to make the message generator pluggable. Also, for utility code, I think I'd rather avoid prematurely generalizing it. Let me know what you think.
---
[GitHub] metron-bro-plugin-kafka issue #6: METRON-1469: Kafka Plugin for Bro - Config...
Github user nickwallen commented on the issue: https://github.com/apache/metron-bro-plugin-kafka/pull/6 +1 Looks great. Thanks @dcode ---
[GitHub] metron-bro-plugin-kafka issue #6: METRON-1469: Kafka Plugin for Bro - Config...
Github user JonZeolla commented on the issue: https://github.com/apache/metron-bro-plugin-kafka/pull/6 I'm +1 on this. @nickwallen anything outstanding on your end? ---