[GitHub] metron issue #942: METRON-1461: Modify the MIN, MAX Stellar methods to take ...

2018-03-13 Thread cestella
Github user cestella commented on the issue:

https://github.com/apache/metron/pull/942
  
So, rather than accepting a list of stats objects, I would suggest that `MAX` 
and `MIN` accept either of the following:
* A StatisticsProvider object
* A list of comparables

Essentially, both of these would work:
* `MAX(STATS_ADD(null, 1, 2, 3))`
* `MAX([1, 2, 3])`
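A rough Java sketch of the dispatch this implies. `StatsProvider` is a hypothetical two-method interface standing in for Metron's `StatisticsProvider`, and none of the Stellar function registration is shown; it only illustrates accepting either argument shape:

```java
import java.util.List;

// Hypothetical stand-in for a statistics object; this is NOT Metron's StatisticsProvider API.
interface StatsProvider {
  double getMax();
  double getMin();
}

class MaxMinSketch {
  static Object max(Object arg) {
    return extreme(arg, true);
  }

  static Object min(Object arg) {
    return extreme(arg, false);
  }

  // Dispatches on the argument type: a stats object answers directly, a list is scanned.
  // Returns null for null input, an empty list, or any other argument type.
  @SuppressWarnings({"unchecked", "rawtypes"})
  private static Object extreme(Object arg, boolean wantMax) {
    if (arg instanceof StatsProvider) {
      StatsProvider stats = (StatsProvider) arg;
      return wantMax ? stats.getMax() : stats.getMin();
    }
    if (arg instanceof List) {
      Comparable best = null;
      for (Object o : (List<?>) arg) {
        if (o == null) {
          continue; // nulls are skipped in this sketch
        }
        // Elements must be mutually comparable; otherwise this throws ClassCastException.
        Comparable c = (Comparable) o;
        int cmp = best == null ? 0 : c.compareTo(best);
        if (best == null || (wantMax ? cmp > 0 : cmp < 0)) {
          best = c;
        }
      }
      return best;
    }
    return null;
  }
}
```

In this sketch both `max(Arrays.asList(1, 2, 3))` and `max(<a provider whose max is 3>)` yield 3, mirroring the two Stellar calls in the comment.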


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186826
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java
 ---
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.sampler;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+public class BiasedSampler implements Sampler {
+  TreeMap<Double, Map.Entry<Integer, Integer>> discreteDistribution;
+  public BiasedSampler(List<Map.Entry<Integer, Integer>> discreteDistribution, int max) {
+    this.discreteDistribution = createDistribution(discreteDistribution, max);
+  }
+
+  public static List<Map.Entry<Integer, Integer>> readDistribution(File distrFile) throws IOException {
+    List<Map.Entry<Integer, Integer>> ret = new ArrayList<>();
+    System.out.println("Using biased sampler with the following biases:");
+    try(BufferedReader br = new BufferedReader(new FileReader(distrFile))) {
+      int sumLeft = 0;
+      int sumRight = 0;
+      for(String line = null;(line = br.readLine()) != null;) {
+        if(line.startsWith("#")) {
+          continue;
+        }
+        Iterable<String> it = Splitter.on(",").split(line.trim());
+        int left = Integer.parseInt(Iterables.getFirst(it, null));
+        int right = Integer.parseInt(Iterables.getLast(it, null));
+        System.out.println("\t" + left + "% of templates will comprise roughly " + right + "% of sample output");
+        ret.add(new AbstractMap.SimpleEntry<>(left, right));
+        sumLeft += left;
+        sumRight += right;
+      }
+      if(sumLeft > 100 || sumRight > 100 ) {
+        throw new IllegalStateException("Neither columns must sum to beyond 100.  " +
+                "The first column is the % of templates. " +
+                "The second column is the % of the sample that % of template occupies.");
+      }
+      else if(sumLeft < 100 && sumRight < 100) {
+        int left = 100 - sumLeft;
+        int right = 100 - sumRight;
+        System.out.println("\t" + left + "% of templates will comprise roughly " + right + "% of sample output");
+        ret.add(new AbstractMap.SimpleEntry<>(left, right));
+      }
+      return ret;
+    }
+  }
+
+  private static TreeMap<Double, Map.Entry<Integer, Integer>>
+                 createDistribution(List<Map.Entry<Integer, Integer>> discreteDistribution, int max) {
+    TreeMap<Double, Map.Entry<Integer, Integer>> ret = new TreeMap<>();
+    int from = 0;
+    double weight = 0.0d;
+    for(Map.Entry<Integer, Integer> kv : discreteDistribution) {
+      double pctVals = kv.getKey()/100.0;
+      int to = from + (int)(max*pctVals);
+      double pctWeight = kv.getValue()/100.0;
+      ret.put(weight, new AbstractMap.SimpleEntry<>(from, to));
+      weight += pctWeight;
+      from = to;
+    }
+    return ret;
+  }
+
+  @Override
+  public int sample(Random rng, int limit) {
+    double weight = rng.nextDouble();
+    Map.Entry<Integer, Integer> range = discreteDistribution.floorEntry(weight).getValue();
+    return rng.nextInt(range.getValue() - range.getKey()) + range.getKey();
--- End diff --

done
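For readers following the `sample` logic in this hunk: the distribution is a `TreeMap` keyed by cumulative probability, and `floorEntry` on a uniform draw in [0, 1) selects the bucket whose index range is then sampled uniformly. Below is a self-contained sketch of that technique, not the PR's class. The constructor check that rejects empty buckets is an addition, since `Random.nextInt(0)` throws if `max * pct` truncates a bucket to zero width.

```java
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

class BiasedSamplerSketch {
  // Cumulative probability at which each bucket starts -> that bucket's [from, to) index range.
  private final TreeMap<Double, Map.Entry<Integer, Integer>> distribution = new TreeMap<>();

  // Each entry is (percent of indices, percent of samples), e.g. (20, 80).
  BiasedSamplerSketch(List<Map.Entry<Integer, Integer>> buckets, int max) {
    int from = 0;
    double weight = 0.0d;
    for (Map.Entry<Integer, Integer> kv : buckets) {
      int to = from + (int) (max * (kv.getKey() / 100.0));
      if (to <= from) {
        // Added guard: an empty range would make nextInt(0) throw at sample time.
        throw new IllegalArgumentException("bucket " + kv + " is empty for max=" + max);
      }
      distribution.put(weight, new AbstractMap.SimpleEntry<>(from, to));
      weight += kv.getValue() / 100.0;
      from = to;
    }
  }

  int sample(Random rng) {
    // floorEntry cannot return null: the first bucket is keyed at 0.0 and nextDouble() >= 0.
    Map.Entry<Integer, Integer> range = distribution.floorEntry(rng.nextDouble()).getValue();
    return range.getKey() + rng.nextInt(range.getValue() - range.getKey());
  }
}
```

With buckets `(20, 80)` and `(80, 20)` over 100 indices, roughly 80% of draws land in indices 0 to 19.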


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186723
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java
 ---
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.sampler;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+public class BiasedSampler implements Sampler {
+  TreeMap<Double, Map.Entry<Integer, Integer>> discreteDistribution;
+  public BiasedSampler(List<Map.Entry<Integer, Integer>> discreteDistribution, int max) {
+    this.discreteDistribution = createDistribution(discreteDistribution, max);
+  }
+
+  public static List<Map.Entry<Integer, Integer>> readDistribution(File distrFile) throws IOException {
+    List<Map.Entry<Integer, Integer>> ret = new ArrayList<>();
+    System.out.println("Using biased sampler with the following biases:");
+    try(BufferedReader br = new BufferedReader(new FileReader(distrFile))) {
+      int sumLeft = 0;
+      int sumRight = 0;
+      for(String line = null;(line = br.readLine()) != null;) {
+        if(line.startsWith("#")) {
+          continue;
+        }
+        Iterable<String> it = Splitter.on(",").split(line.trim());
+        int left = Integer.parseInt(Iterables.getFirst(it, null));
--- End diff --

done
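A minimal sketch of the bias-file parsing in `readDistribution`, reading from an in-memory string so it runs without a file. It swaps Guava's `Splitter` for `String.split`, and skipping blank lines is an addition: the quoted code would hand an empty string to `Integer.parseInt` and fail.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class BiasFileSketch {
  // Each non-comment line is "percentOfTemplates,percentOfOutput".
  static List<Map.Entry<Integer, Integer>> read(BufferedReader br) throws IOException {
    List<Map.Entry<Integer, Integer>> ret = new ArrayList<>();
    int sumLeft = 0;
    int sumRight = 0;
    for (String line = null; (line = br.readLine()) != null; ) {
      line = line.trim();
      // Comments are skipped as in the diff; blank lines are skipped as an addition.
      if (line.isEmpty() || line.startsWith("#")) {
        continue;
      }
      String[] cols = line.split(",");
      int left = Integer.parseInt(cols[0].trim());
      int right = Integer.parseInt(cols[cols.length - 1].trim());
      ret.add(new AbstractMap.SimpleEntry<>(left, right));
      sumLeft += left;
      sumRight += right;
    }
    if (sumLeft > 100 || sumRight > 100) {
      throw new IllegalStateException("Neither column may sum to more than 100.");
    }
    if (sumLeft < 100 && sumRight < 100) {
      // Whatever is left over in both columns becomes an implicit final bucket.
      ret.add(new AbstractMap.SimpleEntry<>(100 - sumLeft, 100 - sumRight));
    }
    return ret;
  }

  // Convenience wrapper for in-memory content; StringReader never actually throws.
  static List<Map.Entry<Integer, Integer>> parse(String content) {
    try (BufferedReader br = new BufferedReader(new StringReader(content))) {
      return read(br);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

For `"# comment\n20,70\n"` this yields `(20,70)` followed by `(80,30)`: the unassigned 80% of templates receives the remaining 30% of output, as in the `else if` branch. Note that, as in the diff, no remainder row is added when only one column falls short of 100.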


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186700
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java
 ---
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor.writers;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.Results;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+public class Writer {
+
+  private int summaryLookback;
+  private List<LinkedList<Double>> summaries = new ArrayList<>();
+  private List<Consumer<Writable>> writers;
+  private List<AbstractMonitor> monitors;
+
+  public Writer(List<AbstractMonitor> monitors, int summaryLookback, List<Consumer<Writable>> writers) {
+    this.summaryLookback = summaryLookback;
+    this.writers = writers;
+    this.monitors = monitors;
+    for(AbstractMonitor m : monitors) {
+      this.summaries.add(new LinkedList<>());
+    }
+  }
+
+  public void writeAll() {
+    int i = 0;
+    Date dateOf = new Date();
+    List<Results> results = new ArrayList<>();
+    for(AbstractMonitor m : monitors) {
+      Long eps = m.get();
+      if(eps != null) {
+        if (summaryLookback > 0) {
+          LinkedList<Double> summary = summaries.get(i);
+          addToLookback(eps == null ? Double.NaN : eps.doubleValue(), summary);
+          results.add(new Results(m.format(), m.name(), eps, Optional.of(getStats(summary))));
+        }
+        else {
+          results.add(new Results(m.format(),m.name(), eps, Optional.empty()));
+        }
+      }
+      else {
--- End diff --

done
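A simplified sketch of the `writeAll` flow in this hunk: skip monitors that return `null`, keep a per-monitor history window when lookback is enabled, and hand the same batch to every writer. `Result`, the `Supplier<Long>` monitors and the list-of-values history are hypothetical stand-ins for the PR's `Results`, `AbstractMonitor` and `DescriptiveStatistics`.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;

class FanOutWriterSketch {
  // Hypothetical, simplified stand-in for the PR's Results class.
  static final class Result {
    final String name;
    final long eps;
    final Optional<List<Double>> history; // empty when lookback is disabled
    Result(String name, long eps, Optional<List<Double>> history) {
      this.name = name;
      this.eps = eps;
      this.history = history;
    }
  }

  private final List<String> names;
  private final List<Supplier<Long>> monitors; // each returns null when it has nothing to report
  private final List<LinkedList<Double>> windows = new ArrayList<>();
  private final int lookback;
  private final List<Consumer<List<Result>>> writers;

  FanOutWriterSketch(List<String> names, List<Supplier<Long>> monitors, int lookback,
                     List<Consumer<List<Result>>> writers) {
    this.names = names;
    this.monitors = monitors;
    this.lookback = lookback;
    this.writers = writers;
    for (int i = 0; i < monitors.size(); i++) {
      windows.add(new LinkedList<>());
    }
  }

  void writeAll() {
    List<Result> results = new ArrayList<>();
    // The index advances with every monitor so each one stays paired with its own window.
    for (int i = 0; i < monitors.size(); i++) {
      Long eps = monitors.get(i).get();
      if (eps == null) {
        continue;
      }
      Optional<List<Double>> history = Optional.empty();
      if (lookback > 0) {
        LinkedList<Double> window = windows.get(i);
        window.addLast(eps.doubleValue());
        if (window.size() > lookback) {
          window.removeFirst(); // keep only the most recent `lookback` values
        }
        history = Optional.of(new ArrayList<>(window));
      }
      results.add(new Result(names.get(i), eps, history));
    }
    // Every writer (console, CSV, ...) receives the same batch.
    for (Consumer<List<Result>> w : writers) {
      w.accept(results);
    }
  }
}
```

Because `eps` is already known to be non-null inside that branch, the sketch drops the `eps == null ? Double.NaN : ...` fallback.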


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186800
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/UnbiasedSampler.java
 ---
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.sampler;
+
+import java.util.Random;
+
+public class UnbiasedSampler implements Sampler {
+
+  @Override
+  public int sample(Random rng, int limit) {
+    return rng.nextInt(limit);
--- End diff --

done
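`UnbiasedSampler` is the uniform baseline: every template index in `[0, limit)` is equally likely. A small sketch of the same `sample(Random, int)` shape, plus a hypothetical `pickTemplate` helper showing how a caller might use any sampler:

```java
import java.util.List;
import java.util.Random;

// Mirrors the method signature visible in the diff: sample(Random rng, int limit).
interface SamplerSketch {
  int sample(Random rng, int limit);
}

class UnbiasedSamplerSketch implements SamplerSketch {
  @Override
  public int sample(Random rng, int limit) {
    // Uniform over [0, limit); Random.nextInt throws IllegalArgumentException if limit <= 0.
    return rng.nextInt(limit);
  }

  // Hypothetical helper: choose a message template with whichever sampler is configured.
  static String pickTemplate(SamplerSketch sampler, Random rng, List<String> templates) {
    return templates.get(sampler.sample(rng, templates.size()));
  }
}
```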


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186641
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java
 ---
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor.writers;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.Results;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+public class Writer {
+
+  private int summaryLookback;
+  private List<LinkedList<Double>> summaries = new ArrayList<>();
+  private List<Consumer<Writable>> writers;
+  private List<AbstractMonitor> monitors;
+
+  public Writer(List<AbstractMonitor> monitors, int summaryLookback, List<Consumer<Writable>> writers) {
+    this.summaryLookback = summaryLookback;
+    this.writers = writers;
+    this.monitors = monitors;
+    for(AbstractMonitor m : monitors) {
+      this.summaries.add(new LinkedList<>());
+    }
+  }
+
+  public void writeAll() {
+    int i = 0;
+    Date dateOf = new Date();
+    List<Results> results = new ArrayList<>();
+    for(AbstractMonitor m : monitors) {
+      Long eps = m.get();
+      if(eps != null) {
+        if (summaryLookback > 0) {
+          LinkedList<Double> summary = summaries.get(i);
+          addToLookback(eps == null ? Double.NaN : eps.doubleValue(), summary);
--- End diff --

done
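The `addToLookback(...)` call presumably keeps a bounded window of recent EPS values per monitor. A sketch of such a window, with mean and standard deviation computed by hand instead of via commons-math. The n - 1 (sample) denominator is chosen to mirror what, to my understanding, `DescriptiveStatistics.getVariance()` reports; treat that correspondence as an assumption.

```java
import java.util.LinkedList;

class LookbackSketch {
  private final LinkedList<Double> window = new LinkedList<>();
  private final int size;

  LookbackSketch(int size) {
    if (size <= 0) {
      throw new IllegalArgumentException("size must be positive");
    }
    this.size = size;
  }

  // Append the newest value and evict the oldest once the window is full.
  void add(double value) {
    window.addLast(value);
    if (window.size() > size) {
      window.removeFirst();
    }
  }

  int count() {
    return window.size();
  }

  double mean() {
    return window.stream().mapToDouble(Double::doubleValue).average().orElse(Double.NaN);
  }

  // Sample (n - 1) standard deviation; NaN with fewer than two points in this sketch.
  double stdDev() {
    int n = window.size();
    if (n < 2) {
      return Double.NaN;
    }
    double m = mean();
    double sumSq = 0.0;
    for (double v : window) {
      sumSq += (v - m) * (v - m);
    }
    return Math.sqrt(sumSq / (n - 1));
  }
}
```

Adding 1 through 5 to a window of size 3 leaves `[3, 4, 5]`, with mean 4 and standard deviation 1.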


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186597
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/ConsoleWriter.java
 ---
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor.writers;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.metron.performance.load.monitor.Results;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Consumer;
+
+public class ConsoleWriter implements Consumer<Writable> {
+
+  private String getSummary(DescriptiveStatistics stats) {
+    return String.format("Mean: %d, Std Dev: %d", (int)stats.getMean(), (int)Math.sqrt(stats.getVariance()));
+  }
+
+  @Override
+  public void accept(Writable writable) {
+    List<String> parts = new ArrayList<>();
+    Date date = writable.getDate();
+    for(Results r : writable.getResults()) {
+      Long eps = r.getEps();
+      if(eps != null) {
+        String part = String.format(r.getFormat(), eps);
+        if (r.getHistory().isPresent()) {
+          part += " (" + getSummary(r.getHistory().get()) + ")";
+        }
+        parts.add(part);
+      }
+    }
+    if(date != null) {
+      DateFormat dateFormat = new SimpleDateFormat("/MM/dd HH:mm:ss");
+      String header = dateFormat.format(date) + " - ";
+      String emptyHeader = "";
+      for (int i = 0; i < header.length(); ++i) {
--- End diff --

done
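The hunk stops inside the loop that builds `emptyHeader`, which looks like padding to the header's width so continuation lines align under the first. A sketch of that layout; the one-part-per-line arrangement and the `yyyy/MM/dd HH:mm:ss` pattern are assumptions, while `summary` reproduces the truncating format of `getSummary`:

```java
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

class ConsoleLayoutSketch {
  // Same shape as getSummary in the diff: mean and standard deviation truncated to ints.
  static String summary(double mean, double stdDev) {
    return String.format("Mean: %d, Std Dev: %d", (int) mean, (int) stdDev);
  }

  // Assumed layout: the first part follows the timestamp header, and later parts are
  // indented by the header's width so the columns line up.
  static String layout(Date date, List<String> parts) {
    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); // pattern is an assumption
    String header = dateFormat.format(date) + " - ";
    StringBuilder pad = new StringBuilder();
    for (int i = 0; i < header.length(); ++i) {
      pad.append(' ');
    }
    String emptyHeader = pad.toString();
    StringBuilder out = new StringBuilder();
    for (int i = 0; i < parts.size(); ++i) {
      out.append(i == 0 ? header : emptyHeader).append(parts.get(i)).append('\n');
    }
    return out.toString();
  }
}
```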


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186529
  
--- Diff: 
metron-contrib/metron-performance/src/test/java/org/apache/metron/performance/load/SendToKafkaTest.java
 ---
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SendToKafkaTest {
+
+  @Test
+  public void testWritesCorrectNumber() throws InterruptedException {
--- End diff --

done
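The test body is not quoted, but its imports (`ForkJoinPool`, `Future`, `AtomicLong`) suggest counting sends across concurrent tasks. A sketch of that pattern with the `KafkaProducer` replaced by a plain in-memory `Consumer<String>` sink; `sendAll` is a hypothetical helper, not the PR's API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;

class SendCountSketch {
  // Hypothetical: each of `tasks` workers sends `batchSize` generated messages to `sink`.
  // Returns the total counted by a shared AtomicLong, which a test can compare to tasks * batchSize.
  static long sendAll(int tasks, int batchSize, Supplier<String> generator, Consumer<String> sink) {
    AtomicLong numSent = new AtomicLong(0);
    ForkJoinPool pool = new ForkJoinPool(4);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int t = 0; t < tasks; t++) {
        futures.add(pool.submit(() -> {
          for (int i = 0; i < batchSize; i++) {
            sink.accept(generator.get());
            numSent.incrementAndGet();
          }
        }));
      }
      for (Future<?> f : futures) {
        try {
          f.get(); // wait for completion and surface any task failure
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException(e);
        } catch (ExecutionException e) {
          throw new IllegalStateException(e.getCause());
        }
      }
    } finally {
      pool.shutdown();
    }
    return numSent.get();
  }
}
```

The sink must be thread-safe (for example a `ConcurrentLinkedQueue`), since several tasks call it concurrently.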


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186458
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java
 ---
@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.utils.cli.CLIOptions;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.cli.OptionHandler;
+
+import javax.annotation.Nullable;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+
+public enum LoadOptions implements CLIOptions<LoadOptions> {
+  HELP(new OptionHandler<LoadOptions>() {
+
+    @Override
+    public String getShortCode() {
+      return "h";
+    }
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      return new Option(s, "help", false, "Generate Help screen");
+    }
+  }),
+  ZK(new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "zk_quorum", true, "zookeeper quorum");
+      o.setArgName("QUORUM");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      if(option.has(cli)) {
+        return Optional.ofNullable(option.get(cli));
+      }
+      else {
+        return Optional.empty();
+      }
+    }
+
+    @Override
+    public String getShortCode() {
+      return "z";
+    }
+  }),
+  CONSUMER_GROUP(new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "consumer_group", true, "Consumer Group.  The default is " + LoadGenerator.CONSUMER_GROUP);
+      o.setArgName("GROUP_ID");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      if(option.has(cli)) {
+        return Optional.ofNullable(option.get(cli));
+      }
+      else {
+        return Optional.of(LoadGenerator.CONSUMER_GROUP);
+      }
+    }
+
+    @Override
+    public String getShortCode() {
+      return "cg";
+    }
+  }),
+  BIASED_SAMPLE(new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "sample_bias", true, "The discrete distribution to bias the sampling. " +
+              "This is a CSV of 2 columns.  The first column is the % of the templates " +
+              "and the 2nd column is the probability (0-100) that it's chosen.  For instance:\n" +
+              "  20,80\n" +
+              "  80,20\n" +
+              "implies that 20% of the templates will comprise 80% of the output and the remaining 80% of the templates will comprise 20% of the output.");
+      o.setArgName("BIAS_FILE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      if(!option.has(cli)) {
+        return Optional.empty();
+      }
+      File discreteDistributionFile  = new File(option.get(cli));
+      if(discreteDistributionFile.exists()) {
+        try {
+
+          return Optional.ofNullable(BiasedSampler.readDistribution(discreteDistrib

[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186359
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
 ---
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
+import org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
+import org.apache.metron.performance.load.monitor.MonitorTask;
+import org.apache.metron.performance.load.monitor.writers.CSVWriter;
+import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
+import org.apache.metron.performance.load.monitor.writers.Writable;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.performance.sampler.Sampler;
+import org.apache.metron.performance.sampler.UnbiasedSampler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public class LoadGenerator
+{
+  public static String CONSUMER_GROUP = "load.group";
+  public static long SEND_PERIOD_MS = 100;
+  public static long MONITOR_PERIOD_MS = 1000*10;
+  private static ExecutorService pool;
+  private static ThreadLocal kafkaProducer;
+  public static AtomicLong numSent = new AtomicLong(0);
+
+  public static void main( String[] args ) throws Exception {
+    CommandLine cli = LoadOptions.parse(new PosixParser(), args);
+    EnumMap<LoadOptions, Optional<Object>> evaluatedArgs = LoadOptions.createConfig(cli);
+    Map<String, Object> kafkaConfig = new HashMap<>();
+    kafkaConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    kafkaConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    kafkaConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+    kafkaConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+    if(LoadOptions.ZK.has(cli)) {
+      String zkQuorum = (String) evaluatedArgs.get(LoadOptions.ZK).get();
+      kafkaConfig.put("bootstrap.servers", Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum)));
--- End diff --

done
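A sketch of the client config assembled in `main`. The serializer, deserializer and `bootstrap.servers` keys are standard Kafka client properties; here the broker list is passed in directly instead of being fetched with `KafkaUtils.INSTANCE.getBrokersFromZookeeper`, and the host names in any example are hypothetical. Setting both serializers and deserializers in one map presumably lets the same config serve the producer and the throughput-monitoring consumer.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class KafkaConfigSketch {
  static final String STRING_SER = "org.apache.kafka.common.serialization.StringSerializer";
  static final String STRING_DESER = "org.apache.kafka.common.serialization.StringDeserializer";

  // In the tool, brokers come from a ZooKeeper lookup; this sketch takes them as a parameter.
  static Map<String, Object> build(List<String> brokers) {
    Map<String, Object> cfg = new HashMap<>();
    cfg.put("key.serializer", STRING_SER);
    cfg.put("value.serializer", STRING_SER);
    cfg.put("key.deserializer", STRING_DESER);
    cfg.put("value.deserializer", STRING_DESER);
    if (!brokers.isEmpty()) {
      // bootstrap.servers is a comma-separated list of host:port pairs.
      cfg.put("bootstrap.servers", String.join(",", brokers));
    }
    return cfg;
  }
}
```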


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186403
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
 ---
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
+import org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
+import org.apache.metron.performance.load.monitor.MonitorTask;
+import org.apache.metron.performance.load.monitor.writers.CSVWriter;
+import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
+import org.apache.metron.performance.load.monitor.writers.Writable;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.performance.sampler.Sampler;
+import org.apache.metron.performance.sampler.UnbiasedSampler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public class LoadGenerator
+{
+  public static String CONSUMER_GROUP = "load.group";
+  public static long SEND_PERIOD_MS = 100;
+  public static long MONITOR_PERIOD_MS = 1000*10;
+  private static ExecutorService pool;
+  private static ThreadLocal kafkaProducer;
+  public static AtomicLong numSent = new AtomicLong(0);
+
+  public static void main( String[] args ) throws Exception {
+    CommandLine cli = LoadOptions.parse(new PosixParser(), args);
+    EnumMap<LoadOptions, Optional<Object>> evaluatedArgs = LoadOptions.createConfig(cli);
+    Map<String, Object> kafkaConfig = new HashMap<>();
+    kafkaConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    kafkaConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    kafkaConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+    kafkaConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+    if(LoadOptions.ZK.has(cli)) {
+      String zkQuorum = (String) evaluatedArgs.get(LoadOptions.ZK).get();
+      kafkaConfig.put("bootstrap.servers", Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum)));
+    }
+    String groupId = evaluatedArgs.get(LoadOptions.CONSUMER_GROUP).get().toString();
+    System.out.println("Consumer Group: " + groupId);
+    kafkaConfig.put("group.id", groupId);
--- End diff --

done
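The unconditional `evaluatedArgs.get(LoadOptions.CONSUMER_GROUP).get()` is safe because that option's handler falls back to `Optional.of(LoadGenerator.CONSUMER_GROUP)`, whereas `ZK` returns `Optional.empty()` and is guarded with `has(cli)`. A hypothetical mini-enum showing the same default pattern without commons-cli, taking already-parsed options as a map:

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical mini version of the option/default pattern; it does not use commons-cli.
enum OptSketch {
  ZK("zk_quorum", null),                          // no default: absent means Optional.empty()
  CONSUMER_GROUP("consumer_group", "load.group"); // absent means the default group

  final String longName;
  final String defaultValue;

  OptSketch(String longName, String defaultValue) {
    this.longName = longName;
    this.defaultValue = defaultValue;
  }

  Optional<String> getValue(Map<String, String> parsed) {
    String v = parsed.get(longName);
    return v != null ? Optional.of(v) : Optional.ofNullable(defaultValue);
  }

  // Evaluate every option once, like createConfig(cli) feeding evaluatedArgs.
  static EnumMap<OptSketch, Optional<String>> createConfig(Map<String, String> parsed) {
    EnumMap<OptSketch, Optional<String>> ret = new EnumMap<>(OptSketch.class);
    for (OptSketch o : values()) {
      ret.put(o, o.getValue(parsed));
    }
    return ret;
  }
}
```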


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186490
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/MessageGenerator.java
 ---
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import org.apache.metron.performance.sampler.Sampler;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class MessageGenerator implements Supplier<String> {
+  private static ThreadLocal<Random> rng = ThreadLocal.withInitial(() -> new Random());
+  private static AtomicLong guidOffset = new AtomicLong(0);
+  private static String guidPrefix = "6141faf6-a8ba-4044-ab80-";
--- End diff --

done
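The hunk ends at the field declarations, so how the GUID is assembled is not shown. Assuming each message appends a per-message counter from `guidOffset` to the fixed prefix (which already holds the first four groups of an 8-4-4-4-12 UUID), a sketch might look like this; the `%012x` suffix format and the JSON body are illustrative assumptions.

```java
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

class GuidSketch implements Supplier<String> {
  // One Random per thread avoids contention on a shared instance.
  private static final ThreadLocal<Random> RNG = ThreadLocal.withInitial(Random::new);
  private static final AtomicLong GUID_OFFSET = new AtomicLong(0);
  private static final String GUID_PREFIX = "6141faf6-a8ba-4044-ab80-";

  // Assumed format: prefix plus a 12-hex-digit counter, unique within one JVM run.
  static String nextGuid() {
    return GUID_PREFIX + String.format("%012x", GUID_OFFSET.getAndIncrement());
  }

  @Override
  public String get() {
    // Hypothetical message body; the real generator fills templates chosen by a Sampler.
    return "{\"guid\":\"" + nextGuid() + "\",\"value\":" + RNG.get().nextInt(100) + "}";
  }
}
```

`getAndIncrement` on the shared `AtomicLong` keeps GUIDs distinct even when many threads generate messages at once.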


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186422
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java
 ---
@@ -0,0 +1,504 @@
+package org.apache.metron.performance.load;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.utils.cli.CLIOptions;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.cli.OptionHandler;
+
+import javax.annotation.Nullable;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+
+public enum LoadOptions implements CLIOptions {
+  HELP(new OptionHandler() {
+
+    @Override
+    public String getShortCode() {
+      return "h";
+    }
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      return new Option(s, "help", false, "Generate Help screen");
+    }
+  }),
+  ZK(new OptionHandler() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "zk_quorum", true, "zookeeper quorum");
+      o.setArgName("QUORUM");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional getValue(LoadOptions option, CommandLine cli) {
+      if(option.has(cli)) {
+        return Optional.ofNullable(option.get(cli));
+      }
+      else {
+        return Optional.empty();
+      }
+    }
+
+    @Override
+    public String getShortCode() {
+      return "z";
+    }
+  }),
+  CONSUMER_GROUP(new OptionHandler() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "consumer_group", true, "Consumer Group.  The default is " + LoadGenerator.CONSUMER_GROUP);
+      o.setArgName("GROUP_ID");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional getValue(LoadOptions option, CommandLine cli) {
+      if(option.has(cli)) {
+        return Optional.ofNullable(option.get(cli));
+      }
+      else {
+        return Optional.of(LoadGenerator.CONSUMER_GROUP);
+      }
+    }
+
+    @Override
+    public String getShortCode() {
+      return "cg";
+    }
+  }),
+  BIASED_SAMPLE(new OptionHandler() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "sample_bias", true, "The discrete distribution to bias the sampling. " +
+          "This is a CSV of 2 columns.  The first column is the % of the templates " +
+          "and the 2nd column is the probability (0-100) that it's chosen.  For instance:\n" +
+          "  20,80\n" +
+          "  80,20\n" +
+          "implies that 20% of the templates will comprise 80% of the output and the remaining 80% of the templates will comprise 20% of the output.");
+      o.setArgName("BIAS_FILE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional getValue(LoadOptions option, CommandLine cli) {
+      if(!option.has(cli)) {
+        return Optional.empty();
+      }
+      File discreteDistributionFile  = new File(option.get(cli));
+      if(discreteDistributionFile.exists()) {
+        try {
+
+          return Optional.ofNullable(BiasedSampler.readDistribution(discreteDistrib
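The `sample_bias` help text describes a discrete distribution: with rows `20,80` and `80,20`, the first 20% of templates should account for about 80% of the output. The sketch below shows one way to sample from such a file. It assumes rows are read in order and that each row maps to a contiguous slice of the template list; `BiasedSamplerSketch`, `Bucket`, `parse`, and `sample` are hypothetical names and are not the PR's `BiasedSampler.readDistribution` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch (not the PR's BiasedSampler): row semantics follow the
// sample_bias help text; the contiguous-slice mapping is an assumption.
class BiasedSamplerSketch {
  /** One CSV row: percent of templates, and percent chance of drawing from them. */
  static final class Bucket {
    final double templatePct;
    final double probabilityPct;

    Bucket(double templatePct, double probabilityPct) {
      this.templatePct = templatePct;
      this.probabilityPct = probabilityPct;
    }
  }

  /** Parses rows such as "20,80"; blank lines are skipped. */
  static List<Bucket> parse(List<String> lines) {
    List<Bucket> buckets = new ArrayList<>();
    for (String line : lines) {
      String trimmed = line.trim();
      if (trimmed.isEmpty()) {
        continue;
      }
      String[] cols = trimmed.split(",");
      buckets.add(new Bucket(Double.parseDouble(cols[0].trim()),
                             Double.parseDouble(cols[1].trim())));
    }
    return buckets;
  }

  private final List<Bucket> buckets;
  private final int numTemplates;

  BiasedSamplerSketch(List<Bucket> buckets, int numTemplates) {
    this.buckets = buckets;
    this.numTemplates = numTemplates;
  }

  /** Returns a template index in [0, numTemplates). */
  int sample(Random rng) {
    double roll = rng.nextDouble() * 100.0;  // point on the 0-100 probability axis
    double cumulativeProb = 0.0;
    double startPct = 0.0;
    for (Bucket b : buckets) {
      cumulativeProb += b.probabilityPct;
      double endPct = startPct + b.templatePct;
      if (roll < cumulativeProb) {
        // Map this row's template share onto a contiguous, non-empty index slice.
        int lo = Math.min((int) Math.floor(startPct * numTemplates / 100.0), numTemplates - 1);
        int hi = Math.min(Math.max((int) Math.floor(endPct * numTemplates / 100.0), lo + 1), numTemplates);
        return lo + rng.nextInt(hi - lo);
      }
      startPct = endPct;
    }
    // Probabilities summed to less than 100: fall back to a uniform draw.
    return rng.nextInt(numTemplates);
  }
}
```

With 10 templates and the `20,80` / `80,20` file, indices 0 and 1 form the first slice and should be drawn roughly 80% of the time.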

[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186316
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
 ---
@@ -0,0 +1,165 @@
+package org.apache.metron.performance.load;
+
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
+import org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
+import org.apache.metron.performance.load.monitor.MonitorTask;
+import org.apache.metron.performance.load.monitor.writers.CSVWriter;
+import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
+import org.apache.metron.performance.load.monitor.writers.Writable;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.performance.sampler.Sampler;
+import org.apache.metron.performance.sampler.UnbiasedSampler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public class LoadGenerator
+{
+  public static String CONSUMER_GROUP = "load.group";
--- End diff --

done
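The `CONSUMER_GROUP` constant quoted here is the fallback used by the `CONSUMER_GROUP` handler in `LoadOptions`. The handlers in that diff follow two patterns for optional flags: `ZK` returns `Optional.empty()` when the flag is absent, and `CONSUMER_GROUP` returns the default group. The sketch below shows both patterns using only the standard library. It assumes a `Map` of long option names to values standing in for Commons CLI's `CommandLine`, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stdlib-only stand-in: a Map of long option names to values plays
// the role of Commons CLI's CommandLine in the LoadOptions handlers.
class OptionDefaultsSketch {
  // Same value as the constant in the quoted diff; making it final is this sketch's choice.
  static final String CONSUMER_GROUP = "load.group";

  /** ZK-style handler: a missing flag yields Optional.empty(). */
  static Optional<String> zkQuorum(Map<String, String> parsed) {
    if (parsed.containsKey("zk_quorum")) {
      return Optional.ofNullable(parsed.get("zk_quorum"));
    }
    return Optional.empty();
  }

  /** CONSUMER_GROUP-style handler: a missing flag falls back to the default group. */
  static Optional<String> consumerGroup(Map<String, String> parsed) {
    if (parsed.containsKey("consumer_group")) {
      return Optional.ofNullable(parsed.get("consumer_group"));
    }
    return Optional.of(CONSUMER_GROUP);
  }
}
```

Returning `Optional` in both cases lets callers tell "not provided" apart from "provided or defaulted" without null checks.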


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174186275
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
 ---
@@ -0,0 +1,165 @@
+package org.apache.metron.performance.load;
+
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
+import org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
+import org.apache.metron.performance.load.monitor.MonitorTask;
+import org.apache.metron.performance.load.monitor.writers.CSVWriter;
+import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
+import org.apache.metron.performance.load.monitor.writers.Writable;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.performance.sampler.Sampler;
+import org.apache.metron.performance.sampler.UnbiasedSampler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public class LoadGenerator
+{
+  public static String CONSUMER_GROUP = "load.group";
+  public static long SEND_PERIOD_MS = 100;
+  public static long MONITOR_PERIOD_MS = 1000*10;
+  private static ExecutorService pool;
+  private static ThreadLocal kafkaProducer;
+  public static AtomicLong numSent = new AtomicLong(0);
+
+  public static void main( String[] args ) throws Exception {
+    CommandLine cli = LoadOptions.parse(new PosixParser(), args);
+    EnumMap> evaluatedArgs = LoadOptions.createConfig(cli);
+    Map kafkaConfig = new HashMap<>();
+    kafkaConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
--- End diff --

done
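The quoted `LoadGenerator` fields pair a global `numSent` counter with `MONITOR_PERIOD_MS = 1000*10`, which suggests a rate computed from counter deltas. For example, if `numSent` grows by 5,000 between two polls 10,000 ms apart, the rate is 5,000 × 1000 / 10,000 = 500 events per second. The sketch below illustrates that calculation under this assumption. `EpsMonitorSketch` and `poll` are hypothetical names and are not the PR's `EPSGeneratedMonitor`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch (not the PR's EPSGeneratedMonitor): computes events per
// second from the change in a shared counter between successive polls.
class EpsMonitorSketch {
  private final AtomicLong counter;
  private long lastCount;
  private long lastTimeMs;

  EpsMonitorSketch(AtomicLong counter, long startTimeMs) {
    this.counter = counter;
    this.lastCount = counter.get();
    this.lastTimeMs = startTimeMs;
  }

  /** Events per second since the previous poll; the caller supplies the clock reading. */
  synchronized double poll(long nowMs) {
    long count = counter.get();
    long elapsedMs = nowMs - lastTimeMs;
    double eps = elapsedMs > 0 ? (count - lastCount) * 1000.0 / elapsedMs : 0.0;
    lastCount = count;
    lastTimeMs = nowMs;
    return eps;
  }
}
```

Passing the timestamp in, instead of reading `System.currentTimeMillis()` inside `poll`, keeps the arithmetic deterministic and easy to test.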


---


[GitHub] metron issue #958: METRON-1483: Create a tool to monitor performance of the ...

2018-03-13 Thread cestella
Github user cestella commented on the issue:

https://github.com/apache/metron/pull/958
  
@justinleet thanks for the review.  I responded to your comments either by 
fixing them or explaining why I prefer the current approach.  I will add a new set of 
tests in the next commit.


---


[GitHub] metron pull request #958: METRON-1483: Create a tool to monitor performance ...

2018-03-13 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/958#discussion_r174181759
  
--- Diff: 
metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java
 ---
@@ -0,0 +1,107 @@
+package org.apache.metron.performance.load;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class SendToKafka extends TimerTask {
--- End diff --

Well, I suppose it could be, but I didn't see value in it since all 
of our inputs are currently strings and the message generator generates only 
strings, not other message types.  Genericizing this to make the message 
generator pluggable would be a more serious refactoring.  Also, for utility 
code, I'd rather avoid generalizing prematurely.  Let me know what you 
think.
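Even if `SendToKafka` stays string-typed, as argued here, the destination can still be swapped for tests. The sketch below shows a `TimerTask` that pulls a fixed batch from a `Supplier<String>` on each tick. `SendBatchTask`, its constructor, and the `Consumer<String>` sink that replaces the `KafkaProducer` are hypothetical and are not the PR's code.

```java
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch (not the PR's SendToKafka): the Kafka producer is replaced
// by a Consumer<String> sink so the task can run without a broker.
class SendBatchTask extends TimerTask {
  private final Supplier<String> messages;  // e.g. a String-only message generator
  private final Consumer<String> sink;      // stand-in for producer.send(...)
  private final int batchSize;
  private final AtomicLong numSent;         // shared counter a monitor can poll

  SendBatchTask(Supplier<String> messages, Consumer<String> sink, int batchSize, AtomicLong numSent) {
    this.messages = messages;
    this.sink = sink;
    this.batchSize = batchSize;
    this.numSent = numSent;
  }

  /** Sends one batch per timer tick and records how many messages went out. */
  @Override
  public void run() {
    for (int i = 0; i < batchSize; i++) {
      sink.accept(messages.get());
      numSent.incrementAndGet();
    }
  }
}
```

Scheduled with `new java.util.Timer(true).scheduleAtFixedRate(task, 0, 100)`, which matches `SEND_PERIOD_MS = 100`, a batch size of B targets about 10·B messages per second. `java.util.Timer` runs every task on a single thread, so if `run()` takes longer than the period, fixed-rate executions that fall behind will fire in quick succession to catch up.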


---


[GitHub] metron-bro-plugin-kafka issue #6: METRON-1469: Kafka Plugin for Bro - Config...

2018-03-13 Thread nickwallen
Github user nickwallen commented on the issue:

https://github.com/apache/metron-bro-plugin-kafka/pull/6
  
+1 Looks great.  Thanks @dcode 


---


[GitHub] metron-bro-plugin-kafka issue #6: METRON-1469: Kafka Plugin for Bro - Config...

2018-03-13 Thread JonZeolla
Github user JonZeolla commented on the issue:

https://github.com/apache/metron-bro-plugin-kafka/pull/6
  
I'm +1 on this.  @nickwallen anything outstanding on your end?


---