This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e83270  [BEAM-6979] Fix Dataflow's handling of the new well known Double coder. (#8288)
0e83270 is described below

commit 0e83270608169dba8aa05d5e29107d7ee85469fa
Author: Lukasz Cwik <lc...@google.com>
AuthorDate: Fri Apr 12 10:38:39 2019 -0700

    [BEAM-6979] Fix Dataflow's handling of the new well known Double coder. (#8288)
---
 .../beam/runners/dataflow/util/CloudObjects.java   | 27 ++++++++++++++++++++--
 .../worker/graph/LengthPrefixUnknownCoders.java    |  3 ++-
 2 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
index 01c9383..ee78f6f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
@@ -22,17 +22,40 @@ import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
 
 import java.util.Map;
 import java.util.ServiceLoader;
+import java.util.Set;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.core.construction.ModelCoderRegistrar;
 import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.runners.core.construction.Timer;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
 
 /** Utilities for converting an object to a {@link CloudObject}. */
 public class CloudObjects {
   private CloudObjects() {}
 
+  // All the coders the Dataflow service understands. This is a subset of all Beam Model coders.
+  static final Set<Class<? extends Coder>> DATAFLOW_KNOWN_CODERS =
+      ImmutableSet.of(
+          ByteArrayCoder.class,
+          KvCoder.class,
+          VarLongCoder.class,
+          IntervalWindowCoder.class,
+          IterableCoder.class,
+          Timer.Coder.class,
+          LengthPrefixCoder.class,
+          GlobalWindow.Coder.class,
+          FullWindowedValueCoder.class);
+
   static final Map<Class<? extends Coder>, CloudObjectTranslator<? extends 
Coder>>
       CODER_TRANSLATORS = populateCoderTranslators();
   static final Map<String, CloudObjectTranslator<? extends Coder>>
@@ -77,7 +100,7 @@ public class CloudObjects {
           DefaultCoderCloudObjectTranslatorRegistrar.class.getSimpleName());
       encoding = customCoderTranslator.toCloudObject(coder, sdkComponents);
     }
-    if (sdkComponents != null && !ModelCoderRegistrar.isKnownCoder(coder)) {
+    if (sdkComponents != null && 
!DATAFLOW_KNOWN_CODERS.contains(coder.getClass())) {
       try {
         String coderId = sdkComponents.registerCoder(coder);
         Structs.addString(encoding, PropertyNames.PIPELINE_PROTO_CODER_ID, 
coderId);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
index 1ad7835..d4b9e6c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
@@ -71,7 +71,8 @@ public class LengthPrefixUnknownCoders {
           "kind:fixed_big_endian_int32",
           "kind:fixed_big_endian_int64",
           "kind:var_int32",
-          "kind:void");
+          "kind:void",
+          "org.apache.beam.sdk.coders.DoubleCoder");
 
   private static final String LENGTH_PREFIX_CODER_TYPE = "kind:length_prefix";
 

Reply via email to