Address reviewer comments, plus a couple of extra fixes. Everything compiles.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4c002724 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4c002724 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4c002724 Branch: refs/heads/master Commit: 4c002724c1adffc6acb1a5b424f864d451e1061c Parents: 2d379dd Author: Robert Bradshaw <rober...@gmail.com> Authored: Mon May 8 10:48:40 2017 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Mon May 8 20:17:57 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/DirectRunnerTest.java | 4 ++-- .../beam/runners/dataflow/BatchViewOverrides.java | 16 ++-------------- .../beam/runners/dataflow/internal/IsmFormat.java | 16 ++-------------- .../runners/dataflow/util/CloudObjectsTest.java | 8 ++++---- .../org/apache/beam/sdk/io/FileBasedSink.java | 18 +++--------------- .../apache/beam/sdk/coders/CoderRegistryTest.java | 2 ++ .../sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 18 +++--------------- 7 files changed, 18 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4c002724/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 85e55eb..943d27c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -526,11 +526,11 @@ public class DirectRunnerTest implements Serializable { private static class LongNoDecodeCoder extends AtomicCoder<Long> { @Override public void encode( - Long value, OutputStream outStream, 
Context context) throws IOException { + Long value, OutputStream outStream) throws IOException { } @Override - public Long decode(InputStream inStream, Context context) throws IOException { + public Long decode(InputStream inStream) throws IOException { throw new CoderException("Cannot decode a long"); } } http://git-wip-us.apache.org/repos/asf/beam/blob/4c002724/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index d640f6e..32a04c0 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -1353,27 +1353,15 @@ class BatchViewOverrides { @Override public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream) throws CoderException, IOException { - encode(value, outStream, Coder.Context.NESTED); - } - - @Override - public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream, - Coder.Context context) throws CoderException, IOException { transformCoder.encode(value.transform, outStream); - originalMapCoder.encode(value.originalMap, outStream, context); + originalMapCoder.encode(value.originalMap, outStream); } @Override public TransformedMap<K, V1, V2> decode(InputStream inStream) throws CoderException, IOException { - return decode(inStream, Coder.Context.NESTED); - } - - @Override - public TransformedMap<K, V1, V2> decode( - InputStream inStream, Coder.Context context) throws CoderException, IOException { return new TransformedMap<>( transformCoder.decode(inStream), - originalMapCoder.decode(inStream, context)); + 
originalMapCoder.decode(inStream)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/4c002724/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java index 8cfae81..0796d08 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java @@ -622,32 +622,20 @@ public class IsmFormat { @Override public void encode(IsmShard value, OutputStream outStream) throws CoderException, IOException { - encode(value, outStream, Coder.Context.NESTED); - } - - @Override - public void encode(IsmShard value, OutputStream outStream, Coder.Context context) - throws CoderException, IOException { checkState(value.getIndexOffset() >= 0, "%s attempting to be written without index offset.", value); VarIntCoder.of().encode(value.getId(), outStream); VarLongCoder.of().encode(value.getBlockOffset(), outStream); - VarLongCoder.of().encode(value.getIndexOffset(), outStream, context); + VarLongCoder.of().encode(value.getIndexOffset(), outStream); } @Override public IsmShard decode(InputStream inStream) throws CoderException, IOException { - return decode(inStream, Coder.Context.NESTED); - } - - @Override - public IsmShard decode( - InputStream inStream, Coder.Context context) throws CoderException, IOException { return IsmShard.of( VarIntCoder.of().decode(inStream), VarLongCoder.of().decode(inStream), - VarLongCoder.of().decode(inStream, context)); + VarLongCoder.of().decode(inStream)); } @Override 
http://git-wip-us.apache.org/repos/asf/beam/blob/4c002724/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java index 64c0dbd..59a5431 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java @@ -171,12 +171,12 @@ public class CloudObjectsTest { private static class ObjectCoder extends CustomCoder<Object> { @Override - public void encode(Object value, OutputStream outStream, Context context) + public void encode(Object value, OutputStream outStream) throws CoderException, IOException { } @Override - public Object decode(InputStream inStream, Context context) + public Object decode(InputStream inStream) throws CoderException, IOException { return new Object(); } @@ -197,11 +197,11 @@ public class CloudObjectsTest { */ private static class ArbitraryCoder extends StructuredCoder<Record> { @Override - public void encode(Record value, OutputStream outStream, Context context) + public void encode(Record value, OutputStream outStream) throws CoderException, IOException {} @Override - public Record decode(InputStream inStream, Context context) throws CoderException, IOException { + public Record decode(InputStream inStream) throws CoderException, IOException { return new Record(); } http://git-wip-us.apache.org/repos/asf/beam/blob/4c002724/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 3620c22..32aa9c3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -949,34 +949,22 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { @Override public void encode(FileResult value, OutputStream outStream) throws IOException { - encode(value, outStream, Context.NESTED); - } - - @Override - public void encode(FileResult value, OutputStream outStream, Context context) - throws IOException { if (value == null) { throw new CoderException("cannot encode a null value"); } stringCoder.encode(value.getFilename().toString(), outStream); if (value.getDestinationFilename() == null) { - stringCoder.encode(null, outStream, context); + stringCoder.encode(null, outStream); } else { - stringCoder.encode(value.getDestinationFilename().toString(), outStream, context); + stringCoder.encode(value.getDestinationFilename().toString(), outStream); } } @Override public FileResult decode(InputStream inStream) throws IOException { - return decode(inStream, Context.NESTED); - } - - @Override - public FileResult decode(InputStream inStream, Context context) - throws IOException { String filename = stringCoder.decode(inStream); assert filename != null; // fixes a compiler warning - @Nullable String destinationFilename = stringCoder.decode(inStream, context); + @Nullable String destinationFilename = stringCoder.decode(inStream); return new FileResult( FileSystems.matchNewResource(filename, false /* isDirectory */), destinationFilename == null http://git-wip-us.apache.org/repos/asf/beam/blob/4c002724/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java index b199a06..d1113f7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java @@ -439,8 +439,10 @@ public class CoderRegistryTest { private static class AutoRegistrationClassCoder extends CustomCoder<AutoRegistrationClass> { private static final AutoRegistrationClassCoder INSTANCE = new AutoRegistrationClassCoder(); + @Override public void encode(AutoRegistrationClass value, OutputStream outStream) {} + @Override public AutoRegistrationClass decode(InputStream inStream) { return null; } http://git-wip-us.apache.org/repos/asf/beam/blob/4c002724/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index db8c1b7..e8fe701 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -371,28 +371,16 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub @Override public void encode(PubsubCheckpoint value, OutputStream outStream) throws IOException { - encode(value, outStream, Context.NESTED); - } - - @Override - public void encode(PubsubCheckpoint value, OutputStream outStream, Context context) - throws IOException { SUBSCRIPTION_PATH_CODER.encode( value.subscriptionPath, - outStream, - context.nested()); - 
LIST_CODER.encode(value.notYetReadIds, outStream, context); + outStream); + LIST_CODER.encode(value.notYetReadIds, outStream); } @Override public PubsubCheckpoint decode(InputStream inStream) throws IOException { - return decode(inStream, Context.NESTED); - } - - @Override - public PubsubCheckpoint decode(InputStream inStream, Context context) throws IOException { String path = SUBSCRIPTION_PATH_CODER.decode(inStream); - List<String> notYetReadIds = LIST_CODER.decode(inStream, context); + List<String> notYetReadIds = LIST_CODER.decode(inStream); return new PubsubCheckpoint(path, null, null, notYetReadIds); } }