Rename ReadTranslator to ReadTranslation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b35e91d4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b35e91d4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b35e91d4 Branch: refs/heads/master Commit: b35e91d4ed99b74d37a08a1385018b4ca326b3a0 Parents: bc4f44f Author: Kenneth Knowles <k...@google.com> Authored: Tue May 23 15:30:22 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Tue May 23 15:53:41 2017 -0700 ---------------------------------------------------------------------- .../core/construction/ReadTranslation.java | 127 +++++++++++++ .../core/construction/ReadTranslator.java | 127 ------------- .../core/construction/ReadTranslationTest.java | 179 +++++++++++++++++++ .../core/construction/ReadTranslatorTest.java | 179 ------------------- 4 files changed, 306 insertions(+), 306 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b35e91d4/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java new file mode 100644 index 0000000..d6c3400 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.IsBounded; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.SerializableUtils; + +/** + * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded} + * {@link PTransform PTransforms} into {@link ReadPayload} protos. + */ +public class ReadTranslation { + private static final String JAVA_SERIALIZED_BOUNDED_SOURCE = "urn:beam:java:boundedsource:v1"; + private static final String JAVA_SERIALIZED_UNBOUNDED_SOURCE = "urn:beam:java:unboundedsource:v1"; + + public static ReadPayload toProto(Read.Bounded<?> read) { + return ReadPayload.newBuilder() + .setIsBounded(IsBounded.BOUNDED) + .setSource(toProto(read.getSource())) + .build(); + } + + public static ReadPayload toProto(Read.Unbounded<?> read) { + return ReadPayload.newBuilder() + .setIsBounded(IsBounded.UNBOUNDED) + .setSource(toProto(read.getSource())) + .build(); + } + + public static SdkFunctionSpec toProto(Source<?> source) { + if (source instanceof BoundedSource) { + return toProto((BoundedSource) source); + } else if (source instanceof UnboundedSource) { + return toProto((UnboundedSource<?, ?>) source); + } else { + throw new IllegalArgumentException( + String.format("Unknown %s type %s", Source.class.getSimpleName(), source.getClass())); + } + } + + private static SdkFunctionSpec toProto(BoundedSource<?> source) { + return SdkFunctionSpec.newBuilder() + .setSpec( + FunctionSpec.newBuilder() + .setUrn(JAVA_SERIALIZED_BOUNDED_SOURCE) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom(SerializableUtils.serializeToByteArray(source))) + .build()))) + .build(); + } + + public static BoundedSource<?> boundedSourceFromProto(ReadPayload payload) + throws InvalidProtocolBufferException { + checkArgument(payload.getIsBounded().equals(IsBounded.BOUNDED)); + return (BoundedSource<?>) SerializableUtils.deserializeFromByteArray( + payload + .getSource() + .getSpec() + .getParameter() + .unpack(BytesValue.class) + .getValue() + .toByteArray(), + "BoundedSource"); + } + + private static SdkFunctionSpec toProto(UnboundedSource<?, ?> source) { + return SdkFunctionSpec.newBuilder() + .setSpec( + FunctionSpec.newBuilder() + .setUrn(JAVA_SERIALIZED_UNBOUNDED_SOURCE) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom(SerializableUtils.serializeToByteArray(source))) + .build()))) + .build(); + } + + public static UnboundedSource<?, ?> unboundedSourceFromProto(ReadPayload payload) + throws InvalidProtocolBufferException { + checkArgument(payload.getIsBounded().equals(IsBounded.UNBOUNDED)); + return (UnboundedSource<?, ?>) SerializableUtils.deserializeFromByteArray( + payload + .getSource() + .getSpec() + .getParameter() + .unpack(BytesValue.class) + .getValue() + .toByteArray(), + "BoundedSource"); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/b35e91d4/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java deleted file mode 100644 index f944938..0000000 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslator.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core.construction; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.protobuf.Any; -import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.IsBounded; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.Source; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.SerializableUtils; - -/** - * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded} - * {@link PTransform PTransforms} into {@link ReadPayload} protos. - */ -public class ReadTranslator { - private static final String JAVA_SERIALIZED_BOUNDED_SOURCE = "urn:beam:java:boundedsource:v1"; - private static final String JAVA_SERIALIZED_UNBOUNDED_SOURCE = "urn:beam:java:unboundedsource:v1"; - - public static ReadPayload toProto(Read.Bounded<?> read) { - return ReadPayload.newBuilder() - .setIsBounded(IsBounded.BOUNDED) - .setSource(toProto(read.getSource())) - .build(); - } - - public static ReadPayload toProto(Read.Unbounded<?> read) { - return ReadPayload.newBuilder() - .setIsBounded(IsBounded.UNBOUNDED) - .setSource(toProto(read.getSource())) - .build(); - } - - public static SdkFunctionSpec toProto(Source<?> source) { - if (source instanceof BoundedSource) { - return toProto((BoundedSource) source); - } else if (source instanceof UnboundedSource) { - return toProto((UnboundedSource<?, ?>) source); - } else { - throw new IllegalArgumentException( - String.format("Unknown %s type %s", Source.class.getSimpleName(), source.getClass())); - } - } - - private static SdkFunctionSpec toProto(BoundedSource<?> source) { - return SdkFunctionSpec.newBuilder() - .setSpec( - FunctionSpec.newBuilder() - .setUrn(JAVA_SERIALIZED_BOUNDED_SOURCE) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom(SerializableUtils.serializeToByteArray(source))) - .build()))) - .build(); - } - - public static BoundedSource<?> boundedSourceFromProto(ReadPayload payload) - throws InvalidProtocolBufferException { - checkArgument(payload.getIsBounded().equals(IsBounded.BOUNDED)); - return (BoundedSource<?>) SerializableUtils.deserializeFromByteArray( - payload - .getSource() - .getSpec() - .getParameter() - .unpack(BytesValue.class) - .getValue() - .toByteArray(), - "BoundedSource"); - } - - private static SdkFunctionSpec toProto(UnboundedSource<?, ?> source) { - return SdkFunctionSpec.newBuilder() - .setSpec( - FunctionSpec.newBuilder() - .setUrn(JAVA_SERIALIZED_UNBOUNDED_SOURCE) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom(SerializableUtils.serializeToByteArray(source))) - .build()))) - .build(); - } - - public static UnboundedSource<?, ?> unboundedSourceFromProto(ReadPayload payload) - throws InvalidProtocolBufferException { - checkArgument(payload.getIsBounded().equals(IsBounded.UNBOUNDED)); - return (UnboundedSource<?, ?>) SerializableUtils.deserializeFromByteArray( - payload - .getSource() - .getSpec() - .getParameter() - .unpack(BytesValue.class) - .getValue() - .toByteArray(), - "BoundedSource"); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/b35e91d4/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java new file mode 100644 index 0000000..740b324 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assume.assumeThat; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; +import org.apache.beam.sdk.options.PipelineOptions; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** + * Tests for {@link ReadTranslation}. + */ +@RunWith(Parameterized.class) +public class ReadTranslationTest { + + @Parameters(name = "{index}: {0}") + public static Iterable<Source<?>> data() { + return ImmutableList.<Source<?>>of( + CountingSource.unbounded(), + CountingSource.upTo(100L), + new TestBoundedSource(), + new TestUnboundedSource()); + } + + @Parameter(0) + public Source<?> source; + + @Test + public void testToFromProtoBounded() throws Exception { + // TODO: Split into two tests. + assumeThat(source, instanceOf(BoundedSource.class)); + BoundedSource<?> boundedSource = (BoundedSource<?>) this.source; + Read.Bounded<?> boundedRead = Read.from(boundedSource); + ReadPayload payload = ReadTranslation.toProto(boundedRead); + assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.BOUNDED)); + BoundedSource<?> deserializedSource = ReadTranslation.boundedSourceFromProto(payload); + assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source)); + } + + @Test + public void testToFromProtoUnbounded() throws Exception { + assumeThat(source, instanceOf(UnboundedSource.class)); + UnboundedSource<?, ?> unboundedSource = (UnboundedSource<?, ?>) this.source; + Read.Unbounded<?> unboundedRead = Read.from(unboundedSource); + ReadPayload payload = ReadTranslation.toProto(unboundedRead); + assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.UNBOUNDED)); + UnboundedSource<?, ?> deserializedSource = ReadTranslation.unboundedSourceFromProto(payload); + assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source)); + } + + private static class TestBoundedSource extends BoundedSource<String> { + @Override + public List<? extends BoundedSource<String>> split( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public BoundedReader<String> createReader(PipelineOptions options) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void validate() {} + + @Override + public Coder<String> getDefaultOutputCoder() { + return StringUtf8Coder.of(); + } + + @Override + public boolean equals(Object other) { + return other != null && other.getClass().equals(TestBoundedSource.class); + } + + @Override + public int hashCode() { + return TestBoundedSource.class.hashCode(); + } + } + + private static class TestUnboundedSource extends UnboundedSource<byte[], CheckpointMark> { + @Override + public void validate() {} + + @Override + public Coder<byte[]> getDefaultOutputCoder() { + return ByteArrayCoder.of(); + } + + @Override + public List<? extends UnboundedSource<byte[], CheckpointMark>> split( + int desiredNumSplits, PipelineOptions options) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public UnboundedReader<byte[]> createReader( + PipelineOptions options, @Nullable CheckpointMark checkpointMark) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public Coder<CheckpointMark> getCheckpointMarkCoder() { + return new TestCheckpointMarkCoder(); + } + + @Override + public boolean equals(Object other) { + return other != null && other.getClass().equals(TestUnboundedSource.class); + } + + @Override + public int hashCode() { + return TestUnboundedSource.class.hashCode(); + } + + private class TestCheckpointMarkCoder extends AtomicCoder<CheckpointMark> { + @Override + public void encode(CheckpointMark value, OutputStream outStream) + throws CoderException, IOException { + throw new UnsupportedOperationException(); + } + + @Override + public CheckpointMark decode(InputStream inStream) throws CoderException, IOException { + throw new UnsupportedOperationException(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/b35e91d4/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java deleted file mode 100644 index a603e34..0000000 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslatorTest.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core.construction; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.assertThat; -import static org.junit.Assume.assumeThat; - -import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.CountingSource; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.Source; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; -import org.apache.beam.sdk.options.PipelineOptions; -import org.hamcrest.Matchers; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; -import org.junit.runners.Parameterized.Parameters; - -/** - * Tests for {@link ReadTranslator}. - */ -@RunWith(Parameterized.class) -public class ReadTranslatorTest { - - @Parameters(name = "{index}: {0}") - public static Iterable<Source<?>> data() { - return ImmutableList.<Source<?>>of( - CountingSource.unbounded(), - CountingSource.upTo(100L), - new TestBoundedSource(), - new TestUnboundedSource()); - } - - @Parameter(0) - public Source<?> source; - - @Test - public void testToFromProtoBounded() throws Exception { - // TODO: Split into two tests. - assumeThat(source, instanceOf(BoundedSource.class)); - BoundedSource<?> boundedSource = (BoundedSource<?>) this.source; - Read.Bounded<?> boundedRead = Read.from(boundedSource); - ReadPayload payload = ReadTranslator.toProto(boundedRead); - assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.BOUNDED)); - BoundedSource<?> deserializedSource = ReadTranslator.boundedSourceFromProto(payload); - assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source)); - } - - @Test - public void testToFromProtoUnbounded() throws Exception { - assumeThat(source, instanceOf(UnboundedSource.class)); - UnboundedSource<?, ?> unboundedSource = (UnboundedSource<?, ?>) this.source; - Read.Unbounded<?> unboundedRead = Read.from(unboundedSource); - ReadPayload payload = ReadTranslator.toProto(unboundedRead); - assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.UNBOUNDED)); - UnboundedSource<?, ?> deserializedSource = ReadTranslator.unboundedSourceFromProto(payload); - assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source)); - } - - private static class TestBoundedSource extends BoundedSource<String> { - @Override - public List<? extends BoundedSource<String>> split( - long desiredBundleSizeBytes, PipelineOptions options) throws Exception { - throw new UnsupportedOperationException(); - } - - @Override - public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { - throw new UnsupportedOperationException(); - } - - @Override - public BoundedReader<String> createReader(PipelineOptions options) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void validate() {} - - @Override - public Coder<String> getDefaultOutputCoder() { - return StringUtf8Coder.of(); - } - - @Override - public boolean equals(Object other) { - return other != null && other.getClass().equals(TestBoundedSource.class); - } - - @Override - public int hashCode() { - return TestBoundedSource.class.hashCode(); - } - } - - private static class TestUnboundedSource extends UnboundedSource<byte[], CheckpointMark> { - @Override - public void validate() {} - - @Override - public Coder<byte[]> getDefaultOutputCoder() { - return ByteArrayCoder.of(); - } - - @Override - public List<? extends UnboundedSource<byte[], CheckpointMark>> split( - int desiredNumSplits, PipelineOptions options) throws Exception { - throw new UnsupportedOperationException(); - } - - @Override - public UnboundedReader<byte[]> createReader( - PipelineOptions options, @Nullable CheckpointMark checkpointMark) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public Coder<CheckpointMark> getCheckpointMarkCoder() { - return new TestCheckpointMarkCoder(); - } - - @Override - public boolean equals(Object other) { - return other != null && other.getClass().equals(TestUnboundedSource.class); - } - - @Override - public int hashCode() { - return TestUnboundedSource.class.hashCode(); - } - - private class TestCheckpointMarkCoder extends AtomicCoder<CheckpointMark> { - @Override - public void encode(CheckpointMark value, OutputStream outStream) - throws CoderException, IOException { - throw new UnsupportedOperationException(); - } - - @Override - public CheckpointMark decode(InputStream inStream) throws CoderException, IOException { - throw new UnsupportedOperationException(); - } - } - } -}