Repository: metron
Updated Branches:
  refs/heads/master adb024070 -> 3f0b1b7b4


METRON-1350: Add reservoir sampling functions to Stellar closes 
apache/incubator-metron#867


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3f0b1b7b
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3f0b1b7b
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3f0b1b7b

Branch: refs/heads/master
Commit: 3f0b1b7b4a002d3f364bd2aee7b5921c0435c4a4
Parents: adb0240
Author: cstella <ceste...@gmail.com>
Authored: Wed Dec 20 09:30:03 2017 -0500
Committer: cstella <ceste...@gmail.com>
Committed: Wed Dec 20 09:30:03 2017 -0500

----------------------------------------------------------------------
 metron-analytics/metron-statistics/README.md    |  26 +++
 .../metron/statistics/sampling/Sampler.java     |  41 +++++
 .../metron/statistics/sampling/SamplerUtil.java |  39 ++++
 .../sampling/SamplingInitFunctions.java         |  84 +++++++++
 .../sampling/SamplingOpsFunctions.java          | 178 +++++++++++++++++++
 .../statistics/sampling/UniformSampler.java     |  95 ++++++++++
 .../sampling/SamplerFunctionsTest.java          | 130 ++++++++++++++
 .../statistics/sampling/UniformSamplerTest.java | 118 ++++++++++++
 metron-stellar/stellar-common/README.md         |   4 +
 9 files changed, 715 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-analytics/metron-statistics/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-statistics/README.md 
b/metron-analytics/metron-statistics/README.md
index 982132a..508b612 100644
--- a/metron-analytics/metron-statistics/README.md
+++ b/metron-analytics/metron-statistics/README.md
@@ -53,6 +53,32 @@ functions can be used from everywhere where Stellar is used.
   * bounds - A list of value bounds (excluding min and max) in sorted order.
 * Returns: Which bin N the value falls in such that bound(N-1) < value <= 
bound(N).  No min and max bounds are provided, so values smaller than the 0'th 
bound go in the 0'th bin, and values greater than the last bound go in the M'th 
bin.
 
