Repository: flink
Updated Branches:
  refs/heads/master 673a883ed -> 8a43b9c0b


[FLINK-7186] Activate checkstyle flink-java/sampling

This closes #4339.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72678f14
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72678f14
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72678f14

Branch: refs/heads/master
Commit: 72678f1407fd24088edae3c96d33b90b43e21a0f
Parents: 673a883
Author: Dawid Wysakowicz <dwysakow...@apache.org>
Authored: Fri Jul 14 10:32:16 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Tue Jul 18 16:13:51 2017 +0200

----------------------------------------------------------------------
 .../api/java/sampling/BernoulliSampler.java     |  21 ++--
 .../java/sampling/DistributedRandomSampler.java |   5 +-
 .../java/sampling/IntermediateSampleData.java   |   1 +
 .../flink/api/java/sampling/PoissonSampler.java |  30 ++---
 .../flink/api/java/sampling/RandomSampler.java  |  11 +-
 .../ReservoirSamplerWithReplacement.java        |   9 +-
 .../ReservoirSamplerWithoutReplacement.java     |  15 +--
 .../api/java/sampling/RandomSamplerTest.java    | 112 +++++++++----------
 8 files changed, 106 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/72678f14/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
index c370f9d..f3242fa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.sampling;
 
 import org.apache.flink.annotation.Internal;
