METRON-1350: Add reservoir sampling functions to Stellar closes apache/incubator-metron#867
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3f0b1b7b Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3f0b1b7b Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3f0b1b7b Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual Commit: 3f0b1b7b4a002d3f364bd2aee7b5921c0435c4a4 Parents: adb0240 Author: cstella <ceste...@gmail.com> Authored: Wed Dec 20 09:30:03 2017 -0500 Committer: cstella <ceste...@gmail.com> Committed: Wed Dec 20 09:30:03 2017 -0500 ---------------------------------------------------------------------- metron-analytics/metron-statistics/README.md | 26 +++ .../metron/statistics/sampling/Sampler.java | 41 +++++ .../metron/statistics/sampling/SamplerUtil.java | 39 ++++ .../sampling/SamplingInitFunctions.java | 84 +++++++++ .../sampling/SamplingOpsFunctions.java | 178 +++++++++++++++++++ .../statistics/sampling/UniformSampler.java | 95 ++++++++++ .../sampling/SamplerFunctionsTest.java | 130 ++++++++++++++ .../statistics/sampling/UniformSamplerTest.java | 118 ++++++++++++ metron-stellar/stellar-common/README.md | 4 + 9 files changed, 715 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-analytics/metron-statistics/README.md ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-statistics/README.md b/metron-analytics/metron-statistics/README.md index 982132a..508b612 100644 --- a/metron-analytics/metron-statistics/README.md +++ b/metron-analytics/metron-statistics/README.md @@ -53,6 +53,32 @@ functions can be used from everywhere where Stellar is used. * bounds - A list of value bounds (excluding min and max) in sorted order. * Returns: Which bin N the value falls in such that bound(N-1) < value <= bound(N). 
No min and max bounds are provided, so values smaller than the 0'th bound go in the 0'th bin, and values greater than the last bound go in the M'th bin. +### Sampling Functions + +#### `SAMPLE_ADD` +* Description: Add a value or collection of values to a sampler. +* Input: + * sampler - Sampler to use. If null, then a default Uniform sampler is created. + * o - The value to add. If o is an Iterable, then each item is added. +* Returns: The sampler. + +#### `SAMPLE_GET` +* Description: Return the sample. +* Input: + * sampler - Sampler to use. +* Returns: The resulting sample. + +#### `SAMPLE_INIT` +* Description: Create a [reservoir sampler](https://en.wikipedia.org/wiki/Reservoir_sampling) of a specific size or, if unspecified, size 1024. Elements sampled by the reservoir sampler will be included in the final sample with equal probability. +* Input: + * size? - The size of the reservoir sampler. If unspecified, the size is 1024. +* Returns: The sampler object. + +#### `SAMPLE_MERGE` +* Description: Merge and resample a collection of samples. +* Input: + * samplers - A list of samplers to merge. + * baseSampler? - A base sampler to merge into. If unspecified, an empty clone of the first sampler in the list is used. +* Returns: A sampler which represents the resampled merger of the samplers. ### Distributional Statistics http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/Sampler.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/Sampler.java b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/Sampler.java new file mode 100644 index 0000000..93104fb --- /dev/null +++ b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/Sampler.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.metron.statistics.sampling; + +import java.util.function.Supplier; + +public interface Sampler extends Supplier<Iterable<Object>> { + int DEFAULT_SIZE=1024; + int getSize(); + void add(Object o); + + default void addAll(Iterable<? extends Object> vals) { + if(vals == null) { + return; + } + for(Object o : vals) { + add(o); + } + } + + Sampler cloneEmpty(); + + +} http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplerUtil.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplerUtil.java b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplerUtil.java new file mode 100644 index 0000000..ca2e86a --- /dev/null +++ b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplerUtil.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.metron.statistics.sampling; + +import com.google.common.collect.Iterables; + +import java.util.Optional; + +public enum SamplerUtil { + INSTANCE; + + public Sampler merge(Iterable<Sampler> samplers, Optional<Sampler> baseSampler) { + if(Iterables.isEmpty(samplers)) { + return null; + } + Sampler ret = baseSampler.orElse(Iterables.getFirst(samplers, null).cloneEmpty()); + for(Sampler s : samplers) { + ret.addAll(s.get()); + } + return ret; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplingInitFunctions.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplingInitFunctions.java b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplingInitFunctions.java new file mode 100644 index 0000000..c57f374 --- /dev/null +++ b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplingInitFunctions.java @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.metron.statistics.sampling; + +import org.apache.metron.stellar.common.utils.ConversionUtils; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.ParseException; +import org.apache.metron.stellar.dsl.Stellar; +import org.apache.metron.stellar.dsl.StellarFunction; + +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; + +public class SamplingInitFunctions { + + @Stellar(namespace="SAMPLE" + ,name="INIT" + ,description="Create a [reservoir sampler](https://en.wikipedia.org/wiki/Reservoir_sampling) of a specific size or, if unspecified, size " + Sampler.DEFAULT_SIZE + ". Elements sampled by the reservoir sampler will be included in the final sample with equal probability." + ,params = { + "size? - The size of the reservoir sampler. If unspecified, the size is " + Sampler.DEFAULT_SIZE + } + ,returns="The sampler object." 
+ ) + + public static class UniformSamplerInit implements StellarFunction { + @Override + public Object apply(List<Object> args, Context context) throws ParseException { + if(args.size() == 0) { + return new UniformSampler(); + } + else { + Optional<Integer> sizeArg = get(args, 0, "Size", Integer.class); + if(sizeArg.isPresent() && sizeArg.get() <= 0) { + throw new IllegalStateException("Size must be a positive integer"); + } + else { + return new UniformSampler(sizeArg.orElse(Sampler.DEFAULT_SIZE)); + } + } + } + + @Override + public void initialize(Context context) { + } + + @Override + public boolean isInitialized() { + return true; + } + } + + + public static <T> Optional<T> get(List<Object> args, int offset, String argName, Class<T> expectedClazz) { + Object obj = args.get(offset); + T ret = ConversionUtils.convert(obj, expectedClazz); + if(ret == null ) { + if(obj != null) { + throw new IllegalStateException(argName + "argument(" + obj + + " is expected to be an " + expectedClazz.getName() + + ", but was " + obj + ); + } + } + return Optional.ofNullable(ret); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplingOpsFunctions.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplingOpsFunctions.java b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplingOpsFunctions.java new file mode 100644 index 0000000..4402fdf --- /dev/null +++ b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/SamplingOpsFunctions.java @@ -0,0 +1,178 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.metron.statistics.sampling; + +import com.codahale.metrics.Reservoir; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.ParseException; +import org.apache.metron.stellar.dsl.Stellar; +import org.apache.metron.stellar.dsl.StellarFunction; + +import java.util.List; +import java.util.Optional; + +public class SamplingOpsFunctions { + + @Stellar(namespace="SAMPLE" + ,name="GET" + ,description="Return the sample." + ,params = { + "sampler - Sampler to use." + } + ,returns="The resulting sample." + ) + public static class Get implements StellarFunction { + + @Override + public Object apply(List<Object> args, Context context) throws ParseException { + if(args.size() == 0) { + return null; + } + + Sampler s = null; + Object sObj = args.get(0); + if(sObj == null) { + return null; + } + else if(sObj instanceof Sampler) { + s = (Sampler)sObj; + } + else { + throw new IllegalStateException("Expected a sampler, but found " + sObj); + } + return s.get(); + } + + @Override + public void initialize(Context context) { + + } + + @Override + public boolean isInitialized() { + return true; + } + } + + @Stellar(namespace="SAMPLE" + ,name="ADD" + ,description="Add a value or collection of values to a sampler." + ,params = { + "sampler - Sampler to use. If null, then a default Uniform sampler is created." + ,"o - The value to add. 
If o is an Iterable, then each item is added." + } + ,returns="The sampler." + ) + public static class Add implements StellarFunction { + + @Override + public Object apply(List<Object> args, Context context) throws ParseException { + if(args.size() == 0) { + return null; + } + if(args.size() < 2) { + throw new IllegalStateException("Expected sampler and value to add"); + } + Sampler s = null; + Object sObj = args.get(0); + if(sObj == null) { + s = new UniformSampler(); + } + else if(sObj instanceof Sampler) { + s = (Sampler)sObj; + } + else { + throw new IllegalStateException("Expected a sampler, but found " + sObj); + } + Object valsObj = args.get(1); + if(valsObj == null) { + return s; + } + else if(valsObj instanceof Iterable) { + Iterable<Object> vals = (Iterable<Object>)valsObj; + s.addAll(vals); + } + else { + s.add(valsObj); + } + return s; + } + + @Override + public void initialize(Context context) { + + } + + @Override + public boolean isInitialized() { + return true; + } + } + + @Stellar(namespace="SAMPLE" + ,name="MERGE" + ,description="Merge and resample a collection of samples." + ,params = { + "samplers - A list of samplers to merge." + , "baseSampler? - A base sampler to merge into. If unspecified the first of the list of samplers will be cloned." + } + ,returns = "A sampler which represents the resampled merger of the samplers." 
+ ) + public static class Merge implements StellarFunction { + + @Override + public Object apply(List<Object> args, Context context) throws ParseException { + if(args.size() == 0) { + return null; + } + Object reservoirsObj = args.get(0); + if(reservoirsObj == null) { + return null; + } + if(!(reservoirsObj instanceof Iterable)){ + throw new IllegalStateException("Expected a collection of Samplers"); + } + Iterable<Sampler> reservoirs = (Iterable<Sampler>)reservoirsObj; + + Sampler baseSampler = null; + if(args.size() > 1) { + Object baseSamplerObj = args.get(1); + if (baseSamplerObj != null) { + if (!(baseSamplerObj instanceof Sampler)) { + throw new IllegalStateException("Expected baseSampler to be a Sampler"); + } else { + baseSampler = (Sampler) baseSamplerObj; + } + } + } + return SamplerUtil.INSTANCE.merge(reservoirs, Optional.ofNullable(baseSampler)); + } + + @Override + public void initialize(Context context) { + + } + + @Override + public boolean isInitialized() { + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/UniformSampler.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/UniformSampler.java b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/UniformSampler.java new file mode 100644 index 0000000..11460e0 --- /dev/null +++ b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/UniformSampler.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.statistics.sampling; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +/** + * This is a reservoir sampler without replacement where each element sampled will be included + * with equal probability in the reservoir. + */ +public class UniformSampler implements Sampler { + private List<Object> reservoir; + private int seen = 0; + private int size; + private Random rng = new Random(0); + + public UniformSampler() { + this(DEFAULT_SIZE); + } + + public UniformSampler(int size) { + this.size = size; + reservoir = new ArrayList<>(size); + } + + @Override + public Iterable<Object> get() { + return reservoir; + } + + /** + * Add an object to the reservoir + * @param o + */ + public void add(Object o) { + if(o == null) { + return; + } + if (reservoir.size() < size) { + reservoir.add(o); + } else { + int rIndex = rng.nextInt(seen + 1); + if (rIndex < size) { + reservoir.set(rIndex, o); + } + } + seen++; + } + + @Override + public Sampler cloneEmpty() { + return new UniformSampler(getSize()); + } + + @Override + public int getSize() { + return size; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + UniformSampler that = (UniformSampler) o; + + if (getSize() != that.getSize()) return false; + return reservoir != null ? 
reservoir.equals(that.reservoir) : that.reservoir == null; + + } + + @Override + public int hashCode() { + int result = reservoir != null ? reservoir.hashCode() : 0; + result = 31 * result + getSize(); + return result; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-analytics/metron-statistics/src/test/java/org/apache/metron/statistics/sampling/SamplerFunctionsTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-statistics/src/test/java/org/apache/metron/statistics/sampling/SamplerFunctionsTest.java b/metron-analytics/metron-statistics/src/test/java/org/apache/metron/statistics/sampling/SamplerFunctionsTest.java new file mode 100644 index 0000000..851ba67 --- /dev/null +++ b/metron-analytics/metron-statistics/src/test/java/org/apache/metron/statistics/sampling/SamplerFunctionsTest.java @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.metron.statistics.sampling; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import org.apache.metron.stellar.common.utils.StellarProcessorUtils; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Random; + +/** Exercises the SAMPLE_INIT, SAMPLE_ADD, SAMPLE_GET and SAMPLE_MERGE Stellar functions via StellarProcessorUtils. */ public class SamplerFunctionsTest { + static List<Double> sample = new ArrayList<>(); + static List<String> sampleString = new ArrayList<>(); + static List<Sampler> samplers = new ArrayList<>(); + /* Fixed-seed data: 1,000,000 values 10 * Random.nextDouble(), the same values as strings, and 10 sub-samplers of size 10,000, each fed 100,000 consecutive values. */ @BeforeClass + public static void beforeClass() { + Random rng = new Random(0); + int sampleSize = 1000000; + int numSubSamples = 10; + int subSampleSize = sampleSize/numSubSamples; + int currSample = -1; + for(int i = 0,j=0;i < sampleSize;++i,j = (j+1)%subSampleSize) { + double us= 10*rng.nextDouble(); + sample.add(us); + sampleString.add(us + ""); + if(j == 0) { + Sampler s = new UniformSampler(subSampleSize/10); + samplers.add(s); + currSample++; + } + samplers.get(currSample).add(us); + } + } + + @Test + public void testValidInit_default() throws Exception { + String stmt = "SAMPLE_INIT()"; + Sampler s = (Sampler) StellarProcessorUtils.run(stmt, new HashMap<>()); + Assert.assertEquals(Sampler.DEFAULT_SIZE, s.getSize()); + } + + @Test + public void testValidInit_withSize() throws Exception { + String stmt = "SAMPLE_INIT(size)"; + Sampler s = (Sampler) StellarProcessorUtils.run(stmt, ImmutableMap.of("size", 10 )); + Assert.assertEquals(10, s.getSize()); + } + + /* A negative size must be rejected with an IllegalStateException. */ @Test(expected=IllegalStateException.class) + public void testInvalidInit(){ + String stmt = "SAMPLE_INIT(size)"; + Sampler s = (Sampler) StellarProcessorUtils.run(stmt, ImmutableMap.of("size", -10 )); + } + + @Test + public void testGet() throws Exception { + String stmt = "SAMPLE_GET(SAMPLE_ADD(SAMPLE_INIT(size), values))"; + Iterable<? extends Object> s = (Iterable<?
extends Object>) StellarProcessorUtils.run(stmt, ImmutableMap.of("size", 10, "values", sample)); + Assert.assertEquals(10, Iterables.size(s)); + for(Object o : s) { + Assert.assertTrue(o instanceof Double); + Assert.assertTrue(sample.contains(o)); + } + } + + @Test + public void testAddSingle() throws Exception { + String stmt = "SAMPLE_ADD(SAMPLE_INIT(size), value)"; + Sampler s = (Sampler) StellarProcessorUtils.run(stmt, ImmutableMap.of("size", 10, "value", "blah")); + Assert.assertEquals(10, s.getSize()); + Assert.assertTrue(Iterables.getFirst(s.get(), null) instanceof String); + } + + @Test + public void testAddAll() throws Exception { + String stmt = "SAMPLE_ADD(SAMPLE_INIT(size), value)"; + Sampler s = (Sampler) StellarProcessorUtils.run(stmt, ImmutableMap.of("size", 10, "value", sampleString)); + Assert.assertEquals(10, s.getSize()); + for(Object o : s.get()) { + Assert.assertTrue(o instanceof String); + Assert.assertTrue(sampleString.contains(o)); + } + } + + /* The mean of the merged sub-samples must agree, within 0.1, with the mean of a single 100,000-element sample of the full data. */ @Test + public void testMerge() throws Exception { + Double sampleMean= null; + Double mergedSampleMean= null; + { + //grab the mean of the sample + String stmt = "STATS_MEAN(STATS_ADD(STATS_INIT(), SAMPLE_GET(SAMPLE_ADD(SAMPLE_INIT(size), values))))"; + sampleMean = (Double) StellarProcessorUtils.run(stmt, ImmutableMap.of("size", sample.size()/10, "values", sample)); + } + { + //grab the mean of the merged set of subsamples of the sample + String stmt = "STATS_MEAN(STATS_ADD(STATS_INIT(), SAMPLE_GET(SAMPLE_MERGE(samples))))"; + mergedSampleMean = (Double) StellarProcessorUtils.run(stmt, ImmutableMap.of("samples", samplers)); + } + Assert.assertEquals(sampleMean, mergedSampleMean, .1); + { + //Merge the sub-samples into a smaller base sampler (size 10); the result keeps the base sampler's size + String stmt = "SAMPLE_MERGE(samples, SAMPLE_INIT(10))"; + Sampler s = (Sampler) StellarProcessorUtils.run(stmt, ImmutableMap.of("samples", samplers)); + Assert.assertEquals(10, s.getSize()); + } + } +}
http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-analytics/metron-statistics/src/test/java/org/apache/metron/statistics/sampling/UniformSamplerTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-statistics/src/test/java/org/apache/metron/statistics/sampling/UniformSamplerTest.java b/metron-analytics/metron-statistics/src/test/java/org/apache/metron/statistics/sampling/UniformSamplerTest.java new file mode 100644 index 0000000..91ca3bd --- /dev/null +++ b/metron-analytics/metron-statistics/src/test/java/org/apache/metron/statistics/sampling/UniformSamplerTest.java @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.metron.statistics.sampling; + +import org.apache.commons.math3.random.GaussianRandomGenerator; +import org.apache.commons.math3.random.MersenneTwister; +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Random; + +/** Statistical tests for UniformSampler and SamplerUtil.merge over 2,000,000 fixed-seed uniform and Gaussian values. */ public class UniformSamplerTest { + public static final int SAMPLE_SIZE = 2000000; + static DescriptiveStatistics uniformStats = new DescriptiveStatistics(); + static List<Double> uniformSample = new ArrayList<>(); + static DescriptiveStatistics gaussianStats = new DescriptiveStatistics(); + static List<Double> gaussianSample = new ArrayList<>(); + + /* Fills both data sets (10 * Random.nextDouble() and 10 * nextNormalizedDouble()) and records their full-population statistics. */ @BeforeClass + public static void beforeClass() { + Random rng = new Random(0); + GaussianRandomGenerator gen = new GaussianRandomGenerator(new MersenneTwister(0)); + for(int i = 0;i < SAMPLE_SIZE;++i) { + double us= 10*rng.nextDouble(); + uniformSample.add(us); + uniformStats.addValue(us); + double gs= 10*gen.nextNormalizedDouble(); + gaussianSample.add(gs); + gaussianStats.addValue(gs); + } + } + + @Test + public void testUniformDistributionIsPreserved() { + Sampler s = new UniformSampler(SAMPLE_SIZE/10); + s.addAll(uniformSample); + validateDistribution(s, uniformStats); + } + + @Test + public void testGaussianDistributionIsPreserved() { + Sampler s = new UniformSampler(SAMPLE_SIZE/10); + s.addAll(gaussianSample); + validateDistribution(s, gaussianStats); + } + + /* Asserts that the sample's mean and standard deviation are each within 0.1 of the full data's. */ public void validateDistribution(Sampler sample, DescriptiveStatistics distribution) { + DescriptiveStatistics s = new DescriptiveStatistics(); + for(Object d : sample.get()) { + s.addValue((Double)d); + } + Assert.assertEquals(s.getMean(), distribution.getMean(), .1); + Assert.assertEquals(s.getStandardDeviation(), distribution.getStandardDeviation(), .1); + } + + @Test + public void testMergeUniform() { +
Iterable<Sampler> subsamples = getSubsamples(uniformSample); + Sampler s = SamplerUtil.INSTANCE.merge(subsamples, Optional.empty()); + validateDistribution(s, uniformStats); + } + + /* Merging into an explicit base sampler keeps the base sampler's size. */ @Test + public void testMerge() { + UniformSampler sampler = new UniformSampler(10); + Iterable<Sampler> subsamples = getSubsamples(uniformSample); + Sampler s = SamplerUtil.INSTANCE.merge(subsamples, Optional.of(sampler)); + Assert.assertEquals(s.getSize(), 10); + } + + + @Test + public void testMergeGaussian() { + Iterable<Sampler> subsamples = getSubsamples(gaussianSample); + Sampler s = SamplerUtil.INSTANCE.merge(subsamples, Optional.empty()); + validateDistribution(s, gaussianStats); + } + + /* Splits the data into 20 consecutive chunks of SAMPLE_SIZE/20 values. Each chunk is fed to its own sampler, which retains a tenth of its chunk. */ public Iterable<Sampler> getSubsamples(List<Double> sample) { + int numSamplers = 20; + int numSamplesPerSampler = SAMPLE_SIZE/numSamplers; + Sampler[] samplers = new Sampler[numSamplers]; + int j = 0; + for(int i = 0;i < numSamplers;++i) { + samplers[i] = new UniformSampler(numSamplesPerSampler/10); + for(;j < (i+1)*numSamplesPerSampler && j < sample.size();++j) { + samplers[i].add(sample.get(j)); + } + } + List<Sampler> ret = new ArrayList<>(); + for(int i = 0;i < samplers.length;++i) { + ret.add(samplers[i]); + } + return ret; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3f0b1b7b/metron-stellar/stellar-common/README.md ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/README.md b/metron-stellar/stellar-common/README.md index e5b7dac..09bd4d6 100644 --- a/metron-stellar/stellar-common/README.md +++ b/metron-stellar/stellar-common/README.md @@ -214,6 +214,10 @@ Where: | [ `REGEXP_MATCH`](#regexp_match) | | [ `REGEXP_GROUP_VAL`](#regexp_group_val) | | [ `ROUND`](#round) | +| [ `SAMPLE_ADD`](../../metron-analytics/metron-statistics#sample_add) | +| [ `SAMPLE_GET`](../../metron-analytics/metron-statistics#sample_get) | +| [ `SAMPLE_INIT`](../../metron-analytics/metron-statistics#sample_init) | +| [
`SAMPLE_MERGE`](../../metron-analytics/metron-statistics#sample_merge) | | [ `SET_ADD`](#set_add) | | [ `SET_INIT`](#set_init) | | [ `SET_MERGE`](#set_merge) |