+### Sampling Functions
+
+#### `SAMPLE_ADD`
+* Description: Add a value or collection of values to a sampler.
+* Input:
+  * sampler - Sampler to use.  If null, then a default Uniform sampler is 
created.
+  * o - The value to add.  If o is an Iterable, then each item is added.
+* Returns: The sampler.
+
+#### `SAMPLE_GET`
+* Description: Return the sample.
+* Input:
+  * sampler - Sampler to use.
+* Returns: The resulting sample.
+
+#### `SAMPLE_INIT`
+* Description: Create a [reservoir 
sampler](https://en.wikipedia.org/wiki/Reservoir_sampling) of a specific size 
or, if unspecified, size 1024.  Elements sampled by the reservoir sampler will 
be included in the final sample with equal probability.
+* Input:
+  * size? - The size of the reservoir sampler.  If unspecified, the size is 
1024.
+* Returns: The sampler object.
+
+#### `SAMPLE_MERGE`
+* Description: Merge and resample a collection of samples.
+* Input:
+  * samplers - A list of samplers to merge.
+* Returns: A sampler which represents the resampled merger of the samplers.
 
 ### Distributional Statistics
 

http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/Sampler.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/Sampler.java
 
b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/Sampler.java
new file mode 100644
index 0000000..93104fb
--- /dev/null
+++ 
b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/Sampler.java
@@ -0,0 +1,41 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.statistics.sampling;
+
+import java.util.function.Supplier;
+
+public interface Sampler extends Supplier<Iterable<Object>> {
+  int DEFAULT_SIZE=1024;
+  int getSize();
+  void add(Object o);
+
+  default void addAll(Iterable<? extends Object> vals) {
+    if(vals == null) {
+      return;
+    }
+    for(Object o : vals) {
+      add(o);
+    }
+  }
+
+  Sampler cloneEmpty();
+
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplerUtil.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplerUtil.java
 
b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplerUtil.java
new file mode 100644
index 0000000..ca2e86a
--- /dev/null
+++ 
b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplerUtil.java
@@ -0,0 +1,39 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.statistics.sampling;
+
+import com.google.common.collect.Iterables;
+
+import java.util.Optional;
+
+public enum SamplerUtil {
+  INSTANCE;
+
+  public Sampler merge(Iterable<Sampler> samplers, Optional<Sampler> 
baseSampler) {
+    if(Iterables.isEmpty(samplers)) {
+      return null;
+    }
+    Sampler ret = baseSampler.orElse(Iterables.getFirst(samplers, 
null).cloneEmpty());
+    for(Sampler s : samplers) {
+      ret.addAll(s.get());
+    }
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplingInitFunctions.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplingInitFunctions.java
 
b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplingInitFunctions.java
new file mode 100644
index 0000000..c57f374
--- /dev/null
+++ 
b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplingInitFunctions.java
@@ -0,0 +1,84 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.statistics.sampling;
+
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.Stellar;
+import org.apache.metron.stellar.dsl.StellarFunction;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+public class SamplingInitFunctions {
+
+  @Stellar(namespace="SAMPLE"
+          ,name="INIT"
+          ,description="Create a [reservoir 
sampler](https://en.wikipedia.org/wiki/Reservoir_sampling) of a specific size 
or, if unspecified, size " + Sampler.DEFAULT_SIZE + ".  Elements sampled by the 
reservoir sampler will be included in the final sample with equal probability."
+          ,params = {
+            "size? - The size of the reservoir sampler.  If unspecified, the 
size is " + Sampler.DEFAULT_SIZE
+          }
+          ,returns="The sampler object."
+  )
+
+  public static class UniformSamplerInit implements StellarFunction {
+    @Override
+    public Object apply(List<Object> args, Context context) throws 
ParseException {
+      if(args.size() == 0) {
+        return new UniformSampler();
+      }
+      else {
+        Optional<Integer> sizeArg = get(args, 0, "Size", Integer.class);
+        if(sizeArg.isPresent() && sizeArg.get() <= 0) {
+          throw new IllegalStateException("Size must be a positive integer");
+        }
+        else {
+          return new UniformSampler(sizeArg.orElse(Sampler.DEFAULT_SIZE));
+        }
+      }
+    }
+
+    @Override
+    public void initialize(Context context) {
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+
+
+  public static <T> Optional<T> get(List<Object> args, int offset, String 
argName, Class<T> expectedClazz) {
+    Object obj = args.get(offset);
+    T ret = ConversionUtils.convert(obj, expectedClazz);
+    if(ret == null ) {
+      if(obj != null) {
+        throw new IllegalStateException(argName + "argument(" + obj
+                                       + " is expected to be an " + 
expectedClazz.getName()
+                                       + ", but was " + obj
+                                       );
+      }
+    }
+    return Optional.ofNullable(ret);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplingOpsFunctions.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplingOpsFunctions.java
 
b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplingOpsFunctions.java
new file mode 100644
index 0000000..4402fdf
--- /dev/null
+++ 
b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplingOpsFunctions.java
@@ -0,0 +1,178 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.statistics.sampling;
+
+import com.codahale.metrics.Reservoir;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.Stellar;
+import org.apache.metron.stellar.dsl.StellarFunction;
+
+import java.util.List;
+import java.util.Optional;
+
+public class SamplingOpsFunctions {
+
+  @Stellar(namespace="SAMPLE"
+          ,name="GET"
+          ,description="Return the sample."
+          ,params = {
+            "sampler - Sampler to use."
+          }
+          ,returns="The resulting sample."
+  )
+  public static class Get implements StellarFunction {
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws 
ParseException {
+      if(args.size() == 0) {
+        return null;
+      }
+
+      Sampler s = null;
+      Object sObj = args.get(0);
+      if(sObj == null) {
+        return null;
+      }
+      else if(sObj instanceof Sampler) {
+        s = (Sampler)sObj;
+      }
+      else {
+        throw new IllegalStateException("Expected a sampler, but found " + 
sObj);
+      }
+      return s.get();
+    }
+
+    @Override
+    public void initialize(Context context) {
+
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+
+  @Stellar(namespace="SAMPLE"
+          ,name="ADD"
+          ,description="Add a value or collection of values to a sampler."
+          ,params = {
+            "sampler - Sampler to use.  If null, then a default Uniform 
sampler is created."
+            ,"o - The value to add.  If o is an Iterable, then each item is 
added."
+          }
+          ,returns="The sampler."
+  )
+  public static class Add implements StellarFunction {
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws 
ParseException {
+      if(args.size() == 0) {
+        return null;
+      }
+      if(args.size() < 2) {
+        throw new IllegalStateException("Expected sampler and value to add");
+      }
+      Sampler s = null;
+      Object sObj = args.get(0);
+      if(sObj == null) {
+        s = new UniformSampler();
+      }
+      else if(sObj instanceof Sampler) {
+        s = (Sampler)sObj;
+      }
+      else {
+        throw new IllegalStateException("Expected a sampler, but found " + 
sObj);
+      }
+      Object valsObj = args.get(1);
+      if(valsObj == null) {
+        return s;
+      }
+      else if(valsObj instanceof Iterable) {
+        Iterable<Object> vals = (Iterable<Object>)valsObj;
+        s.addAll(vals);
+      }
+      else {
+        s.add(valsObj);
+      }
+      return s;
+    }
+
+    @Override
+    public void initialize(Context context) {
+
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+
+  @Stellar(namespace="SAMPLE"
+          ,name="MERGE"
+          ,description="Merge and resample a collection of samples."
+          ,params = {
+            "samplers - A list of samplers to merge."
+          , "baseSampler? - A base sampler to merge into.  If unspecified the 
first of the list of samplers will be cloned."
+          }
+          ,returns = "A sampler which represents the resampled merger of the 
samplers."
+  )
+  public static class Merge implements StellarFunction {
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws 
ParseException {
+      if(args.size() == 0) {
+        return null;
+      }
+      Object reservoirsObj = args.get(0);
+      if(reservoirsObj == null) {
+        return null;
+      }
+      if(!(reservoirsObj instanceof Iterable)){
+        throw new IllegalStateException("Expected a collection of Samplers");
+      }
+      Iterable<Sampler> reservoirs = (Iterable<Sampler>)reservoirsObj;
+
+      Sampler baseSampler = null;
+      if(args.size() > 1) {
+        Object baseSamplerObj = args.get(1);
+        if (baseSamplerObj != null) {
+          if (!(baseSamplerObj instanceof Sampler)) {
+            throw new IllegalStateException("Expected baseSampler to be a 
Sampler");
+          } else {
+            baseSampler = (Sampler) baseSamplerObj;
+          }
+        }
+      }
+      return SamplerUtil.INSTANCE.merge(reservoirs, 
Optional.ofNullable(baseSampler));
+    }
+
+    @Override
+    public void initialize(Context context) {
+
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/UniformSampler.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/UniformSampler.java
 
b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/UniformSampler.java
new file mode 100644
index 0000000..11460e0
--- /dev/null
+++ 
b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/UniformSampler.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.statistics.sampling;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * This is a reservoir sampler without replacement where each element sampled 
will be included
+ * with equal probability in the reservoir.
+ */
+public class UniformSampler implements Sampler {
+  private List<Object> reservoir;
+  private int seen = 0;
+  private int size;
+  private Random rng = new Random(0);
+
+  public UniformSampler() {
+    this(DEFAULT_SIZE);
+  }
+
+  public UniformSampler(int size) {
+    this.size = size;
+    reservoir = new ArrayList<>(size);
+  }
+
+  @Override
+  public Iterable<Object> get() {
+    return reservoir;
+  }
+
+  /**
+   * Add an object to the reservoir
+   * @param o
+   */
+  public void add(Object o) {
+    if(o == null) {
+      return;
+    }
+    if (reservoir.size() < size) {
+      reservoir.add(o);
+    } else {
+      int rIndex = rng.nextInt(seen + 1);
+      if (rIndex < size) {
+        reservoir.set(rIndex, o);
+      }
+    }
+    seen++;
+  }
+
+  @Override
+  public Sampler cloneEmpty() {
+    return new UniformSampler(getSize());
+  }
+
+  @Override
+  public int getSize() {
+    return size;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    UniformSampler that = (UniformSampler) o;
+
+    if (getSize() != that.getSize()) return false;
+    return reservoir != null ? reservoir.equals(that.reservoir) : 
that.reservoir == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = reservoir != null ? reservoir.hashCode() : 0;
+    result = 31 * result + getSize();
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-analytics/metron-statistics/src/test/java/org/apache/metron/statistics/sampling/SamplerFunctionsTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-statistics/src/test/java/org/apache/metron/statistics/sampling/SamplerFunctionsTest.java
 
b/metron-analytics/metron-statistics/src/test/java/org/apache/metron/statistics/sampling/SamplerFunctionsTest.java
new file mode 100644
index 0000000..851ba67
--- /dev/null
+++ 
b/metron-analytics/metron-statistics/src/test/java/org/apache/metron/statistics/sampling/SamplerFunctionsTest.java
@@ -0,0 +1,130 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.statistics.sampling;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import org.apache.metron.stellar.common.utils.StellarProcessorUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+public class SamplerFunctionsTest {
+  static List<Double> sample = new ArrayList<>();
+  static List<String> sampleString = new ArrayList<>();
+  static List<Sampler> samplers = new ArrayList<>();
+  @BeforeClass
+  public static void beforeClass() {
+    Random rng = new Random(0);
+    int sampleSize = 1000000;
+    int numSubSamples = 10;
+    int subSampleSize = sampleSize/numSubSamples;
+    int currSample = -1;
+    for(int i = 0,j=0;i < sampleSize;++i,j = (j+1)%subSampleSize) {
+      double us= 10*rng.nextDouble();
+      sample.add(us);
+      sampleString.add(us + "");
+      if(j == 0) {
+        Sampler s = new UniformSampler(subSampleSize/10);
+        samplers.add(s);
+        currSample++;
+      }
+      samplers.get(currSample).add(us);
+    }
+  }
+
+  @Test
+  public void testValidInit_default() throws Exception {
+    String stmt = "SAMPLE_INIT()";
+    Sampler s = (Sampler) StellarProcessorUtils.run(stmt, new HashMap<>());
+    Assert.assertEquals(Sampler.DEFAULT_SIZE, s.getSize());
+  }
+
+  @Test
+  public void testValidInit_withSize() throws Exception {
+    String stmt = "SAMPLE_INIT(size)";
+    Sampler s = (Sampler) StellarProcessorUtils.run(stmt, 
ImmutableMap.of("size", 10 ));
+    Assert.assertEquals(10, s.getSize());
+  }
+
+  @Test(expected=IllegalStateException.class)
+  public void testInvalidInit(){
+    String stmt = "SAMPLE_INIT(size)";
+    Sampler s = (Sampler) StellarProcessorUtils.run(stmt, 
ImmutableMap.of("size", -10 ));
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    String stmt = "SAMPLE_GET(SAMPLE_ADD(SAMPLE_INIT(size), values))";
+    Iterable<? extends Object> s = (Iterable<? extends Object>) 
StellarProcessorUtils.run(stmt, ImmutableMap.of("size", 10, "values", sample));
+    Assert.assertEquals(10, Iterables.size(s));
+    for(Object o : s) {
+      Assert.assertTrue(o instanceof Double);
+      Assert.assertTrue(sample.contains(o));
+    }
+  }
+
+  @Test
+  public void testAddSingle() throws Exception {
+    String stmt = "SAMPLE_ADD(SAMPLE_INIT(size), value)";
+    Sampler s = (Sampler) StellarProcessorUtils.run(stmt, 
ImmutableMap.of("size", 10, "value", "blah"));
+    Assert.assertEquals(10, s.getSize());
+    Assert.assertTrue(Iterables.getFirst(s.get(), null) instanceof String);
+  }
+
+  @Test
+  public void testAddAll() throws Exception {
+    String stmt = "SAMPLE_ADD(SAMPLE_INIT(size), value)";
+    Sampler s = (Sampler) StellarProcessorUtils.run(stmt, 
ImmutableMap.of("size", 10, "value", sampleString));
+    Assert.assertEquals(10, s.getSize());
+    for(Object o : s.get()) {
+      Assert.assertTrue(o instanceof String);
+      Assert.assertTrue(sampleString.contains(o));
+    }
+  }
+
+  @Test
+  public void testMerge() throws Exception {
+    Double sampleMean= null;
+    Double mergedSampleMean= null;
+    {
+      //grab the mean of the sample
+      String stmt = "STATS_MEAN(STATS_ADD(STATS_INIT(), 
SAMPLE_GET(SAMPLE_ADD(SAMPLE_INIT(size), values))))";
+      sampleMean = (Double) StellarProcessorUtils.run(stmt, 
ImmutableMap.of("size", sample.size()/10, "values", sample));
+    }
+    {
+      //grab the mean of the merged set of subsamples of the sample
+      String stmt = "STATS_MEAN(STATS_ADD(STATS_INIT(), 
SAMPLE_GET(SAMPLE_MERGE(samples))))";
+      mergedSampleMean = (Double) StellarProcessorUtils.run(stmt, 
ImmutableMap.of("samples", samplers));
+    }
+    Assert.assertEquals(sampleMean, mergedSampleMean, .1);
+    {
+      //Merge the sample with a simpler sampler
+      String stmt = "SAMPLE_MERGE(samples, SAMPLE_INIT(10))";
+      Sampler s = (Sampler) StellarProcessorUtils.run(stmt, 
ImmutableMap.of("samples", samplers));
+      Assert.assertEquals(10, s.getSize());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-analytics/metron-statistics/src/test/java/org/apache/metron/statistics/sampling/UniformSamplerTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-statistics/src/test/java/org/apache/metron/statistics/sampling/UniformSamplerTest.java
 
b/metron-analytics/metron-statistics/src/test/java/org/apache/metron/statistics/sampling/UniformSamplerTest.java
new file mode 100644
index 0000000..91ca3bd
--- /dev/null
+++ 
b/metron-analytics/metron-statistics/src/test/java/org/apache/metron/statistics/sampling/UniformSamplerTest.java
@@ -0,0 +1,118 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.statistics.sampling;
+
+import org.apache.commons.math3.random.GaussianRandomGenerator;
+import org.apache.commons.math3.random.MersenneTwister;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+
+public class UniformSamplerTest {
+  public static final int SAMPLE_SIZE = 2000000;
+  static DescriptiveStatistics uniformStats = new DescriptiveStatistics();
+  static List<Double> uniformSample = new ArrayList<>();
+  static DescriptiveStatistics gaussianStats = new DescriptiveStatistics();
+  static List<Double> gaussianSample = new ArrayList<>();
+
+  @BeforeClass
+  public static void beforeClass() {
+    Random rng = new Random(0);
+    GaussianRandomGenerator gen = new GaussianRandomGenerator(new 
MersenneTwister(0));
+    for(int i = 0;i < SAMPLE_SIZE;++i) {
+      double us= 10*rng.nextDouble();
+      uniformSample.add(us);
+      uniformStats.addValue(us);
+      double gs= 10*gen.nextNormalizedDouble();
+      gaussianSample.add(gs);
+      gaussianStats.addValue(gs);
+    }
+  }
+
+  @Test
+  public void testUniformDistributionIsPreserved() {
+    Sampler s = new UniformSampler(SAMPLE_SIZE/10);
+    s.addAll(uniformSample);
+    validateDistribution(s, uniformStats);
+  }
+
+  @Test
+  public void testGaussianDistributionIsPreserved() {
+    Sampler s = new UniformSampler(SAMPLE_SIZE/10);
+    s.addAll(gaussianSample);
+    validateDistribution(s, gaussianStats);
+  }
+
+  public void validateDistribution(Sampler sample, DescriptiveStatistics 
distribution) {
+    DescriptiveStatistics s = new DescriptiveStatistics();
+    for(Object d : sample.get()) {
+      s.addValue((Double)d);
+    }
+    Assert.assertEquals(s.getMean(), distribution.getMean(), .1);
+    Assert.assertEquals(s.getStandardDeviation(), 
distribution.getStandardDeviation(), .1);
+  }
+
+  @Test
+  public void testMergeUniform() {
+    Iterable<Sampler> subsamples = getSubsamples(uniformSample);
+    Sampler s = SamplerUtil.INSTANCE.merge(subsamples, Optional.empty());
+    validateDistribution(s, uniformStats);
+  }
+
+  @Test
+  public void testMerge() {
+    UniformSampler sampler = new UniformSampler(10);
+    Iterable<Sampler> subsamples = getSubsamples(uniformSample);
+    Sampler s = SamplerUtil.INSTANCE.merge(subsamples, Optional.of(sampler));
+    Assert.assertEquals(s.getSize(), 10);
+  }
+
+
+  @Test
+  public void testMergeGaussian() {
+    Iterable<Sampler> subsamples = getSubsamples(gaussianSample);
+    Sampler s = SamplerUtil.INSTANCE.merge(subsamples, Optional.empty());
+    validateDistribution(s, gaussianStats);
+  }
+
+  public Iterable<Sampler> getSubsamples(List<Double> sample) {
+    int numSamplers = 20;
+    int numSamplesPerSampler = SAMPLE_SIZE/numSamplers;
+    Sampler[] samplers = new Sampler[numSamplers];
+    int j = 0;
+    for(int i = 0;i < numSamplers;++i) {
+      samplers[i] = new UniformSampler(numSamplesPerSampler/10);
+      for(;j < (i+1)*numSamplesPerSampler && j < sample.size();++j) {
+        samplers[i].add(sample.get(j));
+      }
+    }
+    List<Sampler> ret = new ArrayList<>();
+    for(int i = 0;i < samplers.length;++i) {
+      ret.add(samplers[i]);
+    }
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-stellar/stellar-common/README.md
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/README.md 
b/metron-stellar/stellar-common/README.md
index e5b7dac..09bd4d6 100644
--- a/metron-stellar/stellar-common/README.md
+++ b/metron-stellar/stellar-common/README.md
@@ -214,6 +214,10 @@ Where:
 | [ `REGEXP_MATCH`](#regexp_match)                                             
                      |
 | [ `REGEXP_GROUP_VAL`](#regexp_group_val)                                     
                      |
 | [ `ROUND`](#round)                                                           
                      |
+| [ `SAMPLE_ADD`](../../metron-analytics/metron-statistics#sample_add)         
                      |
+| [ `SAMPLE_GET`](../../metron-analytics/metron-statistics#sample_get)         
                      |
+| [ `SAMPLE_INIT`](../../metron-analytics/metron-statistics#sample_init)       
                      |
+| [ `SAMPLE_MERGE`](../../metron-analytics/metron-statistics#sample_merge)     
                      |
 | [ `SET_ADD`](#set_add)                                                       
                      |
 | [ `SET_INIT`](#set_init)                                                     
                      |
 | [ `SET_MERGE`](#set_merge)                                                   
                      |

Reply via email to