Repository: beam Updated Branches: refs/heads/master 90ba7536f -> f62586a08
Reject stateful ParDo if coder not KvCoder with deterministic key coder Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/175ff2fe Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/175ff2fe Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/175ff2fe Branch: refs/heads/master Commit: 175ff2fe873cacb11c0bb47c9812e1cd336ada5f Parents: 90ba753 Author: Kenneth Knowles <k...@google.com> Authored: Wed Jul 5 17:24:25 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Jul 7 11:52:52 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/transforms/ParDo.java | 27 +++++ .../apache/beam/sdk/transforms/ParDoTest.java | 102 +++++++++++++++++++ 2 files changed, 129 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/175ff2fe/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index db1f791..0d03835 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.transforms.DoFn.WindowedContext; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -455,6 +456,27 @@ public class ParDo { } } + private static void validateStateApplicableForInput( + DoFn<?, ?> fn, + PCollection<?> input) { + Coder<?> inputCoder = input.getCoder(); + checkArgument( + inputCoder instanceof KvCoder, + "%s requires its input to use %s in order to use state and timers.", + ParDo.class.getSimpleName(), + KvCoder.class.getSimpleName()); + + KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) inputCoder; + try { + kvCoder.getKeyCoder().verifyDeterministic(); + } catch (Coder.NonDeterministicException exc) { + throw new IllegalArgumentException( + String.format( + "%s requires a deterministic key coder in order to use state and timers", + ParDo.class.getSimpleName())); + } + } + /** * Try to provide coders for as many of the type arguments of given * {@link DoFnSignature.StateDeclaration} as possible. @@ -737,6 +759,11 @@ public class ParDo { // Use coder registry to determine coders for all StateSpec defined in the fn signature. finishSpecifyingStateSpecs(fn, input.getPipeline().getCoderRegistry(), input.getCoder()); + DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); + if (signature.usesState() || signature.usesTimers()) { + validateStateApplicableForInput(fn, input); + } + PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal( input.getPipeline(), TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()), http://git-wip-us.apache.org/repos/asf/beam/blob/175ff2fe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 5b60ef3..fa4949e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -1593,6 +1593,108 @@ public class ParDoTest implements Serializable { } @Test + public void testStateNotKeyed() { + final String stateId = "foo"; + + DoFn<String, Integer> fn = + new DoFn<String, Integer>() { + + @StateId(stateId) + private final StateSpec<ValueState<Integer>> intState = + StateSpecs.value(); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(stateId) ValueState<Integer> state) {} + }; + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("state"); + thrown.expectMessage("KvCoder"); + + pipeline.apply(Create.of("hello", "goodbye", "hello again")).apply(ParDo.of(fn)); + } + + @Test + public void testStateNotDeterministic() { + final String stateId = "foo"; + + // DoubleCoder is not deterministic, so this should crash + DoFn<KV<Double, String>, Integer> fn = + new DoFn<KV<Double, String>, Integer>() { + + @StateId(stateId) + private final StateSpec<ValueState<Integer>> intState = + StateSpecs.value(); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(stateId) ValueState<Integer> state) {} + }; + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("state"); + thrown.expectMessage("deterministic"); + + pipeline + .apply(Create.of(KV.of(1.0, "hello"), KV.of(5.4, "goodbye"), KV.of(7.2, "hello again"))) + .apply(ParDo.of(fn)); + } + + @Test + public void testTimerNotKeyed() { + final String timerId = "foo"; + + DoFn<String, Integer> fn = + new DoFn<String, Integer>() { + + @TimerId(timerId) + private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement( + ProcessContext c, @TimerId(timerId) Timer timer) {} + + @OnTimer(timerId) + public void onTimer() {} + }; + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("timer"); + thrown.expectMessage("KvCoder"); + + pipeline.apply(Create.of("hello", "goodbye", "hello again")).apply(ParDo.of(fn)); + } + + @Test + public void testTimerNotDeterministic() { + final String timerId = "foo"; + + // DoubleCoder is not deterministic, so this should crash + DoFn<KV<Double, String>, Integer> fn = + new DoFn<KV<Double, String>, Integer>() { + + @TimerId(timerId) + private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement( + ProcessContext c, @TimerId(timerId) Timer timer) {} + + @OnTimer(timerId) + public void onTimer() {} + }; + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("timer"); + thrown.expectMessage("deterministic"); + + pipeline + .apply(Create.of(KV.of(1.0, "hello"), KV.of(5.4, "goodbye"), KV.of(7.2, "hello again"))) + .apply(ParDo.of(fn)); + } + + @Test @Category({ValidatesRunner.class, UsesStatefulParDo.class}) public void testValueStateCoderInference() { final String stateId = "foo";