Repository: beam Updated Branches: refs/heads/master 23731fe7a -> 3a09ed575
Remove explicit use of nested contexts. find . -type f -name '*.java' | xargs sed -i '' 's/\([.]..code[(].*\), *context.nested..[)]/\1)/' find . -type f -name '*.java' | xargs sed -i '' 's/\([.]..code[(].*\), *nestedContext[)]/\1)/' find . -type f -name '*.java' | xargs sed -i '' 's/\([.]..code[(].*\), *Context.NESTED[)]/\1)/' find . -type f -name '*.java' | xargs sed -i '' 's/\([.]..code[(].*\), *[^ ]*.Context.NESTED[)]/\1)/' Added back explicit context in CoGbkResult.java due to compile error. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/27e9a060 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/27e9a060 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/27e9a060 Branch: refs/heads/master Commit: 27e9a060ed593b2b53b88481591f14b1a274c61b Parents: 23731fe Author: Robert Bradshaw <rober...@gmail.com> Authored: Fri May 5 16:20:37 2017 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Mon May 8 20:17:54 2017 -0700 ---------------------------------------------------------------------- .../UnboundedReadFromBoundedSource.java | 4 +-- .../core/ElementAndRestrictionCoder.java | 4 +-- .../beam/runners/core/KeyedWorkItemCoder.java | 8 +++--- .../beam/runners/core/TimerInternals.java | 12 ++++----- .../translation/types/CoderTypeSerializer.java | 4 +-- .../streaming/SingletonKeyedWorkItemCoder.java | 4 +-- .../state/FlinkKeyGroupStateInternals.java | 8 +++--- .../runners/dataflow/BatchViewOverrides.java | 4 +-- .../runners/dataflow/internal/IsmFormat.java | 24 ++++++++--------- .../spark/aggregators/NamedAggregators.java | 4 +-- .../apache/beam/sdk/coders/BigDecimalCoder.java | 4 +-- .../beam/sdk/coders/IterableLikeCoder.java | 8 +++--- .../org/apache/beam/sdk/coders/KvCoder.java | 4 +-- .../org/apache/beam/sdk/coders/MapCoder.java | 12 ++++----- .../org/apache/beam/sdk/io/FileBasedSink.java | 4 +-- .../sdk/transforms/ApproximateQuantiles.java | 20 +++++++------- 
.../apache/beam/sdk/transforms/CombineFns.java | 4 +-- .../org/apache/beam/sdk/transforms/Mean.java | 4 +-- .../beam/sdk/transforms/join/CoGbkResult.java | 2 +- .../transforms/windowing/IntervalWindow.java | 4 +-- .../org/apache/beam/sdk/util/WindowedValue.java | 10 +++---- .../beam/sdk/values/TimestampedValue.java | 4 +-- .../beam/sdk/values/ValueInSingleWindow.java | 12 ++++----- .../beam/sdk/values/ValueWithRecordId.java | 4 +-- .../beam/sdk/coders/SerializableCoderTest.java | 28 ++++++++++---------- .../apache/beam/sdk/transforms/CombineTest.java | 4 +-- .../apache/beam/sdk/transforms/CreateTest.java | 4 +-- .../transforms/windowing/GlobalWindowTest.java | 2 +- ...BufferedElementCountingOutputStreamTest.java | 5 ++-- .../BeamFnDataBufferingOutboundObserver.java | 2 +- .../harness/data/BeamFnDataInboundObserver.java | 2 +- ...BeamFnDataBufferingOutboundObserverTest.java | 2 +- .../data/BeamFnDataInboundObserverTest.java | 2 +- .../sdk/io/gcp/bigquery/ShardedKeyCoder.java | 11 ++++---- .../io/gcp/bigquery/TableDestinationCoder.java | 10 +++---- .../sdk/io/gcp/bigquery/TableRowInfoCoder.java | 4 +-- .../io/gcp/bigquery/WriteBundlesToFiles.java | 12 ++++----- .../PubsubMessageWithAttributesCoder.java | 4 +-- .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 16 +++++------ .../io/gcp/pubsub/PubsubUnboundedSource.java | 2 +- .../apache/beam/sdk/io/xml/JAXBCoderTest.java | 8 +++--- 41 files changed, 145 insertions(+), 145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java index 1424b8b..ae28e3a 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java @@ -223,7 +223,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle @Override public void encode(Checkpoint<T> value, OutputStream outStream, Context context) throws CoderException, IOException { - elemsCoder.encode(value.residualElements, outStream, context.nested()); + elemsCoder.encode(value.residualElements, outStream); sourceCoder.encode(value.residualSource, outStream, context); } @@ -232,7 +232,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle public Checkpoint<T> decode(InputStream inStream, Context context) throws CoderException, IOException { return new Checkpoint<>( - elemsCoder.decode(inStream, context.nested()), + elemsCoder.decode(inStream), sourceCoder.decode(inStream, context)); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java index 83c4e62..5ddd865 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java @@ -55,14 +55,14 @@ public class ElementAndRestrictionCoder<ElementT, RestrictionT> if (value == null) { throw new CoderException("cannot encode a null 
ElementAndRestriction"); } - elementCoder.encode(value.element(), outStream, context.nested()); + elementCoder.encode(value.element(), outStream); restrictionCoder.encode(value.restriction(), outStream, context); } @Override public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream, Context context) throws IOException { - ElementT key = elementCoder.decode(inStream, context.nested()); + ElementT key = elementCoder.decode(inStream); RestrictionT value = restrictionCoder.decode(inStream, context); return ElementAndRestriction.of(key, value); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java index e1872b5..ac8a34c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java @@ -72,8 +72,8 @@ public class KeyedWorkItemCoder<K, ElemT> extends StructuredCoder<KeyedWorkItem< public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream, Coder.Context context) throws CoderException, IOException { Coder.Context nestedContext = context.nested(); - keyCoder.encode(value.key(), outStream, nestedContext); - timersCoder.encode(value.timersIterable(), outStream, nestedContext); + keyCoder.encode(value.key(), outStream); + timersCoder.encode(value.timersIterable(), outStream); elemsCoder.encode(value.elementsIterable(), outStream, context); } @@ -81,8 +81,8 @@ public class KeyedWorkItemCoder<K, ElemT> extends StructuredCoder<KeyedWorkItem< public KeyedWorkItem<K, ElemT> decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { Coder.Context 
nestedContext = context.nested(); - K key = keyCoder.decode(inStream, nestedContext); - Iterable<TimerData> timers = timersCoder.decode(inStream, nestedContext); + K key = keyCoder.decode(inStream); + Iterable<TimerData> timers = timersCoder.decode(inStream); Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, context); return KeyedWorkItems.workItem(key, timers, elems); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java index 888c11f..3607fdd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java @@ -241,9 +241,9 @@ public interface TimerInternals { public void encode(TimerData timer, OutputStream outStream, Context context) throws CoderException, IOException { Context nestedContext = context.nested(); - STRING_CODER.encode(timer.getTimerId(), outStream, nestedContext); - STRING_CODER.encode(timer.getNamespace().stringKey(), outStream, nestedContext); - INSTANT_CODER.encode(timer.getTimestamp(), outStream, nestedContext); + STRING_CODER.encode(timer.getTimerId(), outStream); + STRING_CODER.encode(timer.getNamespace().stringKey(), outStream); + INSTANT_CODER.encode(timer.getTimestamp(), outStream); STRING_CODER.encode(timer.getDomain().name(), outStream, context); } @@ -251,10 +251,10 @@ public interface TimerInternals { public TimerData decode(InputStream inStream, Context context) throws CoderException, IOException { Context nestedContext = context.nested(); - String timerId = STRING_CODER.decode(inStream, nestedContext); + String timerId = STRING_CODER.decode(inStream); StateNamespace 
namespace = - StateNamespaces.fromString(STRING_CODER.decode(inStream, nestedContext), windowCoder); - Instant timestamp = INSTANT_CODER.decode(inStream, nestedContext); + StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder); + Instant timestamp = INSTANT_CODER.decode(inStream); TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, context)); return TimerData.of(timerId, namespace, timestamp, domain); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java index e210ed9..e003119 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java @@ -77,14 +77,14 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> { @Override public void serialize(T t, DataOutputView dataOutputView) throws IOException { DataOutputViewWrapper outputWrapper = new DataOutputViewWrapper(dataOutputView); - coder.encode(t, outputWrapper, Coder.Context.NESTED); + coder.encode(t, outputWrapper); } @Override public T deserialize(DataInputView dataInputView) throws IOException { try { DataInputViewWrapper inputWrapper = new DataInputViewWrapper(dataInputView); - return coder.decode(inputWrapper, Coder.Context.NESTED); + return coder.decode(inputWrapper); } catch (CoderException e) { Throwable cause = e.getCause(); if (cause instanceof EOFException) { 
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java index f218693..d7bae7e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java @@ -70,14 +70,14 @@ public class SingletonKeyedWorkItemCoder<K, ElemT> OutputStream outStream, Context context) throws CoderException, IOException { - keyCoder.encode(value.key(), outStream, context.nested()); + keyCoder.encode(value.key(), outStream); valueCoder.encode(value.value, outStream, context); } @Override public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream, Context context) throws CoderException, IOException { - K key = keyCoder.decode(inStream, context.nested()); + K key = keyCoder.decode(inStream); WindowedValue<ElemT> value = valueCoder.decode(inStream, context); return new SingletonKeyedWorkItem<>(key, value); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java index d6af4f9..8d437d5 
100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java @@ -430,8 +430,8 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals { Map<String, ?> map = entry.getValue().f1; out.writeInt(map.size()); for (Map.Entry<String, ?> entry1 : map.entrySet()) { - StringUtf8Coder.of().encode(entry1.getKey(), out, Context.NESTED); - coder.encode(entry1.getValue(), out, Context.NESTED); + StringUtf8Coder.of().encode(entry1.getKey(), out); + coder.encode(entry1.getValue(), out); } } } @@ -463,8 +463,8 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals { Map<String, Object> map = (Map<String, Object>) tuple2.f1; int mapSize = in.readInt(); for (int j = 0; j < mapSize; j++) { - String namespace = StringUtf8Coder.of().decode(in, Context.NESTED); - Object value = coder.decode(in, Context.NESTED); + String namespace = StringUtf8Coder.of().decode(in); + Object value = coder.decode(in); map.put(namespace, value); } } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index ecd0365..0e60fa0 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -1353,7 +1353,7 @@ class BatchViewOverrides { @Override public void encode(TransformedMap<K, V1, V2> value, 
OutputStream outStream, Coder.Context context) throws CoderException, IOException { - transformCoder.encode(value.transform, outStream, context.nested()); + transformCoder.encode(value.transform, outStream); originalMapCoder.encode(value.originalMap, outStream, context); } @@ -1361,7 +1361,7 @@ class BatchViewOverrides { public TransformedMap<K, V1, V2> decode( InputStream inStream, Coder.Context context) throws CoderException, IOException { return new TransformedMap<>( - transformCoder.decode(inStream, context.nested()), + transformCoder.decode(inStream), originalMapCoder.decode(inStream, context)); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java index 00e0c54..0f0cd4d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java @@ -239,12 +239,12 @@ public class IsmFormat { keyComponentCoders.size(), value.getKeyComponents())); } for (int i = 0; i < keyComponentCoders.size(); ++i) { - getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream, context.nested()); + getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream); } if (isMetadataKey(value.getKeyComponents())) { - ByteArrayCoder.of().encode(value.getMetadata(), outStream, context.nested()); + ByteArrayCoder.of().encode(value.getMetadata(), outStream); } else { - valueCoder.encode(value.getValue(), outStream, context.nested()); + valueCoder.encode(value.getValue(), outStream); } } @@ -253,13 +253,13 @@ public 
class IsmFormat { throws CoderException, IOException { List<Object> keyComponents = new ArrayList<>(keyComponentCoders.size()); for (Coder<?> keyCoder : keyComponentCoders) { - keyComponents.add(keyCoder.decode(inStream, context.nested())); + keyComponents.add(keyCoder.decode(inStream)); } if (isMetadataKey(keyComponents)) { return IsmRecord.<V>meta( - keyComponents, ByteArrayCoder.of().decode(inStream, context.nested())); + keyComponents, ByteArrayCoder.of().decode(inStream)); } else { - return IsmRecord.<V>of(keyComponents, valueCoder.decode(inStream, context.nested())); + return IsmRecord.<V>of(keyComponents, valueCoder.decode(inStream)); } } @@ -499,7 +499,7 @@ public class IsmFormat { outStream.write(0); } else { outStream.write(1); - keyCoder.encode(value, outStream, context.nested()); + keyCoder.encode(value, outStream); } } @@ -510,7 +510,7 @@ public class IsmFormat { if (marker == 0) { return (K) getMetadataKey(); } else if (marker == 1) { - return keyCoder.decode(inStream, context.nested()); + return keyCoder.decode(inStream); } else { throw new CoderException(String.format("Expected marker but got %s.", marker)); } @@ -626,8 +626,8 @@ public class IsmFormat { checkState(value.getIndexOffset() >= 0, "%s attempting to be written without index offset.", value); - VarIntCoder.of().encode(value.getId(), outStream, context.nested()); - VarLongCoder.of().encode(value.getBlockOffset(), outStream, context.nested()); + VarIntCoder.of().encode(value.getId(), outStream); + VarLongCoder.of().encode(value.getBlockOffset(), outStream); VarLongCoder.of().encode(value.getIndexOffset(), outStream, context); } @@ -635,8 +635,8 @@ public class IsmFormat { public IsmShard decode( InputStream inStream, Coder.Context context) throws CoderException, IOException { return IsmShard.of( - VarIntCoder.of().decode(inStream, context.nested()), - VarLongCoder.of().decode(inStream, context.nested()), + VarIntCoder.of().decode(inStream), + VarLongCoder.of().decode(inStream), 
VarLongCoder.of().decode(inStream, context)); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java index c836ca5..27f2ec8 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java @@ -207,7 +207,7 @@ public class NamedAggregators implements Serializable { oos.writeObject(inCoder); try { combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder) - .encode(state, oos, Coder.Context.NESTED); + .encode(state, oos); } catch (CannotProvideCoderException e) { throw new IllegalStateException("Could not determine coder for accumulator", e); } @@ -220,7 +220,7 @@ public class NamedAggregators implements Serializable { inCoder = (Coder<InputT>) ois.readObject(); try { state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder) - .decode(ois, Coder.Context.NESTED); + .decode(ois); } catch (CannotProvideCoderException e) { throw new IllegalStateException("Could not determine coder for accumulator", e); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java index 97559a9..e2166cf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java @@ -51,14 +51,14 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> { public void encode(BigDecimal value, OutputStream outStream, Context context) throws IOException, CoderException { checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName())); - VAR_INT_CODER.encode(value.scale(), outStream, context.nested()); + VAR_INT_CODER.encode(value.scale(), outStream); BIG_INT_CODER.encode(value.unscaledValue(), outStream, context); } @Override public BigDecimal decode(InputStream inStream, Context context) throws IOException, CoderException { - int scale = VAR_INT_CODER.decode(inStream, context.nested()); + int scale = VAR_INT_CODER.decode(inStream); BigInteger bigInteger = BIG_INT_CODER.decode(inStream, context); return new BigDecimal(bigInteger, scale); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java index 9994b3f..59d5424 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java @@ -97,7 +97,7 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>> Collection<T> collection = (Collection<T>) iterable; dataOutStream.writeInt(collection.size()); for (T elem : collection) { - elementCoder.encode(elem, dataOutStream, nestedContext); + elementCoder.encode(elem, dataOutStream); } } else { // We don't know the size without traversing it so use a fixed size buffer @@ -108,7 +108,7 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>> new 
BufferedElementCountingOutputStream(dataOutStream); for (T elem : iterable) { countingOutputStream.markElementStart(); - elementCoder.encode(elem, countingOutputStream, nestedContext); + elementCoder.encode(elem, countingOutputStream); } countingOutputStream.finish(); } @@ -125,7 +125,7 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>> if (size >= 0) { List<T> elements = new ArrayList<>(size); for (int i = 0; i < size; i++) { - elements.add(elementCoder.decode(dataInStream, nestedContext)); + elements.add(elementCoder.decode(dataInStream)); } return decodeToIterable(elements); } @@ -134,7 +134,7 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>> // each block of elements. long count = VarInt.decodeLong(dataInStream); while (count > 0L) { - elements.add(elementCoder.decode(dataInStream, nestedContext)); + elements.add(elementCoder.decode(dataInStream)); --count; if (count == 0L) { count = VarInt.decodeLong(dataInStream); http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java index 1df4460..0bb53ec 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java @@ -63,14 +63,14 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> { if (kv == null) { throw new CoderException("cannot encode a null KV"); } - keyCoder.encode(kv.getKey(), outStream, context.nested()); + keyCoder.encode(kv.getKey(), outStream); valueCoder.encode(kv.getValue(), outStream, context); } @Override public KV<K, V> decode(InputStream inStream, Context context) throws IOException, CoderException { - K key = keyCoder.decode(inStream, context.nested()); + 
K key = keyCoder.decode(inStream); V value = valueCoder.decode(inStream, context); return KV.of(key, value); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java index 7df9ca9..f20eb93 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java @@ -89,12 +89,12 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> { Iterator<Entry<K, V>> iterator = map.entrySet().iterator(); Entry<K, V> entry = iterator.next(); while (iterator.hasNext()) { - keyCoder.encode(entry.getKey(), outStream, context.nested()); - valueCoder.encode(entry.getValue(), outStream, context.nested()); + keyCoder.encode(entry.getKey(), outStream); + valueCoder.encode(entry.getValue(), outStream); entry = iterator.next(); } - keyCoder.encode(entry.getKey(), outStream, context.nested()); + keyCoder.encode(entry.getKey(), outStream); valueCoder.encode(entry.getValue(), outStream, context); // no flush needed as DataOutputStream does not buffer } @@ -110,12 +110,12 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> { Map<K, V> retval = Maps.newHashMapWithExpectedSize(size); for (int i = 0; i < size - 1; ++i) { - K key = keyCoder.decode(inStream, context.nested()); - V value = valueCoder.decode(inStream, context.nested()); + K key = keyCoder.decode(inStream); + V value = valueCoder.decode(inStream); retval.put(key, value); } - K key = keyCoder.decode(inStream, context.nested()); + K key = keyCoder.decode(inStream); V value = valueCoder.decode(inStream, context); retval.put(key, value); return retval; 
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 20fab9b..d8a98cd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -952,7 +952,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { if (value == null) { throw new CoderException("cannot encode a null value"); } - stringCoder.encode(value.getFilename().toString(), outStream, context.nested()); + stringCoder.encode(value.getFilename().toString(), outStream); if (value.getDestinationFilename() == null) { stringCoder.encode(null, outStream, context); } else { @@ -963,7 +963,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData { @Override public FileResult decode(InputStream inStream, Context context) throws IOException { - String filename = stringCoder.decode(inStream, context.nested()); + String filename = stringCoder.decode(inStream); assert filename != null; // fixes a compiler warning @Nullable String destinationFilename = stringCoder.decode(inStream, context); return new FileResult( http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java index 37d5a55..348cc5f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java @@ -682,10 +682,10 @@ public class ApproximateQuantiles { QuantileState<T, ComparatorT> state, OutputStream outStream, Coder.Context context) throws CoderException, IOException { Coder.Context nestedContext = context.nested(); - intCoder.encode(state.numQuantiles, outStream, nestedContext); - intCoder.encode(state.bufferSize, outStream, nestedContext); - elementCoder.encode(state.min, outStream, nestedContext); - elementCoder.encode(state.max, outStream, nestedContext); + intCoder.encode(state.numQuantiles, outStream); + intCoder.encode(state.bufferSize, outStream); + elementCoder.encode(state.min, outStream); + elementCoder.encode(state.max, outStream); elementListCoder.encode( state.unbufferedElements, outStream, nestedContext); BigEndianIntegerCoder.of().encode( @@ -699,14 +699,14 @@ public class ApproximateQuantiles { public QuantileState<T, ComparatorT> decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { Coder.Context nestedContext = context.nested(); - int numQuantiles = intCoder.decode(inStream, nestedContext); - int bufferSize = intCoder.decode(inStream, nestedContext); - T min = elementCoder.decode(inStream, nestedContext); - T max = elementCoder.decode(inStream, nestedContext); + int numQuantiles = intCoder.decode(inStream); + int bufferSize = intCoder.decode(inStream); + T min = elementCoder.decode(inStream); + T max = elementCoder.decode(inStream); List<T> unbufferedElements = - elementListCoder.decode(inStream, nestedContext); + elementListCoder.decode(inStream); int numBuffers = - BigEndianIntegerCoder.of().decode(inStream, nestedContext); + BigEndianIntegerCoder.of().decode(inStream); List<QuantileBuffer<T>> buffers = new ArrayList<>(numBuffers); for (int i = 0; i < numBuffers; i++) { buffers.add(decodeBuffer(inStream, nestedContext)); 
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index 0515ed5..c45df04 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -543,7 +543,7 @@ public class CombineFns { int lastIndex = codersCount - 1; Context nestedContext = context.nested(); for (int i = 0; i < lastIndex; ++i) { - coders.get(i).encode(value[i], outStream, nestedContext); + coders.get(i).encode(value[i], outStream); } coders.get(lastIndex).encode(value[lastIndex], outStream, context); } @@ -558,7 +558,7 @@ public class CombineFns { int lastIndex = codersCount - 1; Context nestedContext = context.nested(); for (int i = 0; i < lastIndex; ++i) { - ret[i] = coders.get(i).decode(inStream, nestedContext); + ret[i] = coders.get(i).decode(inStream); } ret[lastIndex] = coders.get(lastIndex).decode(inStream, context); return ret; http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java index a309954..a46a21f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java @@ -187,7 +187,7 @@ public class Mean { @Override public void encode(CountSum<NumT> value, OutputStream outStream, Coder.Context context) throws CoderException, IOException { - LONG_CODER.encode(value.count, outStream, 
context.nested()); + LONG_CODER.encode(value.count, outStream); DOUBLE_CODER.encode(value.sum, outStream, context); } @@ -195,7 +195,7 @@ public class Mean { public CountSum<NumT> decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { return new CountSum<>( - LONG_CODER.decode(inStream, context.nested()), + LONG_CODER.decode(inStream), DOUBLE_CODER.decode(inStream, context)); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java index e9a3571..bd669ef 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java @@ -253,7 +253,7 @@ public class CoGbkResult { } int lastIndex = schema.size() - 1; for (int unionTag = 0; unionTag < lastIndex; unionTag++) { - tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, context.nested()); + tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream); } tagListCoder(lastIndex).encode(value.valueMap.get(lastIndex), outStream, context); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java index 46ece09..cb5a7cf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java @@ -181,14 +181,14 @@ public class IntervalWindow extends BoundedWindow @Override public void encode(IntervalWindow window, OutputStream outStream, Context context) throws IOException, CoderException { - instantCoder.encode(window.end, outStream, context.nested()); + instantCoder.encode(window.end, outStream); durationCoder.encode(new Duration(window.start, window.end), outStream, context); } @Override public IntervalWindow decode(InputStream inStream, Context context) throws IOException, CoderException { - Instant end = instantCoder.decode(inStream, context.nested()); + Instant end = instantCoder.decode(inStream); ReadableDuration duration = durationCoder.decode(inStream, context); return new IntervalWindow(end.minus(duration), end); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index 1b7e335..e3e61cf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -643,8 +643,8 @@ public abstract class WindowedValue<T> { Context nestedContext = context.nested(); InstantCoder.of().encode( windowedElem.getTimestamp(), outStream, nestedContext); - windowsCoder.encode(windowedElem.getWindows(), outStream, nestedContext); - PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, nestedContext); + windowsCoder.encode(windowedElem.getWindows(), outStream); + PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream); valueCoder.encode(windowedElem.getValue(), outStream, context); } @@ -652,10 +652,10 @@ public abstract class WindowedValue<T> { public 
WindowedValue<T> decode(InputStream inStream, Context context) throws CoderException, IOException { Context nestedContext = context.nested(); - Instant timestamp = InstantCoder.of().decode(inStream, nestedContext); + Instant timestamp = InstantCoder.of().decode(inStream); Collection<? extends BoundedWindow> windows = - windowsCoder.decode(inStream, nestedContext); - PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream, nestedContext); + windowsCoder.decode(inStream); + PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream); T value = valueCoder.decode(inStream, context); return WindowedValue.of(value, timestamp, windows, pane); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java index c172885..89747a7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java @@ -104,7 +104,7 @@ public class TimestampedValue<V> { OutputStream outStream, Context context) throws IOException { - valueCoder.encode(windowedElem.getValue(), outStream, context.nested()); + valueCoder.encode(windowedElem.getValue(), outStream); InstantCoder.of().encode( windowedElem.getTimestamp(), outStream, context); } @@ -112,7 +112,7 @@ public class TimestampedValue<V> { @Override public TimestampedValue<T> decode(InputStream inStream, Context context) throws IOException { - T value = valueCoder.decode(inStream, context.nested()); + T value = valueCoder.decode(inStream); Instant timestamp = InstantCoder.of().decode(inStream, context); return TimestampedValue.of(value, timestamp); } 
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java index 3ecbaa2..e8a2dfd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java @@ -78,18 +78,18 @@ public abstract class ValueInSingleWindow<T> { public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream, Context context) throws IOException { Context nestedContext = context.nested(); - InstantCoder.of().encode(windowedElem.getTimestamp(), outStream, nestedContext); - windowCoder.encode(windowedElem.getWindow(), outStream, nestedContext); - PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, nestedContext); + InstantCoder.of().encode(windowedElem.getTimestamp(), outStream); + windowCoder.encode(windowedElem.getWindow(), outStream); + PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream); valueCoder.encode(windowedElem.getValue(), outStream, context); } @Override public ValueInSingleWindow<T> decode(InputStream inStream, Context context) throws IOException { Context nestedContext = context.nested(); - Instant timestamp = InstantCoder.of().decode(inStream, nestedContext); - BoundedWindow window = windowCoder.decode(inStream, nestedContext); - PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, nestedContext); + Instant timestamp = InstantCoder.of().decode(inStream); + BoundedWindow window = windowCoder.decode(inStream); + PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream); T value = valueCoder.decode(inStream, context); return new AutoValue_ValueInSingleWindow<>(value, 
timestamp, window, pane); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java index 3f057e1..f06317b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java @@ -103,7 +103,7 @@ public class ValueWithRecordId<ValueT> { @Override public void encode(ValueWithRecordId<ValueT> value, OutputStream outStream, Context context) throws IOException { - valueCoder.encode(value.value, outStream, context.nested()); + valueCoder.encode(value.value, outStream); idCoder.encode(value.id, outStream, context); } @@ -111,7 +111,7 @@ public class ValueWithRecordId<ValueT> { public ValueWithRecordId<ValueT> decode(InputStream inStream, Context context) throws IOException { return new ValueWithRecordId<ValueT>( - valueCoder.decode(inStream, context.nested()), + valueCoder.decode(inStream), idCoder.decode(inStream, context)); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java index d97eea6..adb6652 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java @@ -182,15 +182,15 @@ public class SerializableCoderTest implements Serializable { // Encode both strings into NESTED form. 
byte[] nestedEncoding; try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { - coder.encode(source, os, Coder.Context.NESTED); - coder.encode(source2, os, Coder.Context.NESTED); + coder.encode(source, os); + coder.encode(source2, os); nestedEncoding = os.toByteArray(); } // Decode from NESTED form. try (ByteArrayInputStream is = new ByteArrayInputStream(nestedEncoding)) { - assertEquals(source, coder.decode(is, Coder.Context.NESTED)); - assertEquals(source2, coder.decode(is, Coder.Context.NESTED)); + assertEquals(source, coder.decode(is)); + assertEquals(source2, coder.decode(is)); assertEquals(0, is.available()); } } @@ -207,20 +207,20 @@ public class SerializableCoderTest implements Serializable { Coder<String> coder = SerializableCoder.of(String.class); byte[] encodedBytes; try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { - coder.encode(null, os, Coder.Context.NESTED); - coder.encode("TestValue", os, Coder.Context.NESTED); - coder.encode(null, os, Coder.Context.NESTED); - coder.encode("TestValue2", os, Coder.Context.NESTED); - coder.encode(null, os, Coder.Context.NESTED); + coder.encode(null, os); + coder.encode("TestValue", os); + coder.encode(null, os); + coder.encode("TestValue2", os); + coder.encode(null, os); encodedBytes = os.toByteArray(); } try (ByteArrayInputStream is = new ByteArrayInputStream(encodedBytes)) { - assertNull(coder.decode(is, Coder.Context.NESTED)); - assertEquals("TestValue", coder.decode(is, Coder.Context.NESTED)); - assertNull(coder.decode(is, Coder.Context.NESTED)); - assertEquals("TestValue2", coder.decode(is, Coder.Context.NESTED)); - assertNull(coder.decode(is, Coder.Context.NESTED)); + assertNull(coder.decode(is)); + assertEquals("TestValue", coder.decode(is)); + assertNull(coder.decode(is)); + assertEquals("TestValue2", coder.decode(is)); + assertNull(coder.decode(is)); assertEquals(0, is.available()); } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index a70af94..e4b016b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -878,14 +878,14 @@ public class CombineTest implements Serializable { @Override public void encode(CountSum value, OutputStream outStream, Context context) throws CoderException, IOException { - LONG_CODER.encode(value.count, outStream, context.nested()); + LONG_CODER.encode(value.count, outStream); DOUBLE_CODER.encode(value.sum, outStream, context); } @Override public CountSum decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { - long count = LONG_CODER.decode(inStream, context.nested()); + long count = LONG_CODER.decode(inStream); double sum = DOUBLE_CODER.decode(inStream, context); return new CountSum(count, sum); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java index a458812..7e8a1dd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java @@ -210,14 +210,14 @@ public class CreateTest { OutputStream outStream, org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException { - stringCoder.encode(value.myString, 
outStream, context.nested()); + stringCoder.encode(value.myString, outStream); } @Override public UnserializableRecord decode( InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException { - return new UnserializableRecord(stringCoder.decode(inStream, context.nested())); + return new UnserializableRecord(stringCoder.decode(inStream)); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java index 314b969..9ae5d68 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java @@ -35,7 +35,7 @@ public class GlobalWindowTest { CountingOutputStream out = new CountingOutputStream(ByteStreams.nullOutputStream()); GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out, Context.OUTER); assertEquals(0, out.getCount()); - GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out, Context.NESTED); + GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out); assertEquals(0, out.getCount()); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java index 36f7028..894d8a9 100644 --- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java @@ -32,7 +32,6 @@ import java.util.Collections; import java.util.List; import java.util.Random; import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.coders.Coder.Context; import org.hamcrest.collection.IsIterableContainingInOrder; import org.junit.Rule; import org.junit.Test; @@ -180,7 +179,7 @@ public class BufferedElementCountingOutputStreamTest { do { count = VarInt.decodeLong(is); for (int i = 0; i < count; ++i) { - values.add(ByteArrayCoder.of().decode(is, Context.NESTED)); + values.add(ByteArrayCoder.of().decode(is)); } } while(count > 0); @@ -198,7 +197,7 @@ public class BufferedElementCountingOutputStreamTest { for (byte[] value : values) { os.markElementStart(); - ByteArrayCoder.of().encode(value, os, Context.NESTED); + ByteArrayCoder.of().encode(value, os); } return os; } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java index 18e0d95..37745be 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java @@ -110,7 +110,7 @@ public class BeamFnDataBufferingOutboundObserver<T> @Override public void accept(WindowedValue<T> t) throws IOException { - coder.encode(t, bufferedElements, Context.NESTED); + coder.encode(t, bufferedElements); counter += 1; if 
(bufferedElements.size() >= bufferLimit) { outboundObserver.onNext(convertBufferForTransmission().build()); http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java index 24365d8..ece87d2 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java @@ -71,7 +71,7 @@ public class BeamFnDataInboundObserver<T> implements Consumer<BeamFnApi.Elements InputStream inputStream = t.getData().newInput(); while (inputStream.available() > 0) { counter += 1; - WindowedValue<T> value = coder.decode(inputStream, Context.NESTED); + WindowedValue<T> value = coder.decode(inputStream); consumer.accept(value); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java index 7cbf8eb..3f6ece7 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java @@ -135,7 +135,7 @@ public class BeamFnDataBufferingOutboundObserverTest { private static BeamFnApi.Elements 
messageWithData(byte[] ... datum) throws IOException { ByteString.Output output = ByteString.newOutput(); for (byte[] data : datum) { - CODER.encode(valueInGlobalWindow(data), output, Context.NESTED); + CODER.encode(valueInGlobalWindow(data), output); } return BeamFnApi.Elements.newBuilder() .addData(BeamFnApi.Elements.Data.newBuilder() http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java index c53f99d..4b0bf0c 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java @@ -108,7 +108,7 @@ public class BeamFnDataInboundObserverTest { .setName("Test")); ByteString.Output output = ByteString.newOutput(); for (String value : values) { - CODER.encode(valueInGlobalWindow(value), output, Context.NESTED); + CODER.encode(valueInGlobalWindow(value), output); } builder.setData(output.toByteString()); return builder.build(); http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java index 7aefcfa..c2b62b7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java @@ -53,17 +53,18 @@ class ShardedKeyCoder<KeyT> } @Override - public void encode(ShardedKey<KeyT> key, OutputStream outStream, Context context) + public void encode(ShardedKey<KeyT> key, OutputStream outStream) throws IOException { - keyCoder.encode(key.getKey(), outStream, context.nested()); - shardNumberCoder.encode(key.getShardNumber(), outStream, context); + keyCoder.encode(key.getKey(), outStream); + shardNumberCoder.encode(key.getShardNumber(), outStream); } @Override - public ShardedKey<KeyT> decode(InputStream inStream, Context context) + public ShardedKey<KeyT> decode(InputStream inStream) throws IOException { return new ShardedKey<>( - keyCoder.decode(inStream, context.nested()), shardNumberCoder.decode(inStream, context)); + keyCoder.decode(inStream), + shardNumberCoder.decode(inStream)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java index 01bc558..33b9f77 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java @@ -43,14 +43,14 @@ public class TableDestinationCoder extends AtomicCoder<TableDestination> { if (value == null) { throw new CoderException("cannot encode a null value"); } - tableSpecCoder.encode(value.getTableSpec(), outStream, context.nested()); - tableDescriptionCoder.encode(value.getTableDescription(), outStream, 
context); + tableSpecCoder.encode(value.getTableSpec(), outStream); + tableDescriptionCoder.encode(value.getTableDescription(), outStream); } @Override - public TableDestination decode(InputStream inStream, Context context) throws IOException { - String tableSpec = tableSpecCoder.decode(inStream, context.nested()); - String tableDescription = tableDescriptionCoder.decode(inStream, context); + public TableDestination decode(InputStream inStream) throws IOException { + String tableSpec = tableSpecCoder.decode(inStream); + String tableDescription = tableDescriptionCoder.decode(inStream); return new TableDestination(tableSpec, tableDescription); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java index 2b1988a..8ae75c5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java @@ -43,7 +43,7 @@ class TableRowInfoCoder extends AtomicCoder<TableRowInfo> { if (value == null) { throw new CoderException("cannot encode a null value"); } - tableRowCoder.encode(value.tableRow, outStream, context.nested()); + tableRowCoder.encode(value.tableRow, outStream); idCoder.encode(value.uniqueId, outStream, context); } @@ -51,7 +51,7 @@ class TableRowInfoCoder extends AtomicCoder<TableRowInfo> { public TableRowInfo decode(InputStream inStream, Context context) throws IOException { return new TableRowInfo( - tableRowCoder.decode(inStream, context.nested()), + tableRowCoder.decode(inStream), 
idCoder.decode(inStream, context)); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index 890979b..9e83271 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -106,16 +106,16 @@ class WriteBundlesToFiles<DestinationT> if (value == null) { throw new CoderException("cannot encode a null value"); } - stringCoder.encode(value.filename, outStream, context.nested()); - longCoder.encode(value.fileByteSize, outStream, context.nested()); - destinationCoder.encode(value.destination, outStream, context.nested()); + stringCoder.encode(value.filename, outStream); + longCoder.encode(value.fileByteSize, outStream); + destinationCoder.encode(value.destination, outStream); } @Override public Result<DestinationT> decode(InputStream inStream, Context context) throws IOException { - String filename = stringCoder.decode(inStream, context.nested()); - long fileByteSize = longCoder.decode(inStream, context.nested()); - DestinationT destination = destinationCoder.decode(inStream, context.nested()); + String filename = stringCoder.decode(inStream); + long fileByteSize = longCoder.decode(inStream); + DestinationT destination = destinationCoder.decode(inStream); return new Result<>(filename, fileByteSize, destination); } 
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java index e061edc..5907c9e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java @@ -47,13 +47,13 @@ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubMessage> public void encode(PubsubMessage value, OutputStream outStream, Context context) throws IOException { - PAYLOAD_CODER.encode(value.getPayload(), outStream, context.nested()); + PAYLOAD_CODER.encode(value.getPayload(), outStream); ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream, context); } @Override public PubsubMessage decode(InputStream inStream, Context context) throws IOException { - byte[] payload = PAYLOAD_CODER.decode(inStream, context.nested()); + byte[] payload = PAYLOAD_CODER.decode(inStream); Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context); return new PubsubMessage(payload, attributes); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index 9f04a6c..ae320c7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -110,19 +110,19 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, public void encode( OutgoingMessage value, OutputStream outStream, Context context) throws CoderException, IOException { - ByteArrayCoder.of().encode(value.elementBytes, outStream, context.nested()); - ATTRIBUTES_CODER.encode(value.attributes, outStream, context.nested()); - BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, context.nested()); - RECORD_ID_CODER.encode(value.recordId, outStream, context.nested()); + ByteArrayCoder.of().encode(value.elementBytes, outStream); + ATTRIBUTES_CODER.encode(value.attributes, outStream); + BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream); + RECORD_ID_CODER.encode(value.recordId, outStream); } @Override public OutgoingMessage decode( InputStream inStream, Context context) throws CoderException, IOException { - byte[] elementBytes = ByteArrayCoder.of().decode(inStream, context.nested()); - Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context.nested()); - long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, context.nested()); - @Nullable String recordId = RECORD_ID_CODER.decode(inStream, context.nested()); + byte[] elementBytes = ByteArrayCoder.of().decode(inStream); + Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream); + long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream); + @Nullable String recordId = RECORD_ID_CODER.decode(inStream); return new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, recordId); } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index c16b8fb..e53976e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -380,7 +380,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub @Override public PubsubCheckpoint decode(InputStream inStream, Context context) throws IOException { - String path = SUBSCRIPTION_PATH_CODER.decode(inStream, context.nested()); + String path = SUBSCRIPTION_PATH_CODER.decode(inStream); List<String> notYetReadIds = LIST_CODER.decode(inStream, context); return new PubsubCheckpoint(path, null, null, notYetReadIds); } http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java index 2b4503a..5386a61 100644 --- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java +++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java @@ -181,8 +181,8 @@ public class JAXBCoderTest { public void encode(TestType value, OutputStream outStream, Context context) throws CoderException, IOException { Context nestedContext = context.nested(); - 
VarIntCoder.of().encode(3, outStream, nestedContext); - jaxbCoder.encode(value, outStream, nestedContext); + VarIntCoder.of().encode(3, outStream); + jaxbCoder.encode(value, outStream); VarLongCoder.of().encode(22L, outStream, context); } @@ -190,8 +190,8 @@ public class JAXBCoderTest { public TestType decode(InputStream inStream, Context context) throws CoderException, IOException { Context nestedContext = context.nested(); - VarIntCoder.of().decode(inStream, nestedContext); - TestType result = jaxbCoder.decode(inStream, nestedContext); + VarIntCoder.of().decode(inStream); + TestType result = jaxbCoder.decode(inStream); VarLongCoder.of().decode(inStream, context); return result; }