http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java index dfd4ea2..13a7261 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java @@ -48,13 +48,13 @@ public class CustomCoderTest { } @Override - public void encode(KV<String, Long> kv, OutputStream out, Context context) + public void encode(KV<String, Long> kv, OutputStream out) throws IOException { new DataOutputStream(out).writeLong(kv.getValue()); } @Override - public KV<String, Long> decode(InputStream inStream, Context context) + public KV<String, Long> decode(InputStream inStream) throws IOException { return KV.of(key, new DataInputStream(inStream).readLong()); }
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java index d6d7de8..9fb0b82 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java @@ -167,6 +167,12 @@ public class NullableCoderTest { private static class EntireStreamExpectingCoder extends AtomicCoder<String> { @Override + public void encode(String value, OutputStream outStream) + throws IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode( String value, OutputStream outStream, Context context) throws IOException { checkArgument(context.isWholeStream, "Expected to get entire stream"); @@ -174,6 +180,11 @@ public class NullableCoderTest { } @Override + public String decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public String decode(InputStream inStream, Context context) throws CoderException, IOException { checkArgument(context.isWholeStream, "Expected to get entire stream"); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java index af2c94e..7aa2080 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java @@ -47,7 +47,7 @@ public class StructuredCoderTest { private static final long serialVersionUID = 0L; @Override - public void encode(@Nullable Boolean value, OutputStream outStream, Context context) + public void encode(@Nullable Boolean value, OutputStream outStream) throws CoderException, IOException { if (value == null) { outStream.write(2); @@ -61,7 +61,7 @@ public class StructuredCoderTest { @Override @Nullable public Boolean decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + InputStream inStream) throws CoderException, IOException { int value = inStream.read(); if (value == 0) { @@ -110,7 +110,7 @@ public class StructuredCoderTest { @Override public void encode( - @Nullable ObjectIdentityBoolean value, OutputStream outStream, Context context) + @Nullable ObjectIdentityBoolean value, OutputStream outStream) throws CoderException, IOException { if (value == null) { outStream.write(2); @@ -124,7 +124,7 @@ public class StructuredCoderTest { @Override @Nullable public ObjectIdentityBoolean decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + InputStream inStream) throws CoderException, IOException { int value = inStream.read(); if (value == 0) { @@ -213,13 +213,13 @@ public class StructuredCoderTest { private static class Foo<T> extends StructuredCoder<T> { @Override - public void encode(T value, OutputStream outStream, Coder.Context context) + public void encode(T value, OutputStream outStream) throws CoderException, IOException { throw new UnsupportedOperationException(); } @Override - public T decode(InputStream inStream, Coder.Context context) + public T decode(InputStream inStream) throws CoderException, IOException { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java index 83f348c..37db4ef 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java @@ -98,12 +98,12 @@ public class PAssertTest implements Serializable { } @Override - public void encode(NotSerializableObject value, OutputStream outStream, Context context) + public void encode(NotSerializableObject value, OutputStream outStream) throws CoderException, IOException { } @Override - public NotSerializableObject decode(InputStream inStream, Context context) + public NotSerializableObject decode(InputStream inStream) throws CoderException, IOException { return new NotSerializableObject(); } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java index db5ff2e..375be33 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java @@ -153,11 +153,11 @@ public class SerializableMatchersTest implements Serializable { private static class NotSerializableClassCoder extends AtomicCoder<NotSerializableClass> { @Override - public void encode(NotSerializableClass value, OutputStream outStream, Coder.Context context) { + public void encode(NotSerializableClass value, OutputStream outStream) { } @Override - public NotSerializableClass decode(InputStream inStream, Coder.Context context) { + public NotSerializableClass decode(InputStream inStream) { return new NotSerializableClass(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java index 546683b..3939800 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java @@ -75,14 +75,14 @@ public class WindowSupplierTest { private static class FailingCoder extends AtomicCoder<BoundedWindow> { @Override public void encode( - BoundedWindow value, OutputStream outStream, Context context) + BoundedWindow value, OutputStream outStream) throws CoderException, IOException { throw new CoderException("Test Encode Exception"); } @Override public BoundedWindow decode( - InputStream inStream, Context context) throws CoderException, IOException { + InputStream inStream) throws CoderException, IOException { throw new CoderException("Test Decode Exception"); } } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java index 8a4d563..33c652a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java @@ -336,12 +336,23 @@ public class CombineFnsTest { private static final UserStringCoder INSTANCE = new UserStringCoder(); @Override + public void encode(UserString value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(UserString value, OutputStream outStream, Context context) throws CoderException, IOException { StringUtf8Coder.of().encode(value.strValue, outStream, context); } @Override + public UserString decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public UserString decode(InputStream inStream, Context context) throws CoderException, IOException { return UserString.of(StringUtf8Coder.of().decode(inStream, context)); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index e4b016b..bd8aee4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -876,6 +876,12 @@ public class CombineTest implements Serializable { */ private class CountSumCoder extends AtomicCoder<CountSum> { @Override + public void encode(CountSum value, OutputStream outStream, OutputStream outStream) + throws CoderException, IOException { + encode(outStream, outStream, Context.NESTED); + } + + @Override public void encode(CountSum value, OutputStream outStream, Context context) throws CoderException, IOException { LONG_CODER.encode(value.count, outStream); @@ -883,6 +889,11 @@ public class CombineTest implements Serializable { } @Override + public CountSum decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Coder.Context.NESTED); + } + + @Override public CountSum decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { long count = LONG_CODER.decode(inStream); @@ -925,12 +936,23 @@ public class CombineTest implements Serializable { public static Coder<Accumulator> getCoder() { return new AtomicCoder<Accumulator>() { @Override + public void encode(Accumulator accumulator, OutputStream outStream) + throws CoderException, IOException { + encode(accumulator, outStream, Coder.Context.NESTED); + } + + @Override public void encode(Accumulator accumulator, OutputStream outStream, Coder.Context context) throws CoderException, IOException { StringUtf8Coder.of().encode(accumulator.value, outStream, context); } @Override + public Accumulator decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Coder.Context.NESTED); + } + + @Override public Accumulator decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { return new Accumulator(StringUtf8Coder.of().decode(inStream, context)); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java index 7e8a1dd..a05d31c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java @@ -134,11 +134,11 @@ public class CreateTest { private static class RecordCoder extends AtomicCoder<Record> { @Override - public void encode(Record value, OutputStream outStream, Context context) + public void encode(Record value, OutputStream outStream) throws CoderException, IOException {} @Override - public Record decode(InputStream inStream, Context context) throws CoderException, IOException { + public Record decode(InputStream inStream) throws CoderException, IOException { return null; } } @@ -207,15 +207,14 @@ public class CreateTest { @Override public void encode( UnserializableRecord value, - OutputStream outStream, - org.apache.beam.sdk.coders.Coder.Context context) + OutputStream outStream) throws CoderException, IOException { stringCoder.encode(value.myString, outStream); } @Override public UnserializableRecord decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + InputStream inStream) throws CoderException, IOException { return new UnserializableRecord(stringCoder.decode(inStream)); } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index aba33eb..0cd885c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -469,13 +469,13 @@ public class GroupByKeyTest { private DeterministicKeyCoder() {} @Override - public void encode(BadEqualityKey value, OutputStream outStream, Context context) + public void encode(BadEqualityKey value, OutputStream outStream) throws IOException { new DataOutputStream(outStream).writeLong(value.key); } @Override - public BadEqualityKey decode(InputStream inStream, Context context) + public BadEqualityKey decode(InputStream inStream) throws IOException { return new BadEqualityKey(new DataInputStream(inStream).readLong()); } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index d2cb980..3697211 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -986,12 +986,12 @@ public class ParDoTest implements Serializable { } @Override - public void encode(TestDummy value, OutputStream outStream, Context context) + public void encode(TestDummy value, OutputStream outStream) throws CoderException, IOException { } @Override - public TestDummy decode(InputStream inStream, Context context) + public TestDummy decode(InputStream inStream) throws CoderException, IOException { return new TestDummy(); } @@ -1090,12 +1090,23 @@ public class ParDoTest implements Serializable { } @Override + public void encode(MyInteger value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(MyInteger value, OutputStream outStream, Context context) throws CoderException, IOException { delegate.encode(value.getValue(), outStream, context); } @Override + public MyInteger decode(InputStream inStream) throws CoderException { + return decode(inStream, Context.NESTED); + } + + @Override public MyInteger decode(InputStream inStream, Context context) throws CoderException, IOException { return new MyInteger(delegate.decode(inStream, context)); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java index 84f3d69..cdd03d9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java @@ -507,12 +507,23 @@ public class ViewTest implements Serializable { private static class NonDeterministicStringCoder extends AtomicCoder<String> { @Override + public void encode(String value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Coder.Context.NESTED); + } + + @Override public void encode(String value, OutputStream outStream, Coder.Context context) throws CoderException, IOException { StringUtf8Coder.of().encode(value, outStream, context); } @Override + public String decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Coder.Context.NESTED); + } + + @Override public String decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { return StringUtf8Coder.of().decode(inStream, context); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 489493a..a8cd35e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -277,10 +277,10 @@ public class DoFnInvokersTest { } @Override - public void encode(SomeRestriction value, OutputStream outStream, Context context) {} + public void encode(SomeRestriction value, OutputStream outStream) {} @Override - public SomeRestriction decode(InputStream inStream, Context context) { + public SomeRestriction decode(InputStream inStream) { return null; } } @@ -400,10 +400,10 @@ public class DoFnInvokersTest { @Override public void encode( - RestrictionWithDefaultTracker value, OutputStream outStream, Context context) {} + RestrictionWithDefaultTracker value, OutputStream outStream) {} @Override - public RestrictionWithDefaultTracker decode(InputStream inStream, Context context) { + public RestrictionWithDefaultTracker decode(InputStream inStream) { return null; } } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java index 7230a8b..f36e5e1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java @@ -50,12 +50,12 @@ public class CoderUtilsTest { } @Override - public void encode(Integer value, OutputStream outStream, Context context) { + public void encode(Integer value, OutputStream outStream) { throw new RuntimeException("not expecting to be called"); } @Override - public Integer decode(InputStream inStream, Context context) { + public Integer decode(InputStream inStream) { throw new RuntimeException("not expecting to be called"); } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java index 6ba1d4a..9a80730 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java @@ -89,12 +89,12 @@ public class SerializableUtilsTest { private final Object unserializableField = new Object(); @Override - public void encode(Object value, OutputStream outStream, Context context) + public void encode(Object value, OutputStream outStream) throws CoderException, IOException { } @Override - public Object decode(InputStream inStream, Context context) + public Object decode(InputStream inStream) throws CoderException, IOException { return unserializableField; } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java index 325c69d..73c7977 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java @@ -49,6 +49,12 @@ public class ByteStringCoder extends AtomicCoder<ByteString> { private ByteStringCoder() {} @Override + public void encode(ByteString value, OutputStream outStream) + throws IOException, CoderException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(ByteString value, OutputStream outStream, Context context) throws IOException, CoderException { if (value == null) { @@ -63,6 +69,11 @@ public class ByteStringCoder extends AtomicCoder<ByteString> { } @Override + public ByteString decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public ByteString decode(InputStream inStream, Context context) throws IOException { if (context.isWholeStream) { return ByteString.readFrom(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java index 968a2fa..f73bf2b 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java @@ -168,6 +168,12 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> { } @Override + public void encode(T value, OutputStream outStream) + throws IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(T value, OutputStream outStream, Context context) throws IOException { if (value == null) { throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName()); @@ -180,6 +186,11 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> { } @Override + public T decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public T decode(InputStream inStream, Context context) throws IOException { if (context.isWholeStream) { return getParser().parseFrom(inStream, getExtensionRegistry()); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java index 33b9f77..f034a03 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java @@ -38,7 +38,7 @@ public class TableDestinationCoder extends AtomicCoder<TableDestination> { } @Override - public void encode(TableDestination value, OutputStream outStream, Context context) + public void encode(TableDestination value, OutputStream outStream) throws IOException { if (value == null) { throw new CoderException("cannot encode a null value"); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java index 8ae75c5..c4707da 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java @@ -38,6 +38,12 @@ class TableRowInfoCoder extends AtomicCoder<TableRowInfo> { } @Override + public void encode(TableRowInfo value, OutputStream outStream) + throws IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(TableRowInfo value, OutputStream outStream, Context context) throws IOException { if (value == null) { @@ -48,6 +54,11 @@ class TableRowInfoCoder extends AtomicCoder<TableRowInfo> { } @Override + public TableRowInfo decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public TableRowInfo decode(InputStream inStream, Context context) throws IOException { return new TableRowInfo( http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java index cfec991..e4b6f1f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java @@ -38,6 +38,12 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> { } @Override + public void encode(TableRow value, OutputStream outStream) + throws IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(TableRow value, OutputStream outStream, Context context) throws IOException { String strValue = MAPPER.writeValueAsString(value); @@ -45,6 +51,11 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> { } @Override + public TableRow decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public TableRow decode(InputStream inStream, Context context) throws IOException { String strValue = StringUtf8Coder.of().decode(inStream, context); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index 9e83271..f014039 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -101,7 +101,7 @@ class WriteBundlesToFiles<DestinationT> } @Override - public void encode(Result<DestinationT> value, OutputStream outStream, Context context) + public void encode(Result<DestinationT> value, OutputStream outStream) throws IOException { if (value == null) { throw new CoderException("cannot encode a null value"); @@ -112,7 +112,7 @@ class WriteBundlesToFiles<DestinationT> } @Override - public Result<DestinationT> decode(InputStream inStream, Context context) throws IOException { + public Result<DestinationT> decode(InputStream inStream) throws IOException { String filename = stringCoder.decode(inStream); long fileByteSize = longCoder.decode(inStream); DestinationT destination = destinationCoder.decode(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java index d120f72..5df2bcf 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java @@ -34,12 +34,23 @@ public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubMessage> { } @Override + public void encode(PubsubMessage value, OutputStream outStream) + throws IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(PubsubMessage value, OutputStream outStream, Context context) throws IOException { PAYLOAD_CODER.encode(value.getPayload(), outStream, context); } @Override + public PubsubMessage decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public PubsubMessage decode(InputStream inStream, Context context) throws IOException { return new PubsubMessage( PAYLOAD_CODER.decode(inStream, context), ImmutableMap.<String, String>of()); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java index 5907c9e..bcf7656 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java @@ -45,6 +45,12 @@ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubMessage> return new PubsubMessageWithAttributesCoder(); } + @Override + public void encode(PubsubMessage value, OutputStream outStream) + throws IOException { + encode(value, outStream, Context.NESTED); + } + public void encode(PubsubMessage value, OutputStream outStream, Context context) throws IOException { PAYLOAD_CODER.encode(value.getPayload(), outStream); @@ -52,6 +58,11 @@ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubMessage> } @Override + public PubsubMessage decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public PubsubMessage decode(InputStream inStream, Context context) throws IOException { byte[] payload = PAYLOAD_CODER.decode(inStream); Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index ae320c7..ad38e28 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -108,7 +108,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, @Override public void encode( - OutgoingMessage value, OutputStream outStream, Context context) + OutgoingMessage value, OutputStream outStream) throws CoderException, IOException { ByteArrayCoder.of().encode(value.elementBytes, outStream); ATTRIBUTES_CODER.encode(value.attributes, outStream); @@ -118,7 +118,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, @Override public OutgoingMessage decode( - InputStream inStream, Context context) throws CoderException, IOException { + InputStream inStream) throws CoderException, IOException { byte[] elementBytes = ByteArrayCoder.of().decode(inStream); Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream); long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index e53976e..db8c1b7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -369,6 +369,12 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub private PubsubCheckpointCoder() {} @Override + public void encode(PubsubCheckpoint value, OutputStream outStream) + throws IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(PubsubCheckpoint value, OutputStream outStream, Context context) throws IOException { SUBSCRIPTION_PATH_CODER.encode( @@ -379,6 +385,11 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub } @Override + public PubsubCheckpoint decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override public PubsubCheckpoint decode(InputStream inStream, Context context) throws IOException { String path = SUBSCRIPTION_PATH_CODER.decode(inStream); List<String> notYetReadIds = LIST_CODER.decode(inStream, context); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index d60c721..70d5377 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -748,12 +748,23 @@ public class BigQueryIOTest implements Serializable { */ private static class PartitionedGlobalWindowCoder extends AtomicCoder<PartitionedGlobalWindow> { @Override + public void encode(PartitionedGlobalWindow window, OutputStream outStream) + throws IOException, CoderException { + encode(window, outStream, Context.NESTED); + } + + @Override public void encode(PartitionedGlobalWindow window, OutputStream outStream, Context context) throws IOException, CoderException { StringUtf8Coder.of().encode(window.value, outStream, context); } @Override + public PartitionedGlobalWindow decode(InputStream inStream) throws IOException, CoderException { + return decode(inStream, Context.NESTED); + } + + @Override public PartitionedGlobalWindow decode(InputStream inStream, Context context) throws IOException, CoderException { return new PartitionedGlobalWindow(StringUtf8Coder.of().decode(inStream, context)); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java index 8fddfe0..8d2598a 100644 --- a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java +++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java @@ -68,13 +68,13 @@ public class WritableCoder<T extends Writable> extends CustomCoder<T> { } @Override - public void encode(T value, OutputStream outStream, Context context) throws IOException { + public void encode(T value, OutputStream outStream) throws IOException { value.write(new DataOutputStream(outStream)); } @SuppressWarnings("unchecked") @Override - public T decode(InputStream inStream, Context context) throws IOException { + public T decode(InputStream inStream) throws IOException { try { if (type == NullWritable.class) { // NullWritable has no default constructor http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java index 7cc043c..501fe09 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java @@ -44,16 +44,14 @@ class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable { } @Override - public void encode(Mutation mutation, OutputStream outStream, - Coder.Context context) throws IOException { + public void encode(Mutation mutation, OutputStream outStream) throws IOException { MutationType type = getType(mutation); MutationProto proto = ProtobufUtil.toMutation(type, mutation); proto.writeDelimitedTo(outStream); } @Override - public Mutation decode(InputStream inStream, - Coder.Context context) throws IOException { + public Mutation decode(InputStream inStream) throws IOException { return ProtobufUtil.toMutation(MutationProto.parseDelimitedFrom(inStream)); } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java index 24a5f7f..1d06635 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java @@ -41,13 +41,13 @@ class HBaseResultCoder extends AtomicCoder<Result> implements Serializable { } @Override - public void encode(Result value, OutputStream outputStream, Coder.Context context) + public void encode(Result value, OutputStream outputStream) throws IOException { ProtobufUtil.toResult(value).writeDelimitedTo(outputStream); } @Override - public Result decode(InputStream inputStream, Coder.Context context) + public Result decode(InputStream inputStream) throws IOException { return ProtobufUtil.toResult(ClientProtos.Result.parseDelimitedFrom(inputStream)); } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index ba84c2a..e21945f 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1597,13 +1597,13 @@ public class KafkaIO { private static class NullOnlyCoder<T> extends AtomicCoder<T> { @Override - public void encode(T value, OutputStream outStream, Context context) { + public void encode(T value, OutputStream outStream) { checkArgument(value == null, "Can only encode nulls"); // Encode as no bytes. } @Override - public T decode(InputStream inStream, Context context) { + public T decode(InputStream inStream) { return null; } } http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java index d838a0d..1971060 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java @@ -50,6 +50,12 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> { } @Override + public void encode(KafkaRecord<K, V> value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(KafkaRecord<K, V> value, OutputStream outStream, Context context) throws CoderException, IOException { Context nested = context.nested(); @@ -61,6 +67,11 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> { } @Override + public KafkaRecord<K, V> decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public KafkaRecord<K, V> decode(InputStream inStream, Context context) throws CoderException, IOException { Context nested = context.nested(); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java index c6a0174..f233e27 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java @@ -43,7 +43,7 @@ class KinesisRecordCoder extends AtomicCoder<KinesisRecord> { } @Override - public void encode(KinesisRecord value, OutputStream outStream, Context context) throws + public void encode(KinesisRecord value, OutputStream outStream) throws IOException { BYTE_ARRAY_CODER.encode(value.getData().array(), outStream); STRING_CODER.encode(value.getSequenceNumber(), outStream); @@ -56,7 +56,7 @@ class KinesisRecordCoder extends AtomicCoder<KinesisRecord> { } @Override - public KinesisRecord decode(InputStream inStream, Context context) throws IOException { + public KinesisRecord decode(InputStream inStream) throws IOException { ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream)); String sequenceNumber = STRING_CODER.decode(inStream); String partitionKey = STRING_CODER.decode(inStream); http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java index 5b2ec02..d4c0440 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java @@ -88,15 +88,8 @@ public class JAXBCoder<T> extends CustomCoder<T> { } @Override - public void encode(T value, OutputStream outStream) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try { - jaxbMarshaller.get().marshal(value, baos); - } catch (JAXBException e) { - throw new CoderException(e); - } - VarInt.encode(baos.size(), outStream); - baos.writeTo(outStream); + public void encode(T value, OutputStream outStream) throws CoderException, IOException { + encode(value, outStream, Context.NESTED); } @Override @@ -109,11 +102,23 @@ public class JAXBCoder<T> extends CustomCoder<T> { throw new CoderException(e); } } else { - encode(value, outStream); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + jaxbMarshaller.get().marshal(value, baos); + } catch (JAXBException e) { + throw new CoderException(e); + } + VarInt.encode(baos.size(), outStream); + baos.writeTo(outStream); } } @Override + public T decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public T decode(InputStream inStream, Context context) throws CoderException, IOException { try { if (!context.isWholeStream) { http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java index 5386a61..c175e4a 100644 --- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java +++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java @@ -178,18 +178,27 @@ public class JAXBCoderTest { } @Override + public void encode(TestType value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Context.NESTED); + } + + @Override public void encode(TestType value, OutputStream outStream, Context context) throws CoderException, IOException { - Context nestedContext = context.nested(); VarIntCoder.of().encode(3, outStream); jaxbCoder.encode(value, outStream); VarLongCoder.of().encode(22L, outStream, context); } @Override + public TestType decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Context.NESTED); + } + + @Override public TestType decode(InputStream inStream, Context context) throws CoderException, IOException { - Context nestedContext = context.nested(); VarIntCoder.of().decode(inStream); TestType result = jaxbCoder.decode(inStream); VarLongCoder.of().decode(inStream, context);