This is an automated email from the ASF dual-hosted git repository. goenka pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new c4dbe12 [BEAM-7260] UTF8 coder is breaking dataflow tests new da4bb49 Merge pull request #8545 from ihji/BEAM-7260 c4dbe12 is described below commit c4dbe12a09e93d0e91d2c7c3bcd3e9bb9117310c Author: Heejong Lee <heej...@gmail.com> AuthorDate: Thu May 9 17:22:38 2019 -0700 [BEAM-7260] UTF8 coder is breaking dataflow tests StringUtf8Coder is now ModelCoder in JavaSDK. We need to add it to WELL_KNOWN_CODER_TYPES for dataflow worker harness as well. --- .../dataflow/worker/graph/LengthPrefixUnknownCoders.java | 3 ++- .../worker/graph/LengthPrefixUnknownCodersTest.java | 13 +++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java index d4b9e6c..1537288 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java @@ -72,7 +72,8 @@ public class LengthPrefixUnknownCoders { "kind:fixed_big_endian_int64", "kind:var_int32", "kind:void", - "org.apache.beam.sdk.coders.DoubleCoder"); + "org.apache.beam.sdk.coders.DoubleCoder", + "org.apache.beam.sdk.coders.StringUtf8Coder"); private static final String LENGTH_PREFIX_CODER_TYPE = "kind:length_prefix"; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java index f73538a..679a292 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java @@ -77,14 +77,13 @@ public class LengthPrefixUnknownCodersTest { private static final Coder<WindowedValue<KV<String, Integer>>> prefixedWindowedValueCoder = WindowedValue.getFullCoder( - KvCoder.of( - LengthPrefixCoder.of(StringUtf8Coder.of()), LengthPrefixCoder.of(VarIntCoder.of())), + KvCoder.of(StringUtf8Coder.of(), LengthPrefixCoder.of(VarIntCoder.of())), GlobalWindow.Coder.INSTANCE); - private static final Coder<WindowedValue<KV<byte[], byte[]>>> + private static final Coder<WindowedValue<KV<String, byte[]>>> prefixedAndReplacedWindowedValueCoder = WindowedValue.getFullCoder( - KvCoder.of(LENGTH_PREFIXED_BYTE_ARRAY_CODER, LENGTH_PREFIXED_BYTE_ARRAY_CODER), + KvCoder.of(StringUtf8Coder.of(), LENGTH_PREFIXED_BYTE_ARRAY_CODER), GlobalWindow.Coder.INSTANCE); private static final String MERGE_BUCKETS_DO_FN = "MergeBucketsDoFn"; @@ -124,8 +123,7 @@ public class LengthPrefixUnknownCodersTest { Coder<WindowedValue<KV<String, Integer>>> expectedCoder = WindowedValue.getFullCoder( - KvCoder.of( - LengthPrefixCoder.of(StringUtf8Coder.of()), LengthPrefixCoder.of(VarIntCoder.of())), + KvCoder.of(StringUtf8Coder.of(), LengthPrefixCoder.of(VarIntCoder.of())), GlobalWindow.Coder.INSTANCE); assertEquals( @@ -138,8 +136,7 @@ public class LengthPrefixUnknownCodersTest { public void testLengthPrefixAndReplaceUnknownCoder() throws Exception { Coder<WindowedValue<KV<String, Integer>>> windowedValueCoder = WindowedValue.getFullCoder( - KvCoder.of(LengthPrefixCoder.of(StringUtf8Coder.of()), VarIntCoder.of()), - GlobalWindow.Coder.INSTANCE); + KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), GlobalWindow.Coder.INSTANCE); Map<String, Object> lengthPrefixedCoderCloudObject = forCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null), true);