Migrate checkCombineFn in TestUtils to CombineFnTester This makes CombineFnTester significantly more discoverable, and usable without having dependencies on the test JAR.
Update existing tests. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51867adf Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51867adf Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51867adf Branch: refs/heads/master Commit: 51867adfea0d78a7dcb0fb09b5c257cf8e0e2ff4 Parents: 673937b Author: Thomas Groh <tg...@google.com> Authored: Thu Jul 20 13:36:04 2017 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Wed Jul 26 16:31:13 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/testing/CombineFnTester.java | 147 ++++++++++ .../java/org/apache/beam/sdk/TestUtils.java | 88 ------ .../beam/sdk/testing/CombineFnTesterTest.java | 276 +++++++++++++++++++ .../transforms/ApproximateQuantilesTest.java | 24 +- .../apache/beam/sdk/transforms/CombineTest.java | 12 +- .../org/apache/beam/sdk/transforms/MaxTest.java | 8 +- .../apache/beam/sdk/transforms/MeanTest.java | 4 +- .../org/apache/beam/sdk/transforms/MinTest.java | 8 +- .../org/apache/beam/sdk/transforms/SumTest.java | 8 +- 9 files changed, 455 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java new file mode 100644 index 0000000..efd2af3 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.hamcrest.Matcher; + +/** + * Utilities for testing {@link CombineFn CombineFns}. Ensures that the {@link CombineFn} gives + * correct results across various permutations and shardings of the input. + */ +public class CombineFnTester { + /** + * Tests that the the {@link CombineFn}, when applied to the provided input, produces the provided + * output. Tests a variety of permutations of the input. + */ + public static <InputT, AccumT, OutputT> void testCombineFn( + CombineFn<InputT, AccumT, OutputT> fn, List<InputT> input, final OutputT expected) { + testCombineFn(fn, input, is(expected)); + Collections.shuffle(input); + testCombineFn(fn, input, is(expected)); + } + + public static <InputT, AccumT, OutputT> void testCombineFn( + CombineFn<InputT, AccumT, OutputT> fn, List<InputT> input, Matcher<? super OutputT> matcher) { + int size = input.size(); + checkCombineFnShardsMultipleOrders(fn, Collections.singletonList(input), matcher); + checkCombineFnShardsMultipleOrders(fn, shardEvenly(input, 2), matcher); + if (size > 4) { + checkCombineFnShardsMultipleOrders(fn, shardEvenly(input, size / 2), matcher); + checkCombineFnShardsMultipleOrders( + fn, shardEvenly(input, (int) (size / Math.sqrt(size))), matcher); + } + checkCombineFnShardsMultipleOrders(fn, shardExponentially(input, 1.4), matcher); + checkCombineFnShardsMultipleOrders(fn, shardExponentially(input, 2), matcher); + checkCombineFnShardsMultipleOrders(fn, shardExponentially(input, Math.E), matcher); + } + + private static <InputT, AccumT, OutputT> void checkCombineFnShardsMultipleOrders( + CombineFn<InputT, AccumT, OutputT> fn, + List<? extends Iterable<InputT>> shards, + Matcher<? super OutputT> matcher) { + checkCombineFnShardsSingleMerge(fn, shards, matcher); + checkCombineFnShardsWithEmptyAccumulators(fn, shards, matcher); + checkCombineFnShardsIncrementalMerging(fn, shards, matcher); + Collections.shuffle(shards); + checkCombineFnShardsSingleMerge(fn, shards, matcher); + checkCombineFnShardsWithEmptyAccumulators(fn, shards, matcher); + checkCombineFnShardsIncrementalMerging(fn, shards, matcher); + } + + private static <InputT, AccumT, OutputT> void checkCombineFnShardsSingleMerge( + CombineFn<InputT, AccumT, OutputT> fn, + Iterable<? extends Iterable<InputT>> shards, + Matcher<? super OutputT> matcher) { + List<AccumT> accumulators = combineInputs(fn, shards); + AccumT merged = fn.mergeAccumulators(accumulators); + assertThat(fn.extractOutput(merged), matcher); + } + + private static <InputT, AccumT, OutputT> void checkCombineFnShardsWithEmptyAccumulators( + CombineFn<InputT, AccumT, OutputT> fn, + Iterable<? extends Iterable<InputT>> shards, + Matcher<? super OutputT> matcher) { + List<AccumT> accumulators = combineInputs(fn, shards); + accumulators.add(0, fn.createAccumulator()); + accumulators.add(fn.createAccumulator()); + AccumT merged = fn.mergeAccumulators(accumulators); + assertThat(fn.extractOutput(merged), matcher); + } + + private static <InputT, AccumT, OutputT> void checkCombineFnShardsIncrementalMerging( + CombineFn<InputT, AccumT, OutputT> fn, + List<? extends Iterable<InputT>> shards, + Matcher<? super OutputT> matcher) { + AccumT accumulator = null; + for (AccumT inputAccum : combineInputs(fn, shards)) { + if (accumulator == null) { + accumulator = inputAccum; + } else { + accumulator = fn.mergeAccumulators(Arrays.asList(accumulator, inputAccum)); + } + } + assertThat(fn.extractOutput(accumulator), matcher); + } + + private static <InputT, AccumT, OutputT> List<AccumT> combineInputs( + CombineFn<InputT, AccumT, OutputT> fn, Iterable<? extends Iterable<InputT>> shards) { + List<AccumT> accumulators = new ArrayList<>(); + int maybeCompact = 0; + for (Iterable<InputT> shard : shards) { + AccumT accumulator = fn.createAccumulator(); + for (InputT elem : shard) { + accumulator = fn.addInput(accumulator, elem); + } + if (maybeCompact++ % 2 == 0) { + accumulator = fn.compact(accumulator); + } + accumulators.add(accumulator); + } + return accumulators; + } + + private static <T> List<List<T>> shardEvenly(List<T> input, int numShards) { + List<List<T>> shards = new ArrayList<>(numShards); + for (int i = 0; i < numShards; i++) { + shards.add(input.subList(i * input.size() / numShards, + (i + 1) * input.size() / numShards)); + } + return shards; + } + + private static <T> List<List<T>> shardExponentially( + List<T> input, double base) { + assert base > 1.0; + List<List<T>> shards = new ArrayList<>(); + int end = input.size(); + while (end > 0) { + int start = (int) (end / base); + shards.add(input.subList(start, end)); + end = start; + } + return shards; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/test/java/org/apache/beam/sdk/TestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/TestUtils.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/TestUtils.java index 1224f10..5ccc1ac 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/TestUtils.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/TestUtils.java @@ -17,15 +17,9 @@ */ package org.apache.beam.sdk; -import static org.junit.Assert.assertThat; - -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; -import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.values.KV; -import org.hamcrest.CoreMatchers; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; @@ -127,86 +121,4 @@ public class TestUtils { .appendText(")"); } } - - //////////////////////////////////////////////////////////////////////////// - // Utilities for testing CombineFns, ensuring they give correct results - // across various permutations and shardings of the input. - - public static <InputT, AccumT, OutputT> void checkCombineFn( - CombineFn<InputT, AccumT, OutputT> fn, List<InputT> input, final OutputT expected) { - checkCombineFn(fn, input, CoreMatchers.is(expected)); - } - - public static <InputT, AccumT, OutputT> void checkCombineFn( - CombineFn<InputT, AccumT, OutputT> fn, List<InputT> input, Matcher<? super OutputT> matcher) { - checkCombineFnInternal(fn, input, matcher); - Collections.shuffle(input); - checkCombineFnInternal(fn, input, matcher); - } - - private static <InputT, AccumT, OutputT> void checkCombineFnInternal( - CombineFn<InputT, AccumT, OutputT> fn, List<InputT> input, Matcher<? super OutputT> matcher) { - int size = input.size(); - checkCombineFnShards(fn, Collections.singletonList(input), matcher); - checkCombineFnShards(fn, shardEvenly(input, 2), matcher); - if (size > 4) { - checkCombineFnShards(fn, shardEvenly(input, size / 2), matcher); - checkCombineFnShards( - fn, shardEvenly(input, (int) (size / Math.sqrt(size))), matcher); - } - checkCombineFnShards(fn, shardExponentially(input, 1.4), matcher); - checkCombineFnShards(fn, shardExponentially(input, 2), matcher); - checkCombineFnShards(fn, shardExponentially(input, Math.E), matcher); - } - - public static <InputT, AccumT, OutputT> void checkCombineFnShards( - CombineFn<InputT, AccumT, OutputT> fn, - List<? extends Iterable<InputT>> shards, - Matcher<? super OutputT> matcher) { - checkCombineFnShardsInternal(fn, shards, matcher); - Collections.shuffle(shards); - checkCombineFnShardsInternal(fn, shards, matcher); - } - - private static <InputT, AccumT, OutputT> void checkCombineFnShardsInternal( - CombineFn<InputT, AccumT, OutputT> fn, - Iterable<? extends Iterable<InputT>> shards, - Matcher<? super OutputT> matcher) { - List<AccumT> accumulators = new ArrayList<>(); - int maybeCompact = 0; - for (Iterable<InputT> shard : shards) { - AccumT accumulator = fn.createAccumulator(); - for (InputT elem : shard) { - accumulator = fn.addInput(accumulator, elem); - } - if (maybeCompact++ % 2 == 0) { - accumulator = fn.compact(accumulator); - } - accumulators.add(accumulator); - } - AccumT merged = fn.mergeAccumulators(accumulators); - assertThat(fn.extractOutput(merged), matcher); - } - - private static <T> List<List<T>> shardEvenly(List<T> input, int numShards) { - List<List<T>> shards = new ArrayList<>(numShards); - for (int i = 0; i < numShards; i++) { - shards.add(input.subList(i * input.size() / numShards, - (i + 1) * input.size() / numShards)); - } - return shards; - } - - private static <T> List<List<T>> shardExponentially( - List<T> input, double base) { - assert base > 1.0; - List<List<T>> shards = new ArrayList<>(); - int end = input.size(); - while (end > 0) { - int start = (int) (end / base); - shards.add(input.subList(start, end)); - end = start; - } - return shards; - } } http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CombineFnTesterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CombineFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CombineFnTesterTest.java new file mode 100644 index 0000000..15198b2 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CombineFnTesterTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.testing; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import com.google.common.collect.Iterables; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Sum; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.Matchers; +import org.hamcrest.TypeSafeMatcher; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link CombineFnTester}. + */ +@RunWith(JUnit4.class) +public class CombineFnTesterTest { + @Test + public void checksMergeWithEmptyAccumulators() { + final AtomicBoolean sawEmpty = new AtomicBoolean(false); + CombineFn<Integer, Integer, Integer> combineFn = + new CombineFn<Integer, Integer, Integer>() { + @Override + public Integer createAccumulator() { + return 0; + } + + @Override + public Integer addInput(Integer accumulator, Integer input) { + return accumulator + input; + } + + @Override + public Integer mergeAccumulators(Iterable<Integer> accumulators) { + int result = 0; + for (int accum : accumulators) { + if (accum == 0) { + sawEmpty.set(true); + } + result += accum; + } + return result; + } + + @Override + public Integer extractOutput(Integer accumulator) { + return accumulator; + } + }; + + CombineFnTester.testCombineFn(combineFn, Arrays.asList(1, 2, 3, 4, 5), 15); + assertThat(sawEmpty.get(), is(true)); + } + + @Test + public void checksWithSingleShard() { + final AtomicBoolean sawSingleShard = new AtomicBoolean(); + CombineFn<Integer, Integer, Integer> combineFn = + new CombineFn<Integer, Integer, Integer>() { + int accumCount = 0; + + @Override + public Integer createAccumulator() { + accumCount++; + return 0; + } + + @Override + public Integer addInput(Integer accumulator, Integer input) { + return accumulator + input; + } + + @Override + public Integer mergeAccumulators(Iterable<Integer> accumulators) { + int result = 0; + for (int accum : accumulators) { + result += accum; + } + return result; + } + + @Override + public Integer extractOutput(Integer accumulator) { + if (accumCount == 1) { + sawSingleShard.set(true); + } + accumCount = 0; + return accumulator; + } + }; + + CombineFnTester.testCombineFn(combineFn, Arrays.asList(1, 2, 3, 4, 5), 15); + assertThat(sawSingleShard.get(), is(true)); + } + + @Test + public void checksWithShards() { + final AtomicBoolean sawManyShards = new AtomicBoolean(); + CombineFn<Integer, Integer, Integer> combineFn = + new CombineFn<Integer, Integer, Integer>() { + + @Override + public Integer createAccumulator() { + return 0; + } + + @Override + public Integer addInput(Integer accumulator, Integer input) { + return accumulator + input; + } + + @Override + public Integer mergeAccumulators(Iterable<Integer> accumulators) { + if (Iterables.size(accumulators) > 2) { + sawManyShards.set(true); + } + int result = 0; + for (int accum : accumulators) { + result += accum; + } + return result; + } + + @Override + public Integer extractOutput(Integer accumulator) { + return accumulator; + } + }; + + CombineFnTester.testCombineFn( + combineFn, Arrays.asList(1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3), 30); + assertThat(sawManyShards.get(), is(true)); + } + + @Test + public void checksWithMultipleMerges() { + final AtomicBoolean sawMultipleMerges = new AtomicBoolean(); + CombineFn<Integer, Integer, Integer> combineFn = + new CombineFn<Integer, Integer, Integer>() { + int mergeCalls = 0; + + @Override + public Integer createAccumulator() { + return 0; + } + + @Override + public Integer addInput(Integer accumulator, Integer input) { + return accumulator + input; + } + + @Override + public Integer mergeAccumulators(Iterable<Integer> accumulators) { + mergeCalls++; + int result = 0; + for (int accum : accumulators) { + result += accum; + } + return result; + } + + @Override + public Integer extractOutput(Integer accumulator) { + if (mergeCalls > 1) { + sawMultipleMerges.set(true); + } + mergeCalls = 0; + return accumulator; + } + }; + + CombineFnTester.testCombineFn(combineFn, Arrays.asList(1, 1, 2, 2, 3, 3, 4, 4, 5, 5), 30); + assertThat(sawMultipleMerges.get(), is(true)); + } + + @Test + public void checksAlternateOrder() { + final AtomicBoolean sawOutOfOrder = new AtomicBoolean(); + CombineFn<Integer, List<Integer>, Integer> combineFn = + new CombineFn<Integer, List<Integer>, Integer>() { + @Override + public List<Integer> createAccumulator() { + return new ArrayList<>(); + } + + @Override + public List<Integer> addInput(List<Integer> accumulator, Integer input) { + // If the input is being added to an empty accumulator, it's not known to be + // out of order, and it cannot be compared to the previous element. If the elements + // are out of order (relative to the input) a greater element will be added before + // a smaller one. + if (!accumulator.isEmpty() && accumulator.get(accumulator.size() - 1) > input) { + sawOutOfOrder.set(true); + } + accumulator.add(input); + return accumulator; + } + + @Override + public List<Integer> mergeAccumulators(Iterable<List<Integer>> accumulators) { + List<Integer> result = new ArrayList<>(); + for (List<Integer> accum : accumulators) { + result.addAll(accum); + } + return result; + } + + @Override + public Integer extractOutput(List<Integer> accumulator) { + int value = 0; + for (int i : accumulator) { + value += i; + } + return value; + } + }; + + CombineFnTester.testCombineFn( + combineFn, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14), 105); + assertThat(sawOutOfOrder.get(), is(true)); + } + + @Test + public void usesMatcher() { + final AtomicBoolean matcherUsed = new AtomicBoolean(); + Matcher<Integer> matcher = + new TypeSafeMatcher<Integer>() { + @Override + public void describeTo(Description description) {} + + @Override + protected boolean matchesSafely(Integer item) { + matcherUsed.set(true); + return item == 30; + } + }; + CombineFnTester.testCombineFn( + Sum.ofIntegers(), Arrays.asList(1, 1, 2, 2, 3, 3, 4, 4, 5, 5), matcher); + assertThat(matcherUsed.get(), is(true)); + try { + CombineFnTester.testCombineFn( + Sum.ofIntegers(), Arrays.asList(1, 2, 3, 4, 5), Matchers.not(Matchers.equalTo(15))); + } catch (AssertionError ignored) { + // Success! Return to avoid the call to fail(); + return; + } + fail("The matcher should have failed, throwing an error"); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java index 9e0b3cc..e180833 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.transforms; -import static org.apache.beam.sdk.TestUtils.checkCombineFn; +import static org.apache.beam.sdk.testing.CombineFnTester.testCombineFn; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; @@ -129,7 +129,7 @@ public class ApproximateQuantilesTest { @Test public void testSingleton() { - checkCombineFn( + testCombineFn( ApproximateQuantilesCombineFn.<Integer>create(5), Arrays.asList(389), Arrays.asList(389, 389, 389, 389, 389)); @@ -137,7 +137,7 @@ public class ApproximateQuantilesTest { @Test public void testSimpleQuantiles() { - checkCombineFn( + testCombineFn( ApproximateQuantilesCombineFn.<Integer>create(5), intRange(101), Arrays.asList(0, 25, 50, 75, 100)); @@ -145,7 +145,7 @@ public class ApproximateQuantilesTest { @Test public void testUnevenQuantiles() { - checkCombineFn( + testCombineFn( ApproximateQuantilesCombineFn.<Integer>create(37), intRange(5000), quantileMatcher(5000, 37, 20 /* tolerance */)); @@ -153,7 +153,7 @@ public class ApproximateQuantilesTest { @Test public void testLargerQuantiles() { - checkCombineFn( + testCombineFn( ApproximateQuantilesCombineFn.<Integer>create(50), intRange(10001), quantileMatcher(10001, 50, 20 /* tolerance */)); @@ -161,7 +161,7 @@ public class ApproximateQuantilesTest { @Test public void testTightEpsilon() { - checkCombineFn( + testCombineFn( ApproximateQuantilesCombineFn.<Integer>create(10).withEpsilon(0.01), intRange(10001), quantileMatcher(10001, 10, 5 /* tolerance */)); @@ -174,7 +174,7 @@ public class ApproximateQuantilesTest { for (int i = 0; i < 10; i++) { all.addAll(intRange(size)); } - checkCombineFn( + testCombineFn( ApproximateQuantilesCombineFn.<Integer>create(5), all, Arrays.asList(0, 25, 50, 75, 100)); @@ -190,7 +190,7 @@ public class ApproximateQuantilesTest { for (int i = 300; i < 1000; i++) { all.add(3); } - checkCombineFn( + testCombineFn( ApproximateQuantilesCombineFn.<Integer>create(5), all, Arrays.asList(1, 2, 3, 3, 3)); @@ -202,7 +202,7 @@ public class ApproximateQuantilesTest { for (int i = 1; i < 1000; i++) { all.add((int) Math.log(i)); } - checkCombineFn( + testCombineFn( ApproximateQuantilesCombineFn.<Integer>create(5), all, Arrays.asList(0, 5, 6, 6, 6)); @@ -214,7 +214,7 @@ public class ApproximateQuantilesTest { for (int i = 1; i < 1000; i++) { all.add(1000 / i); } - checkCombineFn( + testCombineFn( ApproximateQuantilesCombineFn.<Integer>create(5), all, Arrays.asList(1, 1, 2, 4, 1000)); @@ -224,11 +224,11 @@ public class ApproximateQuantilesTest { public void testAlternateComparator() { List<String> inputs = Arrays.asList( "aa", "aaa", "aaaa", "b", "ccccc", "dddd", "zz"); - checkCombineFn( + testCombineFn( ApproximateQuantilesCombineFn.<String>create(3), inputs, Arrays.asList("aa", "b", "zz")); - checkCombineFn( + testCombineFn( ApproximateQuantilesCombineFn.create(3, new OrderByLength()), inputs, Arrays.asList("b", "aaa", "ccccc")); http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index b24d82d..52fedc6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.transforms; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import static org.apache.beam.sdk.TestUtils.checkCombineFn; +import static org.apache.beam.sdk.testing.CombineFnTester.testCombineFn; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor; @@ -695,11 +695,11 @@ public class CombineTest implements Serializable { @Test public void testBinaryCombineFnWithNulls() { - checkCombineFn(new NullCombiner(), Arrays.asList(3, 3, 5), 45); - checkCombineFn(new NullCombiner(), Arrays.asList(null, 3, 5), 30); - checkCombineFn(new NullCombiner(), Arrays.asList(3, 3, null), 18); - checkCombineFn(new NullCombiner(), Arrays.asList(null, 3, null), 12); - checkCombineFn(new NullCombiner(), Arrays.<Integer>asList(null, null, null), 8); + testCombineFn(new NullCombiner(), Arrays.asList(3, 3, 5), 45); + testCombineFn(new NullCombiner(), Arrays.asList(null, 3, 5), 30); + testCombineFn(new NullCombiner(), Arrays.asList(3, 3, null), 18); + testCombineFn(new NullCombiner(), Arrays.asList(null, 3, null), 12); + testCombineFn(new NullCombiner(), Arrays.<Integer>asList(null, null, null), 8); } private static final class TestProdInt extends Combine.BinaryCombineIntegerFn { http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java index 52043e1..a298a5e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.transforms; -import static org.apache.beam.sdk.TestUtils.checkCombineFn; +import static org.apache.beam.sdk.testing.CombineFnTester.testCombineFn; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; @@ -45,7 +45,7 @@ public class MaxTest { @Test public void testMaxIntegerFn() { - checkCombineFn( + testCombineFn( Max.ofIntegers(), Lists.newArrayList(1, 2, 3, 4), 4); @@ -53,7 +53,7 @@ public class MaxTest { @Test public void testMaxLongFn() { - checkCombineFn( + testCombineFn( Max.ofLongs(), Lists.newArrayList(1L, 2L, 3L, 4L), 4L); @@ -61,7 +61,7 @@ public class MaxTest { @Test public void testMaxDoubleFn() { - checkCombineFn( + testCombineFn( Max.ofDoubles(), Lists.newArrayList(1.0, 2.0, 3.0, 4.0), 4.0); http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java index 79ebc25..e138135 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.transforms; -import static org.apache.beam.sdk.TestUtils.checkCombineFn; +import static org.apache.beam.sdk.testing.CombineFnTester.testCombineFn; import static org.junit.Assert.assertEquals; import com.google.common.collect.Lists; @@ -64,7 +64,7 @@ public class MeanTest { @Test public void testMeanFn() throws Exception { - checkCombineFn( + testCombineFn( Mean.<Integer>of(), Lists.newArrayList(1, 2, 3, 4), 2.5); http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java index 1ece09b..a515b63 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java @@ -18,7 +18,7 @@ package org.apache.beam.sdk.transforms; -import static org.apache.beam.sdk.TestUtils.checkCombineFn; +import static org.apache.beam.sdk.testing.CombineFnTester.testCombineFn; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; @@ -45,7 +45,7 @@ public class MinTest { } @Test public void testMinIntegerFn() { - checkCombineFn( + testCombineFn( Min.ofIntegers(), Lists.newArrayList(1, 2, 3, 4), 1); @@ -53,7 +53,7 @@ public class MinTest { @Test public void testMinLongFn() { - checkCombineFn( + testCombineFn( Min.ofLongs(), Lists.newArrayList(1L, 2L, 3L, 4L), 1L); @@ -61,7 +61,7 @@ public class MinTest { @Test public void testMinDoubleFn() { - checkCombineFn( + testCombineFn( Min.ofDoubles(), Lists.newArrayList(1.0, 2.0, 3.0, 4.0), 1.0); http://git-wip-us.apache.org/repos/asf/beam/blob/51867adf/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java index 9d2c6f6..e5bf904 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.transforms; -import static org.apache.beam.sdk.TestUtils.checkCombineFn; +import static org.apache.beam.sdk.testing.CombineFnTester.testCombineFn; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -51,7 +51,7 @@ public class SumTest { @Test public void testSumIntegerFn() { - checkCombineFn( + testCombineFn( Sum.ofIntegers(), Lists.newArrayList(1, 2, 3, 4), 10); @@ -59,7 +59,7 @@ public class SumTest { @Test public void testSumLongFn() { - checkCombineFn( + testCombineFn( Sum.ofLongs(), Lists.newArrayList(1L, 2L, 3L, 4L), 10L); @@ -67,7 +67,7 @@ public class SumTest { @Test public void testSumDoubleFn() { - checkCombineFn( + testCombineFn( Sum.ofDoubles(), Lists.newArrayList(1.0, 2.0, 3.0, 4.0), 10.0);