This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 0e83270 [BEAM-6979] Fix Dataflow's handling of the new well known Double coder. (#8288) 0e83270 is described below commit 0e83270608169dba8aa05d5e29107d7ee85469fa Author: Lukasz Cwik <lc...@google.com> AuthorDate: Fri Apr 12 10:38:39 2019 -0700 [BEAM-6979] Fix Dataflow's handling of the new well known Double coder. (#8288) --- .../beam/runners/dataflow/util/CloudObjects.java | 27 ++++++++++++++++++++-- .../worker/graph/LengthPrefixUnknownCoders.java | 3 ++- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java index 01c9383..ee78f6f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java @@ -22,17 +22,40 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi import java.util.Map; import java.util.ServiceLoader; +import java.util.Set; import javax.annotation.Nullable; -import org.apache.beam.runners.core.construction.ModelCoderRegistrar; import org.apache.beam.runners.core.construction.SdkComponents; +import org.apache.beam.runners.core.construction.Timer; +import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet; /** Utilities for converting an object to a {@link CloudObject}. */ public class CloudObjects { private CloudObjects() {} + // All the coders the Dataflow service understands. This is a subset of all Beam Model coders. + static final Set<Class<? extends Coder>> DATAFLOW_KNOWN_CODERS = + ImmutableSet.of( + ByteArrayCoder.class, + KvCoder.class, + VarLongCoder.class, + IntervalWindowCoder.class, + IterableCoder.class, + Timer.Coder.class, + LengthPrefixCoder.class, + GlobalWindow.Coder.class, + FullWindowedValueCoder.class); + static final Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>> CODER_TRANSLATORS = populateCoderTranslators(); static final Map<String, CloudObjectTranslator<? extends Coder>> @@ -77,7 +100,7 @@ public class CloudObjects { DefaultCoderCloudObjectTranslatorRegistrar.class.getSimpleName()); encoding = customCoderTranslator.toCloudObject(coder, sdkComponents); } - if (sdkComponents != null && !ModelCoderRegistrar.isKnownCoder(coder)) { + if (sdkComponents != null && !DATAFLOW_KNOWN_CODERS.contains(coder.getClass())) { try { String coderId = sdkComponents.registerCoder(coder); Structs.addString(encoding, PropertyNames.PIPELINE_PROTO_CODER_ID, coderId); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java index 1ad7835..d4b9e6c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java @@ -71,7 +71,8 @@ public class LengthPrefixUnknownCoders { "kind:fixed_big_endian_int32", "kind:fixed_big_endian_int64", "kind:var_int32", - "kind:void"); + "kind:void", + "org.apache.beam.sdk.coders.DoubleCoder"); private static final String LENGTH_PREFIX_CODER_TYPE = "kind:length_prefix";