@@ -37,10 +38,10 @@ public class BernoulliSampler<T> extends RandomSampler<T> {
 
        private final double fraction;
        private final Random random;
-       
+
        // THRESHOLD is a tuning parameter for choosing sampling method 
according to the fraction.
-       private final static double THRESHOLD = 0.33;
-       
+       private static final double THRESHOLD = 0.33;
+
        /**
         * Create a Bernoulli sampler with sample fraction and default random 
number generator.
         *
@@ -49,7 +50,7 @@ public class BernoulliSampler<T> extends RandomSampler<T> {
        public BernoulliSampler(double fraction) {
                this(fraction, new XORShiftRandom());
        }
-       
+
        /**
         * Create a Bernoulli sampler with sample fraction and random number 
generator seed.
         *
@@ -59,7 +60,7 @@ public class BernoulliSampler<T> extends RandomSampler<T> {
        public BernoulliSampler(double fraction, long seed) {
                this(fraction, new XORShiftRandom(seed));
        }
-       
+
        /**
         * Create a Bernoulli sampler with sample fraction and random number 
generator.
         *
@@ -71,7 +72,7 @@ public class BernoulliSampler<T> extends RandomSampler<T> {
                this.fraction = fraction;
                this.random = random;
        }
-       
+
        /**
         * Sample the input elements, for each input element, take a Bernoulli 
trail for sampling.
         *
@@ -81,12 +82,12 @@ public class BernoulliSampler<T> extends RandomSampler<T> {
        @Override
        public Iterator<T> sample(final Iterator<T> input) {
                if (fraction == 0) {
-                       return EMPTY_ITERABLE;
+                       return emptyIterable;
                }
-               
+
                return new SampledIterator<T>() {
                        T current = null;
-                       
+
                        @Override
                        public boolean hasNext() {
                                if (current == null) {
@@ -95,7 +96,7 @@ public class BernoulliSampler<T> extends RandomSampler<T> {
 
                                return current != null;
                        }
-                       
+
                        @Override
                        public T next() {
                                if (current == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/72678f14/flink-java/src/main/java/org/apache/flink/api/java/sampling/DistributedRandomSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/DistributedRandomSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/DistributedRandomSampler.java
index c6ed14b..eaaea4e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/DistributedRandomSampler.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/DistributedRandomSampler.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.sampling;
 
 import org.apache.flink.annotation.Internal;
@@ -41,7 +42,7 @@ public abstract class DistributedRandomSampler<T> extends 
RandomSampler<T> {
                this.numSamples = numSamples;
        }
 
-       protected final Iterator<IntermediateSampleData<T>> 
EMPTY_INTERMEDIATE_ITERABLE =
+       protected final Iterator<IntermediateSampleData<T>> 
emptyIntermediateIterable =
                new SampledIterator<IntermediateSampleData<T>>() {
                        @Override
                        public boolean hasNext() {
@@ -71,7 +72,7 @@ public abstract class DistributedRandomSampler<T> extends 
RandomSampler<T> {
         */
        public Iterator<T> 
sampleInCoordinator(Iterator<IntermediateSampleData<T>> input) {
                if (numSamples == 0) {
-                       return EMPTY_ITERABLE;
+                       return emptyIterable;
                }
 
                // This queue holds fixed number elements with the top K weight 
for the coordinator.

http://git-wip-us.apache.org/repos/asf/flink/blob/72678f14/flink-java/src/main/java/org/apache/flink/api/java/sampling/IntermediateSampleData.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/IntermediateSampleData.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/IntermediateSampleData.java
index 7fee021..3bd5ee9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/IntermediateSampleData.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/IntermediateSampleData.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.sampling;
 
 import org.apache.flink.annotation.Internal;

http://git-wip-us.apache.org/repos/asf/flink/blob/72678f14/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
index 9b98288..3afbcce 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
@@ -15,13 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.sampling;
 
-import org.apache.commons.math3.distribution.PoissonDistribution;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.XORShiftRandom;
 
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
 import java.util.Iterator;
 import java.util.Random;
 
@@ -35,14 +37,14 @@ import java.util.Random;
  */
 @Internal
 public class PoissonSampler<T> extends RandomSampler<T> {
-       
+
        private PoissonDistribution poissonDistribution;
        private final double fraction;
        private final Random random;
-       
+
        // THRESHOLD is a tuning parameter for choosing sampling method 
according to the fraction.
-       private final static double THRESHOLD = 0.4;
-       
+       private static final double THRESHOLD = 0.4;
+
        /**
         * Create a poisson sampler which can sample elements with replacement.
         *
@@ -58,7 +60,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
                }
                this.random = new XORShiftRandom(seed);
        }
-       
+
        /**
         * Create a poisson sampler which can sample elements with replacement.
         *
@@ -72,7 +74,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
                }
                this.random = new XORShiftRandom();
        }
-       
+
        /**
         * Sample the input elements, for each input element, generate its 
count following a poisson
         * distribution.
@@ -83,13 +85,13 @@ public class PoissonSampler<T> extends RandomSampler<T> {
        @Override
        public Iterator<T> sample(final Iterator<T> input) {
                if (fraction == 0) {
-                       return EMPTY_ITERABLE;
+                       return emptyIterable;
                }
-               
+
                return new SampledIterator<T>() {
                        T currentElement;
                        int currentCount = 0;
-                       
+
                        @Override
                        public boolean hasNext() {
                                if (currentCount > 0) {
@@ -103,7 +105,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
                                        }
                                }
                        }
-                       
+
                        @Override
                        public T next() {
                                if (currentCount <= 0) {
@@ -112,7 +114,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
                                currentCount--;
                                return currentElement;
                        }
-                       
+
                        public int poisson_ge1(double p) {
                                // sample 'k' from Poisson(p), conditioned to k 
>= 1.
                                double q = Math.pow(Math.E, -p);
@@ -127,7 +129,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
                                }
                                return k;
                        }
-                       
+
                        private void skipGapElements(int num) {
                                // skip the elements that occurrence number is 
zero.
                                int elementCount = 0;
@@ -136,7 +138,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
                                        elementCount++;
                                }
                        }
-                       
+
                        private void samplingProcess() {
                                if (fraction <= THRESHOLD) {
                                        double u = 
Math.max(random.nextDouble(), EPSILON);

http://git-wip-us.apache.org/repos/asf/flink/blob/72678f14/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
index 7046b01..435d281 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.sampling;
 
 import org.apache.flink.annotation.Internal;
@@ -30,20 +31,20 @@ import java.util.Iterator;
 @Internal
 public abstract class RandomSampler<T> {
 
-       protected final static double EPSILON = 1e-5;
-       
-       protected final Iterator<T> EMPTY_ITERABLE = new SampledIterator<T>() {
+       protected static final double EPSILON = 1e-5;
+
+       protected final Iterator<T> emptyIterable = new SampledIterator<T>() {
                @Override
                public boolean hasNext() {
                        return false;
                }
-               
+
                @Override
                public T next() {
                        return null;
                }
        };
-       
+
        /**
         * Randomly sample the elements from input in sequence, and return the 
result iterator.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/72678f14/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java
index b36d040..bcdd4f9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.sampling;
 
 import org.apache.flink.annotation.Internal;
@@ -32,7 +33,7 @@ import java.util.Random;
  * difference is that, in the first phase, we generate weights for each 
element K times, so that
  * each element can get selected multiple times.
  *
- * This implementation refers to the algorithm described in <a 
href="researcher.ibm.com/files/us-dpwoodru/tw11.pdf">
+ * <p>This implementation refers to the algorithm described in <a 
href="researcher.ibm.com/files/us-dpwoodru/tw11.pdf">
  * "Optimal Random Sampling from Distributed Streams Revisited"</a>.
  *
  * @param <T> The type of sample.
@@ -50,7 +51,7 @@ public class ReservoirSamplerWithReplacement<T> extends 
DistributedRandomSampler
        public ReservoirSamplerWithReplacement(int numSamples) {
                this(numSamples, new XORShiftRandom());
        }
-       
+
        /**
         * Create a sampler with fixed sample size and random number generator 
seed.
         *
@@ -60,7 +61,7 @@ public class ReservoirSamplerWithReplacement<T> extends 
DistributedRandomSampler
        public ReservoirSamplerWithReplacement(int numSamples, long seed) {
                this(numSamples, new XORShiftRandom(seed));
        }
-       
+
        /**
         * Create a sampler with fixed sample size and random number generator.
         *
@@ -76,7 +77,7 @@ public class ReservoirSamplerWithReplacement<T> extends 
DistributedRandomSampler
        @Override
        public Iterator<IntermediateSampleData<T>> 
sampleInPartition(Iterator<T> input) {
                if (numSamples == 0) {
-                       return EMPTY_INTERMEDIATE_ITERABLE;
+                       return emptyIntermediateIterable;
                }
 
                // This queue holds a fixed number of elements with the top K 
weight for current partition.

http://git-wip-us.apache.org/repos/asf/flink/blob/72678f14/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java
index a38a921..212c339 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.sampling;
 
 import org.apache.flink.annotation.Internal;
@@ -35,14 +36,14 @@ import java.util.Random;
  * select top K elements as the output of each partitions. In the second 
phase, we select top K
  * elements from all the outputs of the first phase.
  *
- * This implementation refers to the algorithm described in <a 
href="researcher.ibm.com/files/us-dpwoodru/tw11.pdf">
+ * <p>This implementation refers to the algorithm described in <a 
href="researcher.ibm.com/files/us-dpwoodru/tw11.pdf">
  * "Optimal Random Sampling from Distributed Streams Revisited"</a>.
  *
  * @param <T> The type of the sampler.
  */
 @Internal
 public class ReservoirSamplerWithoutReplacement<T> extends 
DistributedRandomSampler<T> {
-       
+
        private final Random random;
 
        /**
@@ -56,7 +57,7 @@ public class ReservoirSamplerWithoutReplacement<T> extends 
DistributedRandomSamp
                Preconditions.checkArgument(numSamples >= 0, "numSamples should 
be non-negative.");
                this.random = random;
        }
-       
+
        /**
         * Create a new sampler with reservoir size and a default random number 
generator.
         *
@@ -65,7 +66,7 @@ public class ReservoirSamplerWithoutReplacement<T> extends 
DistributedRandomSamp
        public ReservoirSamplerWithoutReplacement(int numSamples) {
                this(numSamples, new XORShiftRandom());
        }
-       
+
        /**
         * Create a new sampler with reservoir size and the seed for random 
number generator.
         *
@@ -73,14 +74,14 @@ public class ReservoirSamplerWithoutReplacement<T> extends 
DistributedRandomSamp
         * @param seed       Random number generator seed.
         */
        public ReservoirSamplerWithoutReplacement(int numSamples, long seed) {
-               
+
                this(numSamples, new XORShiftRandom(seed));
        }
-       
+
        @Override
        public Iterator<IntermediateSampleData<T>> 
sampleInPartition(Iterator<T> input) {
                if (numSamples == 0) {
-                       return EMPTY_INTERMEDIATE_ITERABLE;
+                       return emptyIntermediateIterable;
                }
 
                // This queue holds fixed number elements with the top K weight 
for current partition.

http://git-wip-us.apache.org/repos/asf/flink/blob/72678f14/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java
index 228dd3a..db1d19f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java
@@ -15,14 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.sampling;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest;
 import org.apache.flink.testutils.junit.RetryOnFailure;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.Preconditions;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -61,14 +63,12 @@ public class RandomSamplerTest {
 
        private static final List<Double> source = new 
ArrayList<Double>(SOURCE_SIZE);
 
-
        @Rule
        public final RetryRule retryRule = new RetryRule();
 
        @SuppressWarnings({"unchecked", "rawtypes"})
        private final List<Double>[] sourcePartitions = new 
List[DEFAULT_PARTITION_NUMBER];
 
-
        @BeforeClass
        public static void init() {
                // initiate source data set.
@@ -78,27 +78,27 @@ public class RandomSamplerTest {
        }
 
        private void initSourcePartition() {
-               for (int i = 0; i< DEFAULT_PARTITION_NUMBER; i++) {
-                       sourcePartitions[i] = new 
ArrayList<Double>((int)Math.ceil((double)SOURCE_SIZE / 
DEFAULT_PARTITION_NUMBER));
+               for (int i = 0; i < DEFAULT_PARTITION_NUMBER; i++) {
+                       sourcePartitions[i] = new ArrayList<Double>((int) 
Math.ceil((double) SOURCE_SIZE / DEFAULT_PARTITION_NUMBER));
                }
-               for (int i = 0; i< SOURCE_SIZE; i++) {
+               for (int i = 0; i < SOURCE_SIZE; i++) {
                        int index = i % DEFAULT_PARTITION_NUMBER;
-                       sourcePartitions[index].add((double)i);
+                       sourcePartitions[index].add((double) i);
                }
        }
-       
+
        @Test(expected = java.lang.IllegalArgumentException.class)
        public void testBernoulliSamplerWithUnexpectedFraction1() {
                verifySamplerFraction(-1, false);
        }
-       
+
        @Test(expected = java.lang.IllegalArgumentException.class)
        public void testBernoulliSamplerWithUnexpectedFraction2() {
                verifySamplerFraction(2, false);
        }
-       
+
        @Test
-       @RetryOnFailure(times=3)
+       @RetryOnFailure(times = 3)
        public void testBernoulliSamplerFraction() {
                verifySamplerFraction(0.01, false);
                verifySamplerFraction(0.05, false);
@@ -108,22 +108,22 @@ public class RandomSamplerTest {
                verifySamplerFraction(0.854, false);
                verifySamplerFraction(0.99, false);
        }
-       
+
        @Test
-       @RetryOnFailure(times=3)
+       @RetryOnFailure(times = 3)
        public void testBernoulliSamplerDuplicateElements() {
                verifyRandomSamplerDuplicateElements(new 
BernoulliSampler<Double>(0.01));
                verifyRandomSamplerDuplicateElements(new 
BernoulliSampler<Double>(0.1));
                verifyRandomSamplerDuplicateElements(new 
BernoulliSampler<Double>(0.5));
        }
-       
+
        @Test(expected = java.lang.IllegalArgumentException.class)
        public void testPoissonSamplerWithUnexpectedFraction1() {
                verifySamplerFraction(-1, true);
        }
-       
+
        @Test
-       @RetryOnFailure(times=3)
+       @RetryOnFailure(times = 3)
        public void testPoissonSamplerFraction() {
                verifySamplerFraction(0.01, true);
                verifySamplerFraction(0.05, true);
@@ -133,37 +133,37 @@ public class RandomSamplerTest {
                verifySamplerFraction(0.99, true);
                verifySamplerFraction(1.5, true);
        }
-       
+
        @Test(expected = java.lang.IllegalArgumentException.class)
        public void testReservoirSamplerUnexpectedSize1() {
                verifySamplerFixedSampleSize(-1, true);
        }
-       
+
        @Test(expected = java.lang.IllegalArgumentException.class)
        public void testReservoirSamplerUnexpectedSize2() {
                verifySamplerFixedSampleSize(-1, false);
        }
-       
+
        @Test
-       @RetryOnFailure(times=3)
+       @RetryOnFailure(times = 3)
        public void testBernoulliSamplerDistribution() {
                verifyBernoulliSampler(0.01d);
                verifyBernoulliSampler(0.05d);
                verifyBernoulliSampler(0.1d);
                verifyBernoulliSampler(0.5d);
        }
-       
+
        @Test
-       @RetryOnFailure(times=3)
+       @RetryOnFailure(times = 3)
        public void testPoissonSamplerDistribution() {
                verifyPoissonSampler(0.01d);
                verifyPoissonSampler(0.05d);
                verifyPoissonSampler(0.1d);
                verifyPoissonSampler(0.5d);
        }
-       
+
        @Test
-       @RetryOnFailure(times=3)
+       @RetryOnFailure(times = 3)
        public void testReservoirSamplerSampledSize() {
                verifySamplerFixedSampleSize(1, true);
                verifySamplerFixedSampleSize(10, true);
@@ -171,41 +171,41 @@ public class RandomSamplerTest {
                verifySamplerFixedSampleSize(1234, true);
                verifySamplerFixedSampleSize(9999, true);
                verifySamplerFixedSampleSize(20000, true);
-               
+
                verifySamplerFixedSampleSize(1, false);
                verifySamplerFixedSampleSize(10, false);
                verifySamplerFixedSampleSize(100, false);
                verifySamplerFixedSampleSize(1234, false);
                verifySamplerFixedSampleSize(9999, false);
        }
-       
+
        @Test
-       @RetryOnFailure(times=3)
+       @RetryOnFailure(times = 3)
        public void testReservoirSamplerSampledSize2() {
                RandomSampler<Double> sampler = new 
ReservoirSamplerWithoutReplacement<Double>(20000);
                Iterator<Double> sampled = sampler.sample(source.iterator());
                assertTrue("ReservoirSamplerWithoutReplacement sampled output 
size should not beyond the source size.", getSize(sampled) == SOURCE_SIZE);
        }
-       
+
        @Test
-       @RetryOnFailure(times=3)
+       @RetryOnFailure(times = 3)
        public void testReservoirSamplerDuplicateElements() {
                verifyRandomSamplerDuplicateElements(new 
ReservoirSamplerWithoutReplacement<Double>(100));
                verifyRandomSamplerDuplicateElements(new 
ReservoirSamplerWithoutReplacement<Double>(1000));
                verifyRandomSamplerDuplicateElements(new 
ReservoirSamplerWithoutReplacement<Double>(5000));
        }
-       
+
        @Test
-       @RetryOnFailure(times=3)
+       @RetryOnFailure(times = 3)
        public void testReservoirSamplerWithoutReplacement() {
                verifyReservoirSamplerWithoutReplacement(100, false);
                verifyReservoirSamplerWithoutReplacement(500, false);
                verifyReservoirSamplerWithoutReplacement(1000, false);
                verifyReservoirSamplerWithoutReplacement(5000, false);
        }
-       
+
        @Test
-       @RetryOnFailure(times=3)
+       @RetryOnFailure(times = 3)
        public void testReservoirSamplerWithReplacement() {
                verifyReservoirSamplerWithReplacement(100, false);
                verifyReservoirSamplerWithReplacement(500, false);
@@ -214,7 +214,7 @@ public class RandomSamplerTest {
        }
 
        @Test
-       @RetryOnFailure(times=3)
+       @RetryOnFailure(times = 3)
        public void testReservoirSamplerWithMultiSourcePartitions1() {
                initSourcePartition();
 
@@ -225,7 +225,7 @@ public class RandomSamplerTest {
        }
 
        @Test
-       @RetryOnFailure(times=3)
+       @RetryOnFailure(times = 3)
        public void testReservoirSamplerWithMultiSourcePartitions2() {
                initSourcePartition();
 
@@ -259,7 +259,7 @@ public class RandomSamplerTest {
                } else {
                        sampler = new BernoulliSampler<Double>(fraction);
                }
-               
+
                // take 20 times sample, and take the average result size for 
next step comparison.
                int totalSampledSize = 0;
                double sampleCount = 20;
@@ -283,7 +283,7 @@ public class RandomSamplerTest {
                Set<Double> set = Sets.newHashSet(list);
                assertTrue("There should not have duplicate element for sampler 
without replacement.", list.size() == set.size());
        }
-       
+
        private int getSize(Iterator<?> iterator) {
                int size = 0;
                while (iterator.hasNext()) {
@@ -292,25 +292,25 @@ public class RandomSamplerTest {
                }
                return size;
        }
-       
+
        private void verifyBernoulliSampler(double fraction) {
                BernoulliSampler<Double> sampler = new 
BernoulliSampler<Double>(fraction);
                verifyRandomSamplerWithFraction(fraction, sampler, true);
                verifyRandomSamplerWithFraction(fraction, sampler, false);
        }
-       
+
        private void verifyPoissonSampler(double fraction) {
                PoissonSampler<Double> sampler = new 
PoissonSampler<Double>(fraction);
                verifyRandomSamplerWithFraction(fraction, sampler, true);
                verifyRandomSamplerWithFraction(fraction, sampler, false);
        }
-       
+
        private void verifyReservoirSamplerWithReplacement(int numSamplers, 
boolean sampleOnPartitions) {
                ReservoirSamplerWithReplacement<Double> sampler = new 
ReservoirSamplerWithReplacement<Double>(numSamplers);
                verifyRandomSamplerWithSampleSize(numSamplers, sampler, true, 
sampleOnPartitions);
                verifyRandomSamplerWithSampleSize(numSamplers, sampler, false, 
sampleOnPartitions);
        }
-       
+
        private void verifyReservoirSamplerWithoutReplacement(int numSamplers, 
boolean sampleOnPartitions) {
                ReservoirSamplerWithoutReplacement<Double> sampler = new 
ReservoirSamplerWithoutReplacement<Double>(numSamplers);
                verifyRandomSamplerWithSampleSize(numSamplers, sampler, true, 
sampleOnPartitions);
@@ -330,7 +330,7 @@ public class RandomSamplerTest {
                } else {
                        baseSample = getWrongSampler(fraction);
                }
-               
+
                verifyKSTest(sampler, baseSample, withDefaultSampler);
        }
 
@@ -347,7 +347,7 @@ public class RandomSamplerTest {
                } else {
                        baseSample = getWrongSampler(sampleSize);
                }
-               
+
                verifyKSTest(sampler, baseSample, withDefaultSampler, 
sampleWithPartitions);
        }
 
@@ -365,13 +365,13 @@ public class RandomSamplerTest {
                        assertTrue(String.format("KS test result with p 
value(%f), d value(%f)", pValue, dValue), pValue > dValue);
                }
        }
-       
+
        private double[] getSampledOutput(RandomSampler<Double> sampler, 
boolean sampleOnPartitions) {
                Iterator<Double> sampled;
                if (sampleOnPartitions) {
-                       DistributedRandomSampler<Double> reservoirRandomSampler 
= (DistributedRandomSampler<Double>)sampler;
+                       DistributedRandomSampler<Double> reservoirRandomSampler 
= (DistributedRandomSampler<Double>) sampler;
                        List<IntermediateSampleData<Double>> intermediateResult 
= Lists.newLinkedList();
-                       for (int i = 0; i< DEFAULT_PARTITION_NUMBER; i++) {
+                       for (int i = 0; i < DEFAULT_PARTITION_NUMBER; i++) {
                                Iterator<IntermediateSampleData<Double>> 
partialIntermediateResult = 
reservoirRandomSampler.sampleInPartition(sourcePartitions[i].iterator());
                                while (partialIntermediateResult.hasNext()) {
                                        
intermediateResult.add(partialIntermediateResult.next());
@@ -408,10 +408,10 @@ public class RandomSamplerTest {
                for (int i = 0; i < size; i++) {
                        defaultSampler[i] = Math.round(step * i);
                }
-               
+
                return defaultSampler;
        }
-       
+
        private double[] getDefaultSampler(int fixSize) {
                Preconditions.checkArgument(fixSize > 0, "Sample fraction 
should be positive.");
                double step = SOURCE_SIZE / (double) fixSize;
@@ -419,10 +419,10 @@ public class RandomSamplerTest {
                for (int i = 0; i < fixSize; i++) {
                        defaultSampler[i] = Math.round(step * i);
                }
-               
+
                return defaultSampler;
        }
-       
+
        /*
         * Build a failed sample distribution which only contains elements in 
the first half of source data.
         */
@@ -434,10 +434,10 @@ public class RandomSamplerTest {
                for (int i = 0; i < size; i++) {
                        wrongSampler[i] = (double) i % halfSourceSize;
                }
-               
+
                return wrongSampler;
        }
-       
+
        /*
         * Build a failed sample distribution which only contains elements in 
the first half of source data.
         */
@@ -448,10 +448,10 @@ public class RandomSamplerTest {
                for (int i = 0; i < fixSize; i++) {
                        wrongSampler[i] = (double) i % halfSourceSize;
                }
-               
+
                return wrongSampler;
        }
-       
+
        /*
         * Calculate the D value of K-S test for p-value 0.001, m and n are the 
sample size
         */

Reply via email to