Remove uses of context in coder size estimation calls. Most of the change was generated mechanically with: find . -type f -name '*.java' | xargs sed -i '' 's/\([a-zA-Z]*[bB]yteSize[a-zA-Z]*[(].*\), [^,]*[Cc]ontext[^,()]*\([(][)]\)*[)]/\1)/'
plus a one-off Python script and some manual fixups. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/96de8d73 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/96de8d73 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/96de8d73 Branch: refs/heads/master Commit: 96de8d735c40ee5f823ff17966992c433d68f296 Parents: 78a99be Author: Robert Bradshaw <rober...@gmail.com> Authored: Fri May 5 15:23:03 2017 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Sat May 6 20:33:20 2017 -0700 ---------------------------------------------------------------------- runners/google-cloud-dataflow-java/pom.xml | 2 +- .../runners/dataflow/internal/IsmFormat.java | 8 ++-- .../runners/dataflow/util/RandomAccessData.java | 11 ++---- .../dataflow/util/RandomAccessDataTest.java | 6 +-- .../apache/beam/sdk/coders/BigDecimalCoder.java | 8 ++-- .../beam/sdk/coders/BigEndianIntegerCoder.java | 4 +- .../beam/sdk/coders/BigEndianLongCoder.java | 4 +- .../apache/beam/sdk/coders/BigIntegerCoder.java | 6 +-- .../apache/beam/sdk/coders/ByteArrayCoder.java | 10 ++--- .../org/apache/beam/sdk/coders/ByteCoder.java | 4 +- .../org/apache/beam/sdk/coders/DoubleCoder.java | 4 +- .../apache/beam/sdk/coders/DurationCoder.java | 8 ++-- .../apache/beam/sdk/coders/InstantCoder.java | 8 ++-- .../beam/sdk/coders/IterableLikeCoder.java | 9 ++--- .../org/apache/beam/sdk/coders/KvCoder.java | 12 +++--- .../beam/sdk/coders/LengthPrefixCoder.java | 11 +++--- .../org/apache/beam/sdk/coders/MapCoder.java | 10 ++--- .../apache/beam/sdk/coders/NullableCoder.java | 14 +++---- .../beam/sdk/coders/StringDelegateCoder.java | 1 - .../apache/beam/sdk/coders/StringUtf8Coder.java | 16 ++------ .../beam/sdk/coders/TextualIntegerCoder.java | 4 +- .../org/apache/beam/sdk/coders/VarIntCoder.java | 4 +- .../apache/beam/sdk/coders/VarLongCoder.java | 4 +- .../org/apache/beam/sdk/coders/VoidCoder.java | 4 +- .../beam/sdk/testing/CoderProperties.java | 6 ++- 
.../sdk/transforms/ApproximateQuantiles.java | 22 ++++------- .../org/apache/beam/sdk/transforms/Count.java | 4 +- .../org/apache/beam/sdk/transforms/Top.java | 9 ++--- .../beam/sdk/transforms/join/UnionCoder.java | 8 ++-- .../org/apache/beam/sdk/util/BitSetCoder.java | 1 - .../org/apache/beam/sdk/util/WindowedValue.java | 15 ++++---- .../beam/sdk/coders/BigDecimalCoderTest.java | 5 ++- .../beam/sdk/coders/BigIntegerCoderTest.java | 5 ++- .../beam/sdk/coders/CoderRegistryTest.java | 4 +- .../beam/sdk/coders/LengthPrefixCoderTest.java | 24 ++++++------ .../beam/sdk/coders/NullableCoderTest.java | 18 ++++----- .../apache/beam/sdk/testing/PAssertTest.java | 4 +- .../apache/beam/sdk/transforms/CombineTest.java | 8 ++-- .../apache/beam/sdk/transforms/ParDoTest.java | 4 +- .../extensions/protobuf/ByteStringCoder.java | 8 +--- .../protobuf/ByteStringCoderTest.java | 10 ++--- .../sdk/io/gcp/bigquery/TableRowJsonCoder.java | 4 +- .../beam/sdk/io/kafka/KafkaRecordCoder.java | 4 +- .../org/apache/beam/sdk/io/xml/JAXBCoder.java | 40 +++++++++++--------- 44 files changed, 168 insertions(+), 207 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 09901d5..b579041 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -33,7 +33,7 @@ <packaging>jar</packaging> <properties> - <dataflow.container_version>beam-master-20170505-wd-2914</dataflow.container_version> + <dataflow.container_version>beam-master-20170506</dataflow.container_version> <dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version> <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version> </properties> 
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java index aed514a..00e0c54 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java @@ -704,12 +704,12 @@ public class IsmFormat { } @Override - public boolean isRegisterByteSizeObserverCheap(KeyPrefix value, Coder.Context context) { + public boolean isRegisterByteSizeObserverCheap(KeyPrefix value) { return true; } @Override - public long getEncodedElementByteSize(KeyPrefix value, Coder.Context context) + public long getEncodedElementByteSize(KeyPrefix value) throws Exception { checkNotNull(value); return VarInt.getLength(value.getSharedKeySize()) @@ -786,12 +786,12 @@ public class IsmFormat { } @Override - public boolean isRegisterByteSizeObserverCheap(Footer value, Coder.Context context) { + public boolean isRegisterByteSizeObserverCheap(Footer value) { return true; } @Override - public long getEncodedElementByteSize(Footer value, Coder.Context context) + public long getEncodedElementByteSize(Footer value) throws Exception { return Footer.FIXED_LENGTH; } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java index 4e94515..f36bd78 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java @@ -96,22 +96,17 @@ public class RandomAccessData { } @Override - public boolean isRegisterByteSizeObserverCheap( - RandomAccessData value, Coder.Context context) { + public boolean isRegisterByteSizeObserverCheap(RandomAccessData value) { return true; } @Override - protected long getEncodedElementByteSize(RandomAccessData value, Coder.Context context) + protected long getEncodedElementByteSize(RandomAccessData value) throws Exception { if (value == null) { throw new CoderException("cannot encode a null in memory stream"); } - long size = 0; - if (!context.isWholeStream) { - size += VarInt.getLength(value.size); - } - return size + value.size; + return VarInt.getLength(value.size) + value.size; } } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java index 042e145..5a7908c 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java @@ -61,10 +61,8 @@ public class RandomAccessDataTest { CoderProperties.coderSerializable(RandomAccessDataCoder.of()); 
CoderProperties.structuralValueConsistentWithEquals( RandomAccessDataCoder.of(), streamA, streamB); - assertTrue(RandomAccessDataCoder.of().isRegisterByteSizeObserverCheap(streamA, Context.NESTED)); - assertTrue(RandomAccessDataCoder.of().isRegisterByteSizeObserverCheap(streamA, Context.OUTER)); - assertEquals(4, RandomAccessDataCoder.of().getEncodedElementByteSize(streamA, Context.NESTED)); - assertEquals(3, RandomAccessDataCoder.of().getEncodedElementByteSize(streamA, Context.OUTER)); + assertTrue(RandomAccessDataCoder.of().isRegisterByteSizeObserverCheap(streamA)); + assertEquals(4, RandomAccessDataCoder.of().getEncodedElementByteSize(streamA)); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java index aadf085..97559a9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java @@ -85,7 +85,7 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> { * @return {@code true}, because {@link #getEncodedElementByteSize} runs in constant time. */ @Override - public boolean isRegisterByteSizeObserverCheap(BigDecimal value, Context context) { + public boolean isRegisterByteSizeObserverCheap(BigDecimal value) { return true; } @@ -97,9 +97,9 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> { * representation of the {@link BigInteger} that, when scaled, equals the given value. 
*/ @Override - protected long getEncodedElementByteSize(BigDecimal value, Context context) throws Exception { + protected long getEncodedElementByteSize(BigDecimal value) throws Exception { checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName())); - return VAR_INT_CODER.getEncodedElementByteSize(value.scale(), context.nested()) - + BIG_INT_CODER.getEncodedElementByteSize(value.unscaledValue(), context); + return VAR_INT_CODER.getEncodedElementByteSize(value.scale()) + + BIG_INT_CODER.getEncodedElementByteSize(value.unscaledValue()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java index c3c7a96..a61f099 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java @@ -82,7 +82,7 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> { * @return {@code true}, because {@link #getEncodedElementByteSize} runs in constant time. */ @Override - public boolean isRegisterByteSizeObserverCheap(Integer value, Context context) { + public boolean isRegisterByteSizeObserverCheap(Integer value) { return true; } @@ -97,7 +97,7 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> { * @return {@code 4}, the size in bytes of an integer's big endian encoding. 
*/ @Override - protected long getEncodedElementByteSize(Integer value, Context context) + protected long getEncodedElementByteSize(Integer value) throws Exception { if (value == null) { throw new CoderException("cannot encode a null Integer"); http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java index 5ef4878..868160a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java @@ -82,7 +82,7 @@ public class BigEndianLongCoder extends AtomicCoder<Long> { * @return {@code true}, since {@link #getEncodedElementByteSize} returns a constant. */ @Override - public boolean isRegisterByteSizeObserverCheap(Long value, Context context) { + public boolean isRegisterByteSizeObserverCheap(Long value) { return true; } @@ -97,7 +97,7 @@ public class BigEndianLongCoder extends AtomicCoder<Long> { * @return {@code 8}, the byte size of a big-endian encoded {@code Long}. 
*/ @Override - protected long getEncodedElementByteSize(Long value, Context context) + protected long getEncodedElementByteSize(Long value) throws Exception { if (value == null) { throw new CoderException("cannot encode a null Long"); http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java index 6d14d17..3b038af 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java @@ -75,7 +75,7 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> { * @return {@code true}, because {@link #getEncodedElementByteSize} runs in constant time. */ @Override - public boolean isRegisterByteSizeObserverCheap(BigInteger value, Context context) { + public boolean isRegisterByteSizeObserverCheap(BigInteger value) { return true; } @@ -85,8 +85,8 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> { * @return the size of the encoding as a byte array according to {@link ByteArrayCoder} */ @Override - protected long getEncodedElementByteSize(BigInteger value, Context context) throws Exception { + protected long getEncodedElementByteSize(BigInteger value) throws Exception { checkNotNull(value, String.format("cannot encode a null %s", BigInteger.class.getSimpleName())); - return BYTE_ARRAY_CODER.getEncodedElementByteSize(value.toByteArray(), context); + return BYTE_ARRAY_CODER.getEncodedElementByteSize(value.toByteArray()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java index d83d832..c9393a1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java @@ -126,7 +126,7 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> { * constant time using the {@code length} of the provided array. */ @Override - public boolean isRegisterByteSizeObserverCheap(byte[] value, Context context) { + public boolean isRegisterByteSizeObserverCheap(byte[] value) { return true; } @@ -136,15 +136,11 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> { } @Override - protected long getEncodedElementByteSize(byte[] value, Context context) + protected long getEncodedElementByteSize(byte[] value) throws Exception { if (value == null) { throw new CoderException("cannot encode a null byte[]"); } - long size = 0; - if (!context.isWholeStream) { - size += VarInt.getLength(value.length); - } - return size + value.length; + return VarInt.getLength(value.length) + value.length; } } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java index 6e4318e..7f449d6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java @@ -91,7 +91,7 @@ public class ByteCoder extends AtomicCoder<Byte> { * @return {@code true}. {@link ByteCoder#getEncodedElementByteSize} returns a constant. 
*/ @Override - public boolean isRegisterByteSizeObserverCheap(Byte value, Context context) { + public boolean isRegisterByteSizeObserverCheap(Byte value) { return true; } @@ -106,7 +106,7 @@ public class ByteCoder extends AtomicCoder<Byte> { * @return {@code 1}, the byte size of a {@link Byte} encoded using Java serialization. */ @Override - protected long getEncodedElementByteSize(Byte value, Context context) + protected long getEncodedElementByteSize(Byte value) throws Exception { if (value == null) { throw new CoderException("cannot estimate size for unsupported null value"); http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java index 12bc5e8..8eff6ba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java @@ -93,7 +93,7 @@ public class DoubleCoder extends AtomicCoder<Double> { * @return {@code true}. {@link DoubleCoder#getEncodedElementByteSize} returns a constant. */ @Override - public boolean isRegisterByteSizeObserverCheap(Double value, Context context) { + public boolean isRegisterByteSizeObserverCheap(Double value) { return true; } @@ -108,7 +108,7 @@ public class DoubleCoder extends AtomicCoder<Double> { * @return {@code 8}, the byte size of a {@link Double} encoded using Java serialization. 
*/ @Override - protected long getEncodedElementByteSize(Double value, Context context) + protected long getEncodedElementByteSize(Double value) throws Exception { if (value == null) { throw new CoderException("cannot encode a null Double"); http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java index 7b49d1f..8b4ae1d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java @@ -89,14 +89,14 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> { * @return {@code true}, because it is cheap to ascertain the byte size of a long. */ @Override - public boolean isRegisterByteSizeObserverCheap(ReadableDuration value, Context context) { - return LONG_CODER.isRegisterByteSizeObserverCheap(toLong(value), context); + public boolean isRegisterByteSizeObserverCheap(ReadableDuration value) { + return LONG_CODER.isRegisterByteSizeObserverCheap(toLong(value)); } @Override public void registerByteSizeObserver( - ReadableDuration value, ElementByteSizeObserver observer, Context context) throws Exception { - LONG_CODER.registerByteSizeObserver(toLong(value), observer, context); + ReadableDuration value, ElementByteSizeObserver observer) throws Exception { + LONG_CODER.registerByteSizeObserver(toLong(value), observer); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java index 56ed12b..000f406 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java @@ -103,16 +103,16 @@ public class InstantCoder extends AtomicCoder<Instant> { * @return {@code true}. The byte size for a big endian long is a constant. */ @Override - public boolean isRegisterByteSizeObserverCheap(Instant value, Context context) { + public boolean isRegisterByteSizeObserverCheap(Instant value) { return LONG_CODER.isRegisterByteSizeObserverCheap( - ORDER_PRESERVING_CONVERTER.convert(value), context); + ORDER_PRESERVING_CONVERTER.convert(value)); } @Override public void registerByteSizeObserver( - Instant value, ElementByteSizeObserver observer, Context context) throws Exception { + Instant value, ElementByteSizeObserver observer) throws Exception { LONG_CODER.registerByteSizeObserver( - ORDER_PRESERVING_CONVERTER.convert(value), observer, context); + ORDER_PRESERVING_CONVERTER.convert(value), observer); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java index 52b9c34..9994b3f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java @@ -170,18 +170,17 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>> */ @Override public boolean isRegisterByteSizeObserverCheap( - IterableT iterable, Context context) { + IterableT iterable) { return iterable instanceof ElementByteSizeObservableIterable; } @Override public void 
registerByteSizeObserver( - IterableT iterable, ElementByteSizeObserver observer, Context context) + IterableT iterable, ElementByteSizeObserver observer) throws Exception { if (iterable == null) { throw new CoderException("cannot encode a null Iterable"); } - Context nestedContext = context.nested(); if (iterable instanceof ElementByteSizeObservableIterable) { observer.setLazy(); @@ -196,7 +195,7 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>> Collection<T> collection = (Collection<T>) iterable; observer.update(4L); for (T elem : collection) { - elementCoder.registerByteSizeObserver(elem, observer, nestedContext); + elementCoder.registerByteSizeObserver(elem, observer); } } else { // TODO: (BEAM-1537) Update to use an accurate count depending on size and count, @@ -208,7 +207,7 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>> long count = 0; for (T elem : iterable) { count += 1; - elementCoder.registerByteSizeObserver(elem, observer, nestedContext); + elementCoder.registerByteSizeObserver(elem, observer); } if (count > 0) { // Update the length based upon the number of counted elements, this helps http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java index da7f03c..1df4460 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java @@ -105,9 +105,9 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> { * Returns whether both keyCoder and valueCoder are considered not expensive. 
*/ @Override - public boolean isRegisterByteSizeObserverCheap(KV<K, V> kv, Context context) { - return keyCoder.isRegisterByteSizeObserverCheap(kv.getKey(), context.nested()) - && valueCoder.isRegisterByteSizeObserverCheap(kv.getValue(), context); + public boolean isRegisterByteSizeObserverCheap(KV<K, V> kv) { + return keyCoder.isRegisterByteSizeObserverCheap(kv.getKey()) + && valueCoder.isRegisterByteSizeObserverCheap(kv.getValue()); } /** @@ -116,13 +116,13 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> { */ @Override public void registerByteSizeObserver( - KV<K, V> kv, ElementByteSizeObserver observer, Context context) + KV<K, V> kv, ElementByteSizeObserver observer) throws Exception { if (kv == null) { throw new CoderException("cannot encode a null KV"); } - keyCoder.registerByteSizeObserver(kv.getKey(), observer, context.nested()); - valueCoder.registerByteSizeObserver(kv.getValue(), observer, context); + keyCoder.registerByteSizeObserver(kv.getKey(), observer); + valueCoder.registerByteSizeObserver(kv.getValue(), observer); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java index 685e766..7dd2a32 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java @@ -107,18 +107,17 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> { * {@inheritDoc} */ @Override - protected long getEncodedElementByteSize(T value, Context context) throws Exception { + protected long getEncodedElementByteSize(T value) throws Exception { if (valueCoder instanceof StructuredCoder) { // If valueCoder is a 
StructuredCoder then we can ask it directly for the encoded size of // the value, adding the number of bytes to represent the length. - long valueSize = ((StructuredCoder<T>) valueCoder).getEncodedElementByteSize( - value, Context.OUTER); + long valueSize = ((StructuredCoder<T>) valueCoder).getEncodedElementByteSize(value); return VarInt.getLength(valueSize) + valueSize; } // If value is not a StructuredCoder then fall back to the default StructuredCoder behavior // of encoding and counting the bytes. The encoding will include the length prefix. - return super.getEncodedElementByteSize(value, context); + return super.getEncodedElementByteSize(value); } /** @@ -127,7 +126,7 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> { * {@inheritDoc} */ @Override - public boolean isRegisterByteSizeObserverCheap(@Nullable T value, Context context) { - return valueCoder.isRegisterByteSizeObserverCheap(value, Context.OUTER); + public boolean isRegisterByteSizeObserverCheap(@Nullable T value) { + return valueCoder.isRegisterByteSizeObserverCheap(value); } } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java index 9e3c768..7df9ca9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java @@ -146,7 +146,7 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> { @Override public void registerByteSizeObserver( - Map<K, V> map, ElementByteSizeObserver observer, Context context) + Map<K, V> map, ElementByteSizeObserver observer) throws Exception { observer.update(4L); if (map.isEmpty()) { @@ -155,12 +155,12 @@ public class MapCoder<K, V> extends 
StructuredCoder<Map<K, V>> { Iterator<Entry<K, V>> entries = map.entrySet().iterator(); Entry<K, V> entry = entries.next(); while (entries.hasNext()) { - keyCoder.registerByteSizeObserver(entry.getKey(), observer, context.nested()); - valueCoder.registerByteSizeObserver(entry.getValue(), observer, context.nested()); + keyCoder.registerByteSizeObserver(entry.getKey(), observer); + valueCoder.registerByteSizeObserver(entry.getValue(), observer); entry = entries.next(); } - keyCoder.registerByteSizeObserver(entry.getKey(), observer, context.nested()); - valueCoder.registerByteSizeObserver(entry.getValue(), observer, context); + keyCoder.registerByteSizeObserver(entry.getKey(), observer); + valueCoder.registerByteSizeObserver(entry.getValue(), observer); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java index d1eea9a..e46591e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java @@ -127,10 +127,10 @@ public class NullableCoder<T> extends StructuredCoder<T> { */ @Override public void registerByteSizeObserver( - @Nullable T value, ElementByteSizeObserver observer, Context context) throws Exception { + @Nullable T value, ElementByteSizeObserver observer) throws Exception { observer.update(1); if (value != null) { - valueCoder.registerByteSizeObserver(value, observer, context); + valueCoder.registerByteSizeObserver(value, observer); } } @@ -142,7 +142,7 @@ public class NullableCoder<T> extends StructuredCoder<T> { * {@inheritDoc} */ @Override - protected long getEncodedElementByteSize(@Nullable T value, Context context) throws 
Exception { + protected long getEncodedElementByteSize(@Nullable T value) throws Exception { if (value == null) { return 1; } @@ -151,12 +151,12 @@ public class NullableCoder<T> extends StructuredCoder<T> { // If valueCoder is a StructuredCoder then we can ask it directly for the encoded size of // the value, adding 1 byte to count the null indicator. return 1 + ((StructuredCoder<T>) valueCoder) - .getEncodedElementByteSize(value, context); + .getEncodedElementByteSize(value); } // If value is not a StructuredCoder then fall back to the default StructuredCoder behavior // of encoding and counting the bytes. The encoding will include the null indicator byte. - return super.getEncodedElementByteSize(value, context); + return super.getEncodedElementByteSize(value); } /** @@ -165,11 +165,11 @@ public class NullableCoder<T> extends StructuredCoder<T> { * {@inheritDoc} */ @Override - public boolean isRegisterByteSizeObserverCheap(@Nullable T value, Context context) { + public boolean isRegisterByteSizeObserverCheap(@Nullable T value) { if (value == null) { return true; } - return valueCoder.isRegisterByteSizeObserverCheap(value, context); + return valueCoder.isRegisterByteSizeObserverCheap(value); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java index 39a1658..1f4538f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java @@ -125,4 +125,3 @@ public final class StringDelegateCoder<T> extends CustomCoder<T> { return delegateCoder.getEncodedTypeDescriptor(); } } - 
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java index 42931ca..44856e8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java @@ -18,8 +18,6 @@ package org.apache.beam.sdk.coders; import com.google.common.base.Utf8; -import com.google.common.io.ByteStreams; -import com.google.common.io.CountingOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; @@ -128,20 +126,12 @@ public class StringUtf8Coder extends AtomicCoder<String> { * the byte size of the encoding plus the encoded length prefix. */ @Override - public long getEncodedElementByteSize(String value, Context context) + public long getEncodedElementByteSize(String value) throws Exception { if (value == null) { throw new CoderException("cannot encode a null String"); } - if (context.isWholeStream) { - return Utf8.encodedLength(value); - } else { - try (CountingOutputStream countingStream = - new CountingOutputStream(ByteStreams.nullOutputStream())) { - DataOutputStream stream = new DataOutputStream(countingStream); - writeString(value, stream); - return countingStream.getCount(); - } - } + int size = Utf8.encodedLength(value); + return VarInt.getLength(size) + size; } } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java index 9743c4c..718811c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java @@ -70,11 +70,11 @@ public class TextualIntegerCoder extends AtomicCoder<Integer> { } @Override - protected long getEncodedElementByteSize(Integer value, Context context) throws Exception { + protected long getEncodedElementByteSize(Integer value) throws Exception { if (value == null) { throw new CoderException("cannot encode a null Integer"); } String textualValue = value.toString(); - return StringUtf8Coder.of().getEncodedElementByteSize(textualValue, context); + return StringUtf8Coder.of().getEncodedElementByteSize(textualValue); } } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java index 30f9c09..bda66bb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java @@ -83,7 +83,7 @@ public class VarIntCoder extends AtomicCoder<Integer> { * @return {@code true}. {@link #getEncodedElementByteSize} is cheap. 
*/ @Override - public boolean isRegisterByteSizeObserverCheap(Integer value, Context context) { + public boolean isRegisterByteSizeObserverCheap(Integer value) { return true; } @@ -93,7 +93,7 @@ public class VarIntCoder extends AtomicCoder<Integer> { } @Override - protected long getEncodedElementByteSize(Integer value, Context context) + protected long getEncodedElementByteSize(Integer value) throws Exception { if (value == null) { throw new CoderException("cannot encode a null Integer"); http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java index 9a7b125..bf651c3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java @@ -89,7 +89,7 @@ public class VarLongCoder extends StructuredCoder<Long> { * @return {@code true}. {@link #getEncodedElementByteSize} is cheap. 
*/ @Override - public boolean isRegisterByteSizeObserverCheap(Long value, Context context) { + public boolean isRegisterByteSizeObserverCheap(Long value) { return true; } @@ -99,7 +99,7 @@ public class VarLongCoder extends StructuredCoder<Long> { } @Override - protected long getEncodedElementByteSize(Long value, Context context) + protected long getEncodedElementByteSize(Long value) throws Exception { if (value == null) { throw new CoderException("cannot encode a null Long"); http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java index 829bd20..4467faa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java @@ -67,7 +67,7 @@ public class VoidCoder extends AtomicCoder<Void> { * @return {@code true}. {@link VoidCoder#getEncodedElementByteSize} runs in constant time. 
*/ @Override - public boolean isRegisterByteSizeObserverCheap(Void value, Context context) { + public boolean isRegisterByteSizeObserverCheap(Void value) { return true; } @@ -77,7 +77,7 @@ public class VoidCoder extends AtomicCoder<Void> { } @Override - protected long getEncodedElementByteSize(Void value, Context context) + protected long getEncodedElementByteSize(Void value) throws Exception { return 0; } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java index 6e0e264..f660f7d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java @@ -397,13 +397,15 @@ public class CoderProperties { try (CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream())) { for (T elem : elements) { - coder.registerByteSizeObserver(elem, observer, context); + coder.registerByteSizeObserver(elem, observer); coder.encode(elem, os, context); observer.advance(); } long expectedLength = os.getCount(); - assertEquals(expectedLength, observer.getSum()); + if (!context.isWholeStream) { + assertEquals(expectedLength, observer.getSum()); + } assertEquals(elements.length, observer.getCount()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java index b05f223..37d5a55 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java @@ -741,24 +741,16 @@ public class ApproximateQuantiles { @Override public void registerByteSizeObserver( QuantileState<T, ComparatorT> state, - ElementByteSizeObserver observer, - Coder.Context context) + ElementByteSizeObserver observer) throws Exception { - Coder.Context nestedContext = context.nested(); - elementCoder.registerByteSizeObserver( - state.min, observer, nestedContext); - elementCoder.registerByteSizeObserver( - state.max, observer, nestedContext); - elementListCoder.registerByteSizeObserver( - state.unbufferedElements, observer, nestedContext); - - BigEndianIntegerCoder.of().registerByteSizeObserver( - state.buffers.size(), observer, nestedContext); + elementCoder.registerByteSizeObserver(state.min, observer); + elementCoder.registerByteSizeObserver(state.max, observer); + elementListCoder.registerByteSizeObserver(state.unbufferedElements, observer); + + BigEndianIntegerCoder.of().registerByteSizeObserver(state.buffers.size(), observer); for (QuantileBuffer<T> buffer : state.buffers) { observer.update(4L + 8); - - elementListCoder.registerByteSizeObserver( - buffer.elements, observer, nestedContext); + elementListCoder.registerByteSizeObserver(buffer.elements, observer); } } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java index 753e14c..497d62b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java @@ -185,12 +185,12 @@ public class Count { } @Override - public 
boolean isRegisterByteSizeObserverCheap(long[] value, Context context) { + public boolean isRegisterByteSizeObserverCheap(long[] value) { return true; } @Override - protected long getEncodedElementByteSize(long[] value, Context context) { + protected long getEncodedElementByteSize(long[] value) { return VarInt.getLength(value[0]); } }; http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java index 9d5db74..c0381a7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java @@ -557,16 +557,15 @@ public class Top { @Override public boolean isRegisterByteSizeObserverCheap( - BoundedHeap<T, ComparatorT> value, Context context) { - return listCoder.isRegisterByteSizeObserverCheap( - value.asList(), context); + BoundedHeap<T, ComparatorT> value) { + return listCoder.isRegisterByteSizeObserverCheap(value.asList()); } @Override public void registerByteSizeObserver( - BoundedHeap<T, ComparatorT> value, ElementByteSizeObserver observer, Context context) + BoundedHeap<T, ComparatorT> value, ElementByteSizeObserver observer) throws Exception { - listCoder.registerByteSizeObserver(value.asList(), observer, context); + listCoder.registerByteSizeObserver(value.asList(), observer); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java index 4a2a286..3194a37 
100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java @@ -100,11 +100,11 @@ public class UnionCoder extends StructuredCoder<RawUnionValue> { * time, we defer the return value to that coder. */ @Override - public boolean isRegisterByteSizeObserverCheap(RawUnionValue union, Context context) { + public boolean isRegisterByteSizeObserverCheap(RawUnionValue union) { int index = getIndexForEncoding(union); @SuppressWarnings("unchecked") Coder<Object> coder = (Coder<Object>) elementCoders.get(index); - return coder.isRegisterByteSizeObserverCheap(union.getValue(), context); + return coder.isRegisterByteSizeObserverCheap(union.getValue()); } /** @@ -112,7 +112,7 @@ public class UnionCoder extends StructuredCoder<RawUnionValue> { */ @Override public void registerByteSizeObserver( - RawUnionValue union, ElementByteSizeObserver observer, Context context) + RawUnionValue union, ElementByteSizeObserver observer) throws Exception { int index = getIndexForEncoding(union); // Write out the union tag. @@ -120,7 +120,7 @@ public class UnionCoder extends StructuredCoder<RawUnionValue> { // Write out the actual value. 
@SuppressWarnings("unchecked") Coder<Object> coder = (Coder<Object>) elementCoders.get(index); - coder.registerByteSizeObserver(union.getValue(), observer, context); + coder.registerByteSizeObserver(union.getValue(), observer); } ///////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java index b646bf6..a0896f5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java @@ -59,4 +59,3 @@ public class BitSetCoder extends AtomicCoder<BitSet> { BYTE_ARRAY_CODER.verifyDeterministic(); } } - http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index 1e72550..1b7e335 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -670,12 +670,11 @@ public abstract class WindowedValue<T> { @Override public void registerByteSizeObserver(WindowedValue<T> value, - ElementByteSizeObserver observer, - Context context) throws Exception { - InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer, context.nested()); - windowsCoder.registerByteSizeObserver(value.getWindows(), observer, context.nested()); - PaneInfoCoder.INSTANCE.registerByteSizeObserver(value.getPane(), observer, 
context.nested()); - valueCoder.registerByteSizeObserver(value.getValue(), observer, context); + ElementByteSizeObserver observer) throws Exception { + InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer); + windowsCoder.registerByteSizeObserver(value.getWindows(), observer); + PaneInfoCoder.INSTANCE.registerByteSizeObserver(value.getPane(), observer); + valueCoder.registerByteSizeObserver(value.getValue(), observer); } @Override @@ -733,9 +732,9 @@ public abstract class WindowedValue<T> { @Override public void registerByteSizeObserver( - WindowedValue<T> value, ElementByteSizeObserver observer, Context context) + WindowedValue<T> value, ElementByteSizeObserver observer) throws Exception { - valueCoder.registerByteSizeObserver(value.getValue(), observer, context); + valueCoder.registerByteSizeObserver(value.getValue(), observer); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java index 8aa2604..a8a1bc8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java @@ -100,10 +100,11 @@ public class BigDecimalCoderTest { public void testGetEncodedElementByteSize() throws Exception { TestElementByteSizeObserver observer = new TestElementByteSizeObserver(); for (BigDecimal value : TEST_VALUES) { - TEST_CODER.registerByteSizeObserver(value, observer, Coder.Context.OUTER); + TEST_CODER.registerByteSizeObserver(value, observer); observer.advance(); assertThat(observer.getSumAndReset(), - equalTo((long) CoderUtils.encodeToByteArray(TEST_CODER, value).length)); + equalTo((long) 
CoderUtils.encodeToByteArray( + TEST_CODER, value, Coder.Context.NESTED).length)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigIntegerCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigIntegerCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigIntegerCoderTest.java index e2fd012..bfd6b4a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigIntegerCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigIntegerCoderTest.java @@ -75,11 +75,12 @@ public class BigIntegerCoderTest { public void testGetEncodedElementByteSize() throws Exception { TestElementByteSizeObserver observer = new TestElementByteSizeObserver(); for (BigInteger value : TEST_VALUES) { - TEST_CODER.registerByteSizeObserver(value, observer, Coder.Context.OUTER); + TEST_CODER.registerByteSizeObserver(value, observer); observer.advance(); assertThat( observer.getSumAndReset(), - equalTo((long) CoderUtils.encodeToByteArray(TEST_CODER, value).length)); + equalTo((long) CoderUtils.encodeToByteArray( + TEST_CODER, value, Coder.Context.NESTED).length)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java index 9568324..7ca7fb9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java @@ -399,13 +399,13 @@ public class CoderRegistryTest { } @Override - public boolean isRegisterByteSizeObserverCheap(MyValue value, Context 
context) { + public boolean isRegisterByteSizeObserverCheap(MyValue value) { return true; } @Override public void registerByteSizeObserver( - MyValue value, ElementByteSizeObserver observer, Context context) + MyValue value, ElementByteSizeObserver observer) throws Exception { observer.update(0L); } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java index fa81a7c..9a09b86 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java @@ -64,23 +64,22 @@ public class LengthPrefixCoderTest { @Test public void testEncodedSize() throws Exception { - assertEquals(4L, - TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0), Coder.Context.NESTED)); - assertEquals(4L, - TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0), Coder.Context.OUTER)); + assertEquals(5L, + TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0))); } @Test public void testObserverIsCheap() throws Exception { - NullableCoder<Double> coder = NullableCoder.of(DoubleCoder.of()); - assertTrue(coder.isRegisterByteSizeObserverCheap(5.0, Coder.Context.OUTER)); + LengthPrefixCoder<Double> coder = LengthPrefixCoder.of(DoubleCoder.of()); + assertTrue(coder.isRegisterByteSizeObserverCheap(5.0)); } @Test public void testObserverIsNotCheap() throws Exception { - NullableCoder<List<String>> coder = NullableCoder.of(ListCoder.of(StringUtf8Coder.of())); + LengthPrefixCoder<List<String>> coder = + LengthPrefixCoder.of(ListCoder.of(StringUtf8Coder.of())); assertFalse(coder.isRegisterByteSizeObserverCheap( - ImmutableList.of("hi", "test"), 
Coder.Context.OUTER)); + ImmutableList.of("hi", "test"))); } @Test @@ -92,11 +91,10 @@ public class LengthPrefixCoderTest { @Test public void testRegisterByteSizeObserver() throws Exception { - CoderProperties.testByteCount(TEST_CODER, Coder.Context.OUTER, - new byte[][]{{ 0xa, 0xb, 0xc }}); - - CoderProperties.testByteCount(TEST_CODER, Coder.Context.NESTED, - new byte[][]{{ 0xa, 0xb, 0xc }, {}, {}, { 0xd, 0xe }, {}}); + CoderProperties.testByteCount( + LengthPrefixCoder.of(VarIntCoder.of()), + Coder.Context.NESTED, + new Integer[]{0, 10, 1000}); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java index c0a4bed..d6d7de8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java @@ -98,38 +98,34 @@ public class NullableCoderTest { @Test public void testEncodedSize() throws Exception { NullableCoder<Double> coder = NullableCoder.of(DoubleCoder.of()); - assertEquals(1, coder.getEncodedElementByteSize(null, Coder.Context.OUTER)); - assertEquals(9, coder.getEncodedElementByteSize(5.0, Coder.Context.OUTER)); + assertEquals(1, coder.getEncodedElementByteSize(null)); + assertEquals(9, coder.getEncodedElementByteSize(5.0)); } @Test public void testEncodedSizeNested() throws Exception { NullableCoder<String> varLenCoder = NullableCoder.of(StringUtf8Coder.of()); - - assertEquals(1, varLenCoder.getEncodedElementByteSize(null, Context.OUTER)); - assertEquals(1, varLenCoder.getEncodedElementByteSize(null, Context.NESTED)); - - assertEquals(5, varLenCoder.getEncodedElementByteSize("spam", Context.OUTER)); - assertEquals(6, 
varLenCoder.getEncodedElementByteSize("spam", Context.NESTED)); + assertEquals(1, varLenCoder.getEncodedElementByteSize(null)); + assertEquals(6, varLenCoder.getEncodedElementByteSize("spam")); } @Test public void testObserverIsCheap() throws Exception { NullableCoder<Double> coder = NullableCoder.of(DoubleCoder.of()); - assertTrue(coder.isRegisterByteSizeObserverCheap(5.0, Coder.Context.OUTER)); + assertTrue(coder.isRegisterByteSizeObserverCheap(5.0)); } @Test public void testObserverIsNotCheap() throws Exception { NullableCoder<List<String>> coder = NullableCoder.of(ListCoder.of(StringUtf8Coder.of())); assertFalse(coder.isRegisterByteSizeObserverCheap( - ImmutableList.of("hi", "test"), Coder.Context.OUTER)); + ImmutableList.of("hi", "test"))); } @Test public void testObserverIsAlwaysCheapForNullValues() throws Exception { NullableCoder<List<String>> coder = NullableCoder.of(ListCoder.of(StringUtf8Coder.of())); - assertTrue(coder.isRegisterByteSizeObserverCheap(null, Coder.Context.OUTER)); + assertTrue(coder.isRegisterByteSizeObserverCheap(null)); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java index 2ef892c..83f348c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java @@ -109,13 +109,13 @@ public class PAssertTest implements Serializable { } @Override - public boolean isRegisterByteSizeObserverCheap(NotSerializableObject value, Context context) { + public boolean isRegisterByteSizeObserverCheap(NotSerializableObject value) { return true; } @Override public void registerByteSizeObserver( - NotSerializableObject value, 
ElementByteSizeObserver observer, Context context) + NotSerializableObject value, ElementByteSizeObserver observer) throws Exception { observer.update(0L); } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index 12619e0..a70af94 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -895,16 +895,16 @@ public class CombineTest implements Serializable { @Override public boolean isRegisterByteSizeObserverCheap( - CountSum value, Context context) { + CountSum value) { return true; } @Override public void registerByteSizeObserver( - CountSum value, ElementByteSizeObserver observer, Context context) + CountSum value, ElementByteSizeObserver observer) throws Exception { - LONG_CODER.registerByteSizeObserver(value.count, observer, context.nested()); - DOUBLE_CODER.registerByteSizeObserver(value.sum, observer, context); + LONG_CODER.registerByteSizeObserver(value.count, observer); + DOUBLE_CODER.registerByteSizeObserver(value.sum, observer); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index cbbe7f1..d2cb980 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -997,13 +997,13 @@ public class 
ParDoTest implements Serializable { } @Override - public boolean isRegisterByteSizeObserverCheap(TestDummy value, Context context) { + public boolean isRegisterByteSizeObserverCheap(TestDummy value) { return true; } @Override public void registerByteSizeObserver( - TestDummy value, ElementByteSizeObserver observer, Context context) + TestDummy value, ElementByteSizeObserver observer) throws Exception { observer.update(0L); } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java index 0781cf1..325c69d 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java @@ -76,12 +76,8 @@ public class ByteStringCoder extends AtomicCoder<ByteString> { } @Override - protected long getEncodedElementByteSize(ByteString value, Context context) throws Exception { + protected long getEncodedElementByteSize(ByteString value) throws Exception { int size = value.size(); - - if (context.isWholeStream) { - return size; - } return VarInt.getLength(size) + size; } @@ -106,7 +102,7 @@ public class ByteStringCoder extends AtomicCoder<ByteString> { * <p>Returns true. {@link ByteString#size} returns the size of an array and a {@link VarInt}. 
*/ @Override - public boolean isRegisterByteSizeObserverCheap(ByteString value, Context context) { + public boolean isRegisterByteSizeObserverCheap(ByteString value) { return true; } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoderTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoderTest.java index 8fdb851..d1800a6 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoderTest.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoderTest.java @@ -115,12 +115,10 @@ public class ByteStringCoderTest { } @Test - public void testEncodedElementByteSizeInAllContexts() throws Throwable { - for (Context context : CoderProperties.ALL_CONTEXTS) { - for (ByteString value : TEST_VALUES) { - byte[] encoded = CoderUtils.encodeToByteArray(TEST_CODER, value, context); - assertEquals(encoded.length, TEST_CODER.getEncodedElementByteSize(value, context)); - } + public void testEncodedElementByteSize() throws Throwable { + for (ByteString value : TEST_VALUES) { + byte[] encoded = CoderUtils.encodeToByteArray(TEST_CODER, value, Context.NESTED); + assertEquals(encoded.length, TEST_CODER.getEncodedElementByteSize(value)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java index 7ca8958..cfec991 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java @@ -52,10 +52,10 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> { } @Override - protected long getEncodedElementByteSize(TableRow value, Context context) + protected long getEncodedElementByteSize(TableRow value) throws Exception { String strValue = MAPPER.writeValueAsString(value); - return StringUtf8Coder.of().getEncodedElementByteSize(strValue, context); + return StringUtf8Coder.of().getEncodedElementByteSize(strValue); } ///////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java index 0f5dc4c..d838a0d 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java @@ -83,8 +83,8 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> { } @Override - public boolean isRegisterByteSizeObserverCheap(KafkaRecord<K, V> value, Context context) { - return kvCoder.isRegisterByteSizeObserverCheap(value.getKV(), context); + public boolean isRegisterByteSizeObserverCheap(KafkaRecord<K, V> value) { + return kvCoder.isRegisterByteSizeObserverCheap(value.getKV()); //TODO : do we have to implement getEncodedSize()? 
} http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java index 75a4619..5b2ec02 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.xml; import com.google.common.io.ByteStreams; +import java.io.ByteArrayOutputStream; import java.io.FilterInputStream; import java.io.FilterOutputStream; import java.io.IOException; @@ -87,37 +88,40 @@ public class JAXBCoder<T> extends CustomCoder<T> { } @Override - public void encode(T value, OutputStream outStream, Context context) - throws CoderException, IOException { + public void encode(T value, OutputStream outStream) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); try { - if (!context.isWholeStream) { - try { - long size = getEncodedElementByteSize(value, Context.OUTER); - // record the number of bytes the XML consists of so when reading we only read the encoded - // value - VarInt.encode(size, outStream); - } catch (Exception e) { - throw new CoderException( - "An Exception occured while trying to get the size of an encoded representation", e); - } - } - - jaxbMarshaller.get().marshal(value, new CloseIgnoringOutputStream(outStream)); + jaxbMarshaller.get().marshal(value, baos); } catch (JAXBException e) { throw new CoderException(e); } + VarInt.encode(baos.size(), outStream); + baos.writeTo(outStream); + } + + @Override + public void encode(T value, OutputStream outStream, Context context) + throws CoderException, IOException { + if (context.isWholeStream) { + try { + jaxbMarshaller.get().marshal(value, new 
CloseIgnoringOutputStream(outStream)); + } catch (JAXBException e) { + throw new CoderException(e); + } + } else { + encode(value, outStream); + } } @Override public T decode(InputStream inStream, Context context) throws CoderException, IOException { try { - InputStream stream = inStream; if (!context.isWholeStream) { long limit = VarInt.decodeLong(inStream); - stream = ByteStreams.limit(inStream, limit); + inStream = ByteStreams.limit(inStream, limit); } @SuppressWarnings("unchecked") - T obj = (T) jaxbUnmarshaller.get().unmarshal(new CloseIgnoringInputStream(stream)); + T obj = (T) jaxbUnmarshaller.get().unmarshal(new CloseIgnoringInputStream(inStream)); return obj; } catch (JAXBException e) { throw new CoderException(e);