Try PTransform-based coder inference before using fallback coder. This is particularly important for fallback coders that claim to provide a coder for Object (or equivalently an unconstrained type parameter). See BEAM-1642.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/289d2dec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/289d2dec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/289d2dec

Branch: refs/heads/master
Commit: 289d2decba57ff990f44567d1a16c189cfbd1cc8
Parents: 9b6b906
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Fri Apr 28 15:53:42 2017 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Wed May 3 14:36:03 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunnerTest.java   | 10 ++++++++++
 .../org/apache/beam/sdk/values/TypedPValue.java | 20 ++++++++++----------
 2 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/289d2dec/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 6f9adc4..83881fc 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -578,4 +578,14 @@ public class DirectRunnerTest implements Serializable {
       return underlying.getDefaultOutputCoder();
     }
   }
+
+  @Test
+  public void fallbackCoderProviderAllowsInference() {
+    // See https://issues.apache.org/jira/browse/BEAM-1642
+    Pipeline p = getPipeline();
+    p.getCoderRegistry().setFallbackCoderProvider(
+        org.apache.beam.sdk.coders.AvroCoder.PROVIDER);
+    p.apply(Create.of(Arrays.asList(100, 200))).apply(Count.<Integer>globally());
+    p.run().waitUntilFinish();
+  }
 }
http://git-wip-us.apache.org/repos/asf/beam/blob/289d2dec/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
index 54af747..f473776 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
@@ -140,7 +140,16 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue {
       return coderOrFailure;
     }
 
-    // Second option for a coder: Look in the coder registry.
+    // Second option for a coder: use the default Coder from the producing PTransform.
+    CannotProvideCoderException inputCoderException;
+    try {
+      return new CoderOrFailure<>(
+          ((PTransform) transform).getDefaultOutputCoder(input, this), null);
+    } catch (CannotProvideCoderException exc) {
+      inputCoderException = exc;
+    }
+
+    // Third option for a coder: Look in the coder registry.
     TypeDescriptor<T> token = getTypeDescriptor();
     CannotProvideCoderException inferFromTokenException = null;
     if (token != null) {
@@ -162,15 +171,6 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue {
       }
     }
 
-    // Third option for a coder: use the default Coder from the producing PTransform.
-    CannotProvideCoderException inputCoderException;
-    try {
-      return new CoderOrFailure<>(
-          ((PTransform) transform).getDefaultOutputCoder(input, this), null);
-    } catch (CannotProvideCoderException exc) {
-      inputCoderException = exc;
-    }
-
     // Build up the error message and list of causes.
     StringBuilder messageBuilder = new StringBuilder()
         .append("Unable to return a default Coder for ").append(this)