Repository: flink Updated Branches: refs/heads/master 673a883ed -> 8a43b9c0b
[FLINK-7186] Activate checkstyle flink-java/sampling This closes #4339. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72678f14 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72678f14 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72678f14 Branch: refs/heads/master Commit: 72678f1407fd24088edae3c96d33b90b43e21a0f Parents: 673a883 Author: Dawid Wysakowicz <dwysakow...@apache.org> Authored: Fri Jul 14 10:32:16 2017 +0200 Committer: zentol <ches...@apache.org> Committed: Tue Jul 18 16:13:51 2017 +0200 ---------------------------------------------------------------------- .../api/java/sampling/BernoulliSampler.java | 21 ++-- .../java/sampling/DistributedRandomSampler.java | 5 +- .../java/sampling/IntermediateSampleData.java | 1 + .../flink/api/java/sampling/PoissonSampler.java | 30 ++--- .../flink/api/java/sampling/RandomSampler.java | 11 +- .../ReservoirSamplerWithReplacement.java | 9 +- .../ReservoirSamplerWithoutReplacement.java | 15 +-- .../api/java/sampling/RandomSamplerTest.java | 112 +++++++++---------- 8 files changed, 106 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/72678f14/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java index c370f9d..f3242fa 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.sampling; import org.apache.flink.annotation.Internal; @@ -37,10 +38,10 @@ public class BernoulliSampler<T> extends RandomSampler<T> { private final double fraction; private final Random random; - + // THRESHOLD is a tuning parameter for choosing sampling method according to the fraction. - private final static double THRESHOLD = 0.33; - + private static final double THRESHOLD = 0.33; + /** * Create a Bernoulli sampler with sample fraction and default random number generator. * @@ -49,7 +50,7 @@ public class BernoulliSampler<T> extends RandomSampler<T> { public BernoulliSampler(double fraction) { this(fraction, new XORShiftRandom()); } - + /** * Create a Bernoulli sampler with sample fraction and random number generator seed. * @@ -59,7 +60,7 @@ public class BernoulliSampler<T> extends RandomSampler<T> { public BernoulliSampler(double fraction, long seed) { this(fraction, new XORShiftRandom(seed)); } - + /** * Create a Bernoulli sampler with sample fraction and random number generator. * @@ -71,7 +72,7 @@ public class BernoulliSampler<T> extends RandomSampler<T> { this.fraction = fraction; this.random = random; } - + /** * Sample the input elements, for each input element, take a Bernoulli trail for sampling. * @@ -81,12 +82,12 @@ public class BernoulliSampler<T> extends RandomSampler<T> { @Override public Iterator<T> sample(final Iterator<T> input) { if (fraction == 0) { - return EMPTY_ITERABLE; + return emptyIterable; } - + return new SampledIterator<T>() { T current = null; - + @Override public boolean hasNext() { if (current == null) { @@ -95,7 +96,7 @@ public class BernoulliSampler<T> extends RandomSampler<T> { return current != null; } - + @Override public T next() { if (current == null) { http://git-wip-us.apache.org/repos/asf/flink/blob/72678f14/flink-java/src/main/java/org/apache/flink/api/java/sampling/DistributedRandomSampler.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/DistributedRandomSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/DistributedRandomSampler.java index c6ed14b..eaaea4e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/DistributedRandomSampler.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/DistributedRandomSampler.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.sampling; import org.apache.flink.annotation.Internal; @@ -41,7 +42,7 @@ public abstract class DistributedRandomSampler<T> extends RandomSampler<T> { this.numSamples = numSamples; } - protected final Iterator<IntermediateSampleData<T>> EMPTY_INTERMEDIATE_ITERABLE = + protected final Iterator<IntermediateSampleData<T>> emptyIntermediateIterable = new SampledIterator<IntermediateSampleData<T>>() { @Override public boolean hasNext() { @@ -71,7 +72,7 @@ public abstract class DistributedRandomSampler<T> extends RandomSampler<T> { */ public Iterator<T> sampleInCoordinator(Iterator<IntermediateSampleData<T>> input) { if (numSamples == 0) { - return EMPTY_ITERABLE; + return emptyIterable; } // This queue holds fixed number elements with the top K weight for the coordinator. http://git-wip-us.apache.org/repos/asf/flink/blob/72678f14/flink-java/src/main/java/org/apache/flink/api/java/sampling/IntermediateSampleData.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/IntermediateSampleData.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/IntermediateSampleData.java index 7fee021..3bd5ee9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/IntermediateSampleData.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/IntermediateSampleData.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.sampling; import org.apache.flink.annotation.Internal; http://git-wip-us.apache.org/repos/asf/flink/blob/72678f14/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java index 9b98288..3afbcce 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java @@ -15,13 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.sampling; -import org.apache.commons.math3.distribution.PoissonDistribution; import org.apache.flink.annotation.Internal; import org.apache.flink.util.Preconditions; import org.apache.flink.util.XORShiftRandom; +import org.apache.commons.math3.distribution.PoissonDistribution; + import java.util.Iterator; import java.util.Random; @@ -35,14 +37,14 @@ import java.util.Random; */ @Internal public class PoissonSampler<T> extends RandomSampler<T> { - + private PoissonDistribution poissonDistribution; private final double fraction; private final Random random; - + // THRESHOLD is a tuning parameter for choosing sampling method according to the fraction. - private final static double THRESHOLD = 0.4; - + private static final double THRESHOLD = 0.4; + /** * Create a poisson sampler which can sample elements with replacement. * @@ -58,7 +60,7 @@ public class PoissonSampler<T> extends RandomSampler<T> { } this.random = new XORShiftRandom(seed); } - + /** * Create a poisson sampler which can sample elements with replacement. * @@ -72,7 +74,7 @@ public class PoissonSampler<T> extends RandomSampler<T> { } this.random = new XORShiftRandom(); } - + /** * Sample the input elements, for each input element, generate its count following a poisson * distribution. @@ -83,13 +85,13 @@ public class PoissonSampler<T> extends RandomSampler<T> { @Override public Iterator<T> sample(final Iterator<T> input) { if (fraction == 0) { - return EMPTY_ITERABLE; + return emptyIterable; } - + return new SampledIterator<T>() { T currentElement; int currentCount = 0; - + @Override public boolean hasNext() { if (currentCount > 0) { @@ -103,7 +105,7 @@ public class PoissonSampler<T> extends RandomSampler<T> { } } } - + @Override public T next() { if (currentCount <= 0) { @@ -112,7 +114,7 @@ public class PoissonSampler<T> extends RandomSampler<T> { currentCount--; return currentElement; } - + public int poisson_ge1(double p) { // sample 'k' from Poisson(p), conditioned to k >= 1. double q = Math.pow(Math.E, -p); @@ -127,7 +129,7 @@ public class PoissonSampler<T> extends RandomSampler<T> { } return k; } - + private void skipGapElements(int num) { // skip the elements that occurrence number is zero. int elementCount = 0; @@ -136,7 +138,7 @@ public class PoissonSampler<T> extends RandomSampler<T> { elementCount++; } } - + private void samplingProcess() { if (fraction <= THRESHOLD) { double u = Math.max(random.nextDouble(), EPSILON); http://git-wip-us.apache.org/repos/asf/flink/blob/72678f14/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java index 7046b01..435d281 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.sampling; import org.apache.flink.annotation.Internal; @@ -30,20 +31,20 @@ import java.util.Iterator; @Internal public abstract class RandomSampler<T> { - protected final static double EPSILON = 1e-5; - - protected final Iterator<T> EMPTY_ITERABLE = new SampledIterator<T>() { + protected static final double EPSILON = 1e-5; + + protected final Iterator<T> emptyIterable = new SampledIterator<T>() { @Override public boolean hasNext() { return false; } - + @Override public T next() { return null; } }; - + /** * Randomly sample the elements from input in sequence, and return the result iterator. * http://git-wip-us.apache.org/repos/asf/flink/blob/72678f14/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java index b36d040..bcdd4f9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.sampling; import org.apache.flink.annotation.Internal; @@ -32,7 +33,7 @@ import java.util.Random; * difference is that, in the first phase, we generate weights for each element K times, so that * each element can get selected multiple times. * - * This implementation refers to the algorithm described in <a href="researcher.ibm.com/files/us-dpwoodru/tw11.pdf"> + * <p>This implementation refers to the algorithm described in <a href="researcher.ibm.com/files/us-dpwoodru/tw11.pdf"> * "Optimal Random Sampling from Distributed Streams Revisited"</a>. * * @param <T> The type of sample. @@ -50,7 +51,7 @@ public class ReservoirSamplerWithReplacement<T> extends DistributedRandomSampler public ReservoirSamplerWithReplacement(int numSamples) { this(numSamples, new XORShiftRandom()); } - + /** * Create a sampler with fixed sample size and random number generator seed. * @@ -60,7 +61,7 @@ public class ReservoirSamplerWithReplacement<T> extends DistributedRandomSampler public ReservoirSamplerWithReplacement(int numSamples, long seed) { this(numSamples, new XORShiftRandom(seed)); } - + /** * Create a sampler with fixed sample size and random number generator. * @@ -76,7 +77,7 @@ public class ReservoirSamplerWithReplacement<T> extends DistributedRandomSampler @Override public Iterator<IntermediateSampleData<T>> sampleInPartition(Iterator<T> input) { if (numSamples == 0) { - return EMPTY_INTERMEDIATE_ITERABLE; + return emptyIntermediateIterable; } // This queue holds a fixed number of elements with the top K weight for current partition. http://git-wip-us.apache.org/repos/asf/flink/blob/72678f14/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java index a38a921..212c339 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.sampling; import org.apache.flink.annotation.Internal; @@ -35,14 +36,14 @@ import java.util.Random; * select top K elements as the output of each partitions. In the second phase, we select top K * elements from all the outputs of the first phase. * - * This implementation refers to the algorithm described in <a href="researcher.ibm.com/files/us-dpwoodru/tw11.pdf"> + * <p>This implementation refers to the algorithm described in <a href="researcher.ibm.com/files/us-dpwoodru/tw11.pdf"> * "Optimal Random Sampling from Distributed Streams Revisited"</a>. * * @param <T> The type of the sampler. */ @Internal public class ReservoirSamplerWithoutReplacement<T> extends DistributedRandomSampler<T> { - + private final Random random; /** @@ -56,7 +57,7 @@ public class ReservoirSamplerWithoutReplacement<T> extends DistributedRandomSamp Preconditions.checkArgument(numSamples >= 0, "numSamples should be non-negative."); this.random = random; } - + /** * Create a new sampler with reservoir size and a default random number generator. * @@ -65,7 +66,7 @@ public class ReservoirSamplerWithoutReplacement<T> extends DistributedRandomSamp public ReservoirSamplerWithoutReplacement(int numSamples) { this(numSamples, new XORShiftRandom()); } - + /** * Create a new sampler with reservoir size and the seed for random number generator. * @@ -73,14 +74,14 @@ public class ReservoirSamplerWithoutReplacement<T> extends DistributedRandomSamp * @param seed Random number generator seed. */ public ReservoirSamplerWithoutReplacement(int numSamples, long seed) { - + this(numSamples, new XORShiftRandom(seed)); } - + @Override public Iterator<IntermediateSampleData<T>> sampleInPartition(Iterator<T> input) { if (numSamples == 0) { - return EMPTY_INTERMEDIATE_ITERABLE; + return emptyIntermediateIterable; } // This queue holds fixed number elements with the top K weight for current partition. http://git-wip-us.apache.org/repos/asf/flink/blob/72678f14/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java index 228dd3a..db1d19f 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java @@ -15,14 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.sampling; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest; import org.apache.flink.testutils.junit.RetryOnFailure; import org.apache.flink.testutils.junit.RetryRule; import org.apache.flink.util.Preconditions; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -61,14 +63,12 @@ public class RandomSamplerTest { private static final List<Double> source = new ArrayList<Double>(SOURCE_SIZE); - @Rule public final RetryRule retryRule = new RetryRule(); @SuppressWarnings({"unchecked", "rawtypes"}) private final List<Double>[] sourcePartitions = new List[DEFAULT_PARTITION_NUMBER]; - @BeforeClass public static void init() { // initiate source data set. @@ -78,27 +78,27 @@ public class RandomSamplerTest { } private void initSourcePartition() { - for (int i = 0; i< DEFAULT_PARTITION_NUMBER; i++) { - sourcePartitions[i] = new ArrayList<Double>((int)Math.ceil((double)SOURCE_SIZE / DEFAULT_PARTITION_NUMBER)); + for (int i = 0; i < DEFAULT_PARTITION_NUMBER; i++) { + sourcePartitions[i] = new ArrayList<Double>((int) Math.ceil((double) SOURCE_SIZE / DEFAULT_PARTITION_NUMBER)); } - for (int i = 0; i< SOURCE_SIZE; i++) { + for (int i = 0; i < SOURCE_SIZE; i++) { int index = i % DEFAULT_PARTITION_NUMBER; - sourcePartitions[index].add((double)i); + sourcePartitions[index].add((double) i); } } - + @Test(expected = java.lang.IllegalArgumentException.class) public void testBernoulliSamplerWithUnexpectedFraction1() { verifySamplerFraction(-1, false); } - + @Test(expected = java.lang.IllegalArgumentException.class) public void testBernoulliSamplerWithUnexpectedFraction2() { verifySamplerFraction(2, false); } - + @Test - @RetryOnFailure(times=3) + @RetryOnFailure(times = 3) public void testBernoulliSamplerFraction() { verifySamplerFraction(0.01, false); verifySamplerFraction(0.05, false); @@ -108,22 +108,22 @@ public class RandomSamplerTest { verifySamplerFraction(0.854, false); verifySamplerFraction(0.99, false); } - + @Test - @RetryOnFailure(times=3) + @RetryOnFailure(times = 3) public void testBernoulliSamplerDuplicateElements() { verifyRandomSamplerDuplicateElements(new BernoulliSampler<Double>(0.01)); verifyRandomSamplerDuplicateElements(new BernoulliSampler<Double>(0.1)); verifyRandomSamplerDuplicateElements(new BernoulliSampler<Double>(0.5)); } - + @Test(expected = java.lang.IllegalArgumentException.class) public void testPoissonSamplerWithUnexpectedFraction1() { verifySamplerFraction(-1, true); } - + @Test - @RetryOnFailure(times=3) + @RetryOnFailure(times = 3) public void testPoissonSamplerFraction() { verifySamplerFraction(0.01, true); verifySamplerFraction(0.05, true); @@ -133,37 +133,37 @@ public class RandomSamplerTest { verifySamplerFraction(0.99, true); verifySamplerFraction(1.5, true); } - + @Test(expected = java.lang.IllegalArgumentException.class) public void testReservoirSamplerUnexpectedSize1() { verifySamplerFixedSampleSize(-1, true); } - + @Test(expected = java.lang.IllegalArgumentException.class) public void testReservoirSamplerUnexpectedSize2() { verifySamplerFixedSampleSize(-1, false); } - + @Test - @RetryOnFailure(times=3) + @RetryOnFailure(times = 3) public void testBernoulliSamplerDistribution() { verifyBernoulliSampler(0.01d); verifyBernoulliSampler(0.05d); verifyBernoulliSampler(0.1d); verifyBernoulliSampler(0.5d); } - + @Test - @RetryOnFailure(times=3) + @RetryOnFailure(times = 3) public void testPoissonSamplerDistribution() { verifyPoissonSampler(0.01d); verifyPoissonSampler(0.05d); verifyPoissonSampler(0.1d); verifyPoissonSampler(0.5d); } - + @Test - @RetryOnFailure(times=3) + @RetryOnFailure(times = 3) public void testReservoirSamplerSampledSize() { verifySamplerFixedSampleSize(1, true); verifySamplerFixedSampleSize(10, true); @@ -171,41 +171,41 @@ public class RandomSamplerTest { verifySamplerFixedSampleSize(1234, true); verifySamplerFixedSampleSize(9999, true); verifySamplerFixedSampleSize(20000, true); - + verifySamplerFixedSampleSize(1, false); verifySamplerFixedSampleSize(10, false); verifySamplerFixedSampleSize(100, false); verifySamplerFixedSampleSize(1234, false); verifySamplerFixedSampleSize(9999, false); } - + @Test - @RetryOnFailure(times=3) + @RetryOnFailure(times = 3) public void testReservoirSamplerSampledSize2() { RandomSampler<Double> sampler = new ReservoirSamplerWithoutReplacement<Double>(20000); Iterator<Double> sampled = sampler.sample(source.iterator()); assertTrue("ReservoirSamplerWithoutReplacement sampled output size should not beyond the source size.", getSize(sampled) == SOURCE_SIZE); } - + @Test - @RetryOnFailure(times=3) + @RetryOnFailure(times = 3) public void testReservoirSamplerDuplicateElements() { verifyRandomSamplerDuplicateElements(new ReservoirSamplerWithoutReplacement<Double>(100)); verifyRandomSamplerDuplicateElements(new ReservoirSamplerWithoutReplacement<Double>(1000)); verifyRandomSamplerDuplicateElements(new ReservoirSamplerWithoutReplacement<Double>(5000)); } - + @Test - @RetryOnFailure(times=3) + @RetryOnFailure(times = 3) public void testReservoirSamplerWithoutReplacement() { verifyReservoirSamplerWithoutReplacement(100, false); verifyReservoirSamplerWithoutReplacement(500, false); verifyReservoirSamplerWithoutReplacement(1000, false); verifyReservoirSamplerWithoutReplacement(5000, false); } - + @Test - @RetryOnFailure(times=3) + @RetryOnFailure(times = 3) public void testReservoirSamplerWithReplacement() { verifyReservoirSamplerWithReplacement(100, false); verifyReservoirSamplerWithReplacement(500, false); @@ -214,7 +214,7 @@ public class RandomSamplerTest { } @Test - @RetryOnFailure(times=3) + @RetryOnFailure(times = 3) public void testReservoirSamplerWithMultiSourcePartitions1() { initSourcePartition(); @@ -225,7 +225,7 @@ public class RandomSamplerTest { } @Test - @RetryOnFailure(times=3) + @RetryOnFailure(times = 3) public void testReservoirSamplerWithMultiSourcePartitions2() { initSourcePartition(); @@ -259,7 +259,7 @@ public class RandomSamplerTest { } else { sampler = new BernoulliSampler<Double>(fraction); } - + // take 20 times sample, and take the average result size for next step comparison. int totalSampledSize = 0; double sampleCount = 20; @@ -283,7 +283,7 @@ public class RandomSamplerTest { Set<Double> set = Sets.newHashSet(list); assertTrue("There should not have duplicate element for sampler without replacement.", list.size() == set.size()); } - + private int getSize(Iterator<?> iterator) { int size = 0; while (iterator.hasNext()) { @@ -292,25 +292,25 @@ public class RandomSamplerTest { } return size; } - + private void verifyBernoulliSampler(double fraction) { BernoulliSampler<Double> sampler = new BernoulliSampler<Double>(fraction); verifyRandomSamplerWithFraction(fraction, sampler, true); verifyRandomSamplerWithFraction(fraction, sampler, false); } - + private void verifyPoissonSampler(double fraction) { PoissonSampler<Double> sampler = new PoissonSampler<Double>(fraction); verifyRandomSamplerWithFraction(fraction, sampler, true); verifyRandomSamplerWithFraction(fraction, sampler, false); } - + private void verifyReservoirSamplerWithReplacement(int numSamplers, boolean sampleOnPartitions) { ReservoirSamplerWithReplacement<Double> sampler = new ReservoirSamplerWithReplacement<Double>(numSamplers); verifyRandomSamplerWithSampleSize(numSamplers, sampler, true, sampleOnPartitions); verifyRandomSamplerWithSampleSize(numSamplers, sampler, false, sampleOnPartitions); } - + private void verifyReservoirSamplerWithoutReplacement(int numSamplers, boolean sampleOnPartitions) { ReservoirSamplerWithoutReplacement<Double> sampler = new ReservoirSamplerWithoutReplacement<Double>(numSamplers); verifyRandomSamplerWithSampleSize(numSamplers, sampler, true, sampleOnPartitions); @@ -330,7 +330,7 @@ public class RandomSamplerTest { } else { baseSample = getWrongSampler(fraction); } - + verifyKSTest(sampler, baseSample, withDefaultSampler); } @@ -347,7 +347,7 @@ public class RandomSamplerTest { } else { baseSample = getWrongSampler(sampleSize); } - + verifyKSTest(sampler, baseSample, withDefaultSampler, sampleWithPartitions); } @@ -365,13 +365,13 @@ public class RandomSamplerTest { assertTrue(String.format("KS test result with p value(%f), d value(%f)", pValue, dValue), pValue > dValue); } } - + private double[] getSampledOutput(RandomSampler<Double> sampler, boolean sampleOnPartitions) { Iterator<Double> sampled; if (sampleOnPartitions) { - DistributedRandomSampler<Double> reservoirRandomSampler = (DistributedRandomSampler<Double>)sampler; + DistributedRandomSampler<Double> reservoirRandomSampler = (DistributedRandomSampler<Double>) sampler; List<IntermediateSampleData<Double>> intermediateResult = Lists.newLinkedList(); - for (int i = 0; i< DEFAULT_PARTITION_NUMBER; i++) { + for (int i = 0; i < DEFAULT_PARTITION_NUMBER; i++) { Iterator<IntermediateSampleData<Double>> partialIntermediateResult = reservoirRandomSampler.sampleInPartition(sourcePartitions[i].iterator()); while (partialIntermediateResult.hasNext()) { intermediateResult.add(partialIntermediateResult.next()); @@ -408,10 +408,10 @@ public class RandomSamplerTest { for (int i = 0; i < size; i++) { defaultSampler[i] = Math.round(step * i); } - + return defaultSampler; } - + private double[] getDefaultSampler(int fixSize) { Preconditions.checkArgument(fixSize > 0, "Sample fraction should be positive."); double step = SOURCE_SIZE / (double) fixSize; @@ -419,10 +419,10 @@ public class RandomSamplerTest { for (int i = 0; i < fixSize; i++) { defaultSampler[i] = Math.round(step * i); } - + return defaultSampler; } - + /* * Build a failed sample distribution which only contains elements in the first half of source data. */ @@ -434,10 +434,10 @@ public class RandomSamplerTest { for (int i = 0; i < size; i++) { wrongSampler[i] = (double) i % halfSourceSize; } - + return wrongSampler; } - + /* * Build a failed sample distribution which only contains elements in the first half of source data. */ @@ -448,10 +448,10 @@ public class RandomSamplerTest { for (int i = 0; i < fixSize; i++) { wrongSampler[i] = (double) i % halfSourceSize; } - + return wrongSampler; } - + /* * Calculate the D value of K-S test for p-value 0.001, m and n are the sample size */