Repository: beam Updated Branches: refs/heads/master c12d6ba80 -> 1fb430442
A few cleanups in CombineTest Better error messages and IntelliJ warning cleanups. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cf654a0b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cf654a0b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cf654a0b Branch: refs/heads/master Commit: cf654a0bcd876310311f48deb64cd49d7df2893c Parents: c12d6ba Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Jun 16 13:07:48 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Mon Jun 19 10:23:20 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/transforms/CombineTest.java | 125 ++++++++----------- 1 file changed, 53 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/cf654a0b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index c4ba62d..e2469ab 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.transforms; -import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static org.apache.beam.sdk.TestUtils.checkCombineFn; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; @@ -45,7 +45,6 @@ import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.DoubleCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -85,7 +84,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mock; /** * Tests for Combine transforms. @@ -97,8 +95,6 @@ public class CombineTest implements Serializable { static final List<KV<String, Integer>> EMPTY_TABLE = Collections.emptyList(); - @Mock private DoFn<?, ?>.ProcessContext processContext; - @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @@ -142,12 +138,12 @@ public class CombineTest implements Serializable { PCollection<KV<String, String>> combinePerKey = perKeyInput.apply( Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView)) - .withSideInputs(Arrays.asList(globallySumView))); + .withSideInputs(globallySumView)); PCollection<String> combineGlobally = globallyInput .apply(Combine.globally(new TestCombineFnWithContext(globallySumView)) .withoutDefaults() - .withSideInputs(Arrays.asList(globallySumView))); + .withSideInputs(globallySumView)); PAssert.that(sum).containsInAnyOrder(globalSum); PAssert.that(combinePerKey).containsInAnyOrder(perKeyCombines); @@ -280,11 +276,9 @@ public class CombineTest implements Serializable { .apply(Combine.<String, Integer, String>perKey(new TestCombineFn())); PAssert.that(sum).containsInAnyOrder(2, 5, 13); - PAssert.that(sumPerKey).containsInAnyOrder( - KV.of("a", "11"), - KV.of("a", "4"), - KV.of("b", "1"), - KV.of("b", "13")); + PAssert.that(sumPerKey) + .containsInAnyOrder( + Arrays.asList(KV.of("a", "11"), KV.of("a", "4"), KV.of("b", "1"), KV.of("b", "13"))); pipeline.run(); } @@ -313,19 +307,18 @@ public class CombineTest implements Serializable { PCollection<KV<String, String>> combinePerKeyWithContext = perKeyInput.apply( Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView)) - .withSideInputs(Arrays.asList(globallySumView))); + .withSideInputs(globallySumView)); PCollection<String> combineGloballyWithContext = globallyInput .apply(Combine.globally(new TestCombineFnWithContext(globallySumView)) .withoutDefaults() - .withSideInputs(Arrays.asList(globallySumView))); + .withSideInputs(globallySumView)); PAssert.that(sum).containsInAnyOrder(2, 5, 13); - PAssert.that(combinePerKeyWithContext).containsInAnyOrder( - KV.of("a", "2:11"), - KV.of("a", "5:4"), - KV.of("b", "5:1"), - KV.of("b", "13:13")); + PAssert.that(combinePerKeyWithContext) + .containsInAnyOrder( + Arrays.asList( + KV.of("a", "2:11"), KV.of("a", "5:4"), KV.of("b", "5:1"), KV.of("b", "13:13"))); PAssert.that(combineGloballyWithContext).containsInAnyOrder("2:11", "5:14", "13:13"); pipeline.run(); } @@ -355,23 +348,25 @@ public class CombineTest implements Serializable { PCollection<KV<String, String>> combinePerKeyWithContext = perKeyInput.apply( Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView)) - .withSideInputs(Arrays.asList(globallySumView))); + .withSideInputs(globallySumView)); PCollection<String> combineGloballyWithContext = globallyInput .apply(Combine.globally(new TestCombineFnWithContext(globallySumView)) .withoutDefaults() - .withSideInputs(Arrays.asList(globallySumView))); + .withSideInputs(globallySumView)); PAssert.that(sum).containsInAnyOrder(1, 2, 1, 4, 5, 14, 13); - PAssert.that(combinePerKeyWithContext).containsInAnyOrder( - KV.of("a", "1:1"), - KV.of("a", "2:11"), - KV.of("a", "1:1"), - KV.of("a", "4:4"), - KV.of("a", "5:4"), - KV.of("b", "5:1"), - KV.of("b", "14:113"), - KV.of("b", "13:13")); + PAssert.that(combinePerKeyWithContext) + .containsInAnyOrder( + Arrays.asList( + KV.of("a", "1:1"), + KV.of("a", "2:11"), + KV.of("a", "1:1"), + KV.of("a", "4:4"), + KV.of("a", "5:4"), + KV.of("b", "5:1"), + KV.of("b", "14:113"), + KV.of("b", "13:13"))); PAssert.that(combineGloballyWithContext).containsInAnyOrder( "1:1", "2:11", "1:1", "4:4", "5:14", "14:113", "13:13"); pipeline.run(); @@ -433,10 +428,8 @@ public class CombineTest implements Serializable { .apply(Combine.<String, Integer, String>perKey(new TestCombineFn())); PAssert.that(sum).containsInAnyOrder(7, 13); - PAssert.that(sumPerKey).containsInAnyOrder( - KV.of("a", "114"), - KV.of("b", "1"), - KV.of("b", "13")); + PAssert.that(sumPerKey) + .containsInAnyOrder(Arrays.asList(KV.of("a", "114"), KV.of("b", "1"), KV.of("b", "13"))); pipeline.run(); } @@ -471,7 +464,7 @@ public class CombineTest implements Serializable { .apply( Combine.<String, Integer, String>perKey( new TestCombineFnWithContext(globallyFixedWindowsView)) - .withSideInputs(Arrays.asList(globallyFixedWindowsView))); + .withSideInputs(globallyFixedWindowsView)); PCollection<String> sessionsCombineGlobally = globallyInput @@ -481,13 +474,12 @@ public class CombineTest implements Serializable { .apply( Combine.globally(new TestCombineFnWithContext(globallyFixedWindowsView)) .withoutDefaults() - .withSideInputs(Arrays.asList(globallyFixedWindowsView))); + .withSideInputs(globallyFixedWindowsView)); PAssert.that(fixedWindowsSum).containsInAnyOrder(2, 4, 1, 13); - PAssert.that(sessionsCombinePerKey).containsInAnyOrder( - KV.of("a", "1:114"), - KV.of("b", "1:1"), - KV.of("b", "0:13")); + PAssert.that(sessionsCombinePerKey) + .containsInAnyOrder( + Arrays.asList(KV.of("a", "1:114"), KV.of("b", "1:1"), KV.of("b", "0:13"))); PAssert.that(sessionsCombineGlobally).containsInAnyOrder("1:1114", "0:13"); pipeline.run(); } @@ -716,7 +708,7 @@ public class CombineTest implements Serializable { pipeline .apply( "CreateMainInput", - Create.<Void>timestamped(nonEmptyElement, emptyElement).withCoder(VoidCoder.of())) + Create.timestamped(nonEmptyElement, emptyElement).withCoder(VoidCoder.of())) .apply("WindowMainInput", Window.<Void>into(windowFn)) .apply( "OutputSideInput", @@ -941,15 +933,13 @@ public class CombineTest implements Serializable { */ private class CountSumCoder extends AtomicCoder<CountSum> { @Override - public void encode(CountSum value, OutputStream outStream) - throws CoderException, IOException { + public void encode(CountSum value, OutputStream outStream) throws IOException { LONG_CODER.encode(value.count, outStream); DOUBLE_CODER.encode(value.sum, outStream); } @Override - public CountSum decode(InputStream inStream) - throws CoderException, IOException { + public CountSum decode(InputStream inStream) throws IOException { long count = LONG_CODER.decode(inStream); double sum = DOUBLE_CODER.decode(inStream); return new CountSum(count, sum); @@ -992,28 +982,15 @@ public class CombineTest implements Serializable { public static Coder<Accumulator> getCoder() { return new AtomicCoder<Accumulator>() { @Override - public void encode(Accumulator accumulator, OutputStream outStream) - throws CoderException, IOException { - encode(accumulator, outStream, Coder.Context.NESTED); + public void encode(Accumulator accumulator, OutputStream outStream) throws IOException { + StringUtf8Coder.of().encode(accumulator.seed, outStream); + StringUtf8Coder.of().encode(accumulator.value, outStream); } @Override - public void encode(Accumulator accumulator, OutputStream outStream, Coder.Context context) - throws CoderException, IOException { - StringUtf8Coder.of().encode(accumulator.seed, outStream, context.nested()); - StringUtf8Coder.of().encode(accumulator.value, outStream, context); - } - - @Override - public Accumulator decode(InputStream inStream) throws CoderException, IOException { - return decode(inStream, Coder.Context.NESTED); - } - - @Override - public Accumulator decode(InputStream inStream, Coder.Context context) - throws CoderException, IOException { - String seed = StringUtf8Coder.of().decode(inStream, context.nested()); - String value = StringUtf8Coder.of().decode(inStream, context); + public Accumulator decode(InputStream inStream) throws IOException { + String seed = StringUtf8Coder.of().decode(inStream); + String value = StringUtf8Coder.of().decode(inStream); return new Accumulator(seed, value); } }; @@ -1042,18 +1019,22 @@ public class CombineTest implements Serializable { @Override public Accumulator mergeAccumulators(Iterable<Accumulator> accumulators) { - String seed = null; - String all = ""; + Accumulator seedAccumulator = null; + StringBuilder all = new StringBuilder(); for (Accumulator accumulator : accumulators) { - if (seed == null) { - seed = accumulator.seed; + if (seedAccumulator == null) { + seedAccumulator = accumulator; } else { - checkArgument(seed.equals(accumulator.seed), "Different seed values in accumulator"); + assertEquals( + String.format( + "Different seed values in accumulator: %s vs. %s", seedAccumulator, accumulator), + seedAccumulator.seed, + accumulator.seed); } - all += accumulator.value; + all.append(accumulator.value); accumulator.value = "cleared in mergeAccumulators"; } - return new Accumulator(seed, all); + return new Accumulator(checkNotNull(seedAccumulator).seed, all.toString()); } @Override @@ -1161,7 +1142,7 @@ public class CombineTest implements Serializable { @Override public void mergeAccumulator(Counter accumulator) { checkState(outputs == 0); - checkArgument(accumulator.outputs == 0); + assertEquals(0, accumulator.outputs); merges += accumulator.merges + 1; inputs += accumulator.inputs;