Try PTransform-based coder inference before using fallback coder.

This is particularly important for fallback coders that claim
to provide a coder for Object (or equivalently an unconstrained
type parameter).  See BEAM-1642.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/289d2dec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/289d2dec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/289d2dec

Branch: refs/heads/master
Commit: 289d2decba57ff990f44567d1a16c189cfbd1cc8
Parents: 9b6b906
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Fri Apr 28 15:53:42 2017 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Wed May 3 14:36:03 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunnerTest.java   | 10 ++++++++++
 .../org/apache/beam/sdk/values/TypedPValue.java | 20 ++++++++++----------
 2 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/289d2dec/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 6f9adc4..83881fc 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -578,4 +578,14 @@ public class DirectRunnerTest implements Serializable {
       return underlying.getDefaultOutputCoder();
     }
   }
+
+  @Test
+  public void fallbackCoderProviderAllowsInference() {
+    // See https://issues.apache.org/jira/browse/BEAM-1642
+    Pipeline p = getPipeline();
+    p.getCoderRegistry().setFallbackCoderProvider(
+        org.apache.beam.sdk.coders.AvroCoder.PROVIDER);
+    p.apply(Create.of(Arrays.asList(100, 
200))).apply(Count.<Integer>globally());
+    p.run().waitUntilFinish();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/289d2dec/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
index 54af747..f473776 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
@@ -140,7 +140,16 @@ public abstract class TypedPValue<T> extends PValueBase 
implements PValue {
       return coderOrFailure;
     }
 
-    // Second option for a coder: Look in the coder registry.
+    // Second option for a coder: use the default Coder from the producing 
PTransform.
+    CannotProvideCoderException inputCoderException;
+    try {
+      return new CoderOrFailure<>(
+          ((PTransform) transform).getDefaultOutputCoder(input, this), null);
+    } catch (CannotProvideCoderException exc) {
+      inputCoderException = exc;
+    }
+
+    // Third option for a coder: Look in the coder registry.
     TypeDescriptor<T> token = getTypeDescriptor();
     CannotProvideCoderException inferFromTokenException = null;
     if (token != null) {
@@ -162,15 +171,6 @@ public abstract class TypedPValue<T> extends PValueBase 
implements PValue {
       }
     }
 
-    // Third option for a coder: use the default Coder from the producing 
PTransform.
-    CannotProvideCoderException inputCoderException;
-    try {
-      return new CoderOrFailure<>(
-          ((PTransform) transform).getDefaultOutputCoder(input, this), null);
-    } catch (CannotProvideCoderException exc) {
-      inputCoderException = exc;
-    }
-
     // Build up the error message and list of causes.
     StringBuilder messageBuilder = new StringBuilder()
         .append("Unable to return a default Coder for ").append(this)

Reply via email to