Repository: beam
Updated Branches:
  refs/heads/master 23731fe7a -> 3a09ed575


Remove explicit used of nested contexts.

    find . -type f -name '*.java' | xargs sed -i '' 's/\([.]..code[(].*\),  
*context.nested..[)]/\1)/'
    find . -type f -name '*.java' | xargs sed -i '' 's/\([.]..code[(].*\),  
*nestedContext[)]/\1)/'
    find . -type f -name '*.java' | xargs sed -i '' 's/\([.]..code[(].*\),  
*Context.NESTED[)]/\1)/'
    find . -type f -name '*.java' | xargs sed -i '' 's/\([.]..code[(].*\),  *[^ 
]*.Context.NESTED[)]/\1)/'

Added back explicit context in CoGbkResult.java due to compile error.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/27e9a060
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/27e9a060
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/27e9a060

Branch: refs/heads/master
Commit: 27e9a060ed593b2b53b88481591f14b1a274c61b
Parents: 23731fe
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Fri May 5 16:20:37 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon May 8 20:17:54 2017 -0700

----------------------------------------------------------------------
 .../UnboundedReadFromBoundedSource.java         |  4 +--
 .../core/ElementAndRestrictionCoder.java        |  4 +--
 .../beam/runners/core/KeyedWorkItemCoder.java   |  8 +++---
 .../beam/runners/core/TimerInternals.java       | 12 ++++-----
 .../translation/types/CoderTypeSerializer.java  |  4 +--
 .../streaming/SingletonKeyedWorkItemCoder.java  |  4 +--
 .../state/FlinkKeyGroupStateInternals.java      |  8 +++---
 .../runners/dataflow/BatchViewOverrides.java    |  4 +--
 .../runners/dataflow/internal/IsmFormat.java    | 24 ++++++++---------
 .../spark/aggregators/NamedAggregators.java     |  4 +--
 .../apache/beam/sdk/coders/BigDecimalCoder.java |  4 +--
 .../beam/sdk/coders/IterableLikeCoder.java      |  8 +++---
 .../org/apache/beam/sdk/coders/KvCoder.java     |  4 +--
 .../org/apache/beam/sdk/coders/MapCoder.java    | 12 ++++-----
 .../org/apache/beam/sdk/io/FileBasedSink.java   |  4 +--
 .../sdk/transforms/ApproximateQuantiles.java    | 20 +++++++-------
 .../apache/beam/sdk/transforms/CombineFns.java  |  4 +--
 .../org/apache/beam/sdk/transforms/Mean.java    |  4 +--
 .../beam/sdk/transforms/join/CoGbkResult.java   |  2 +-
 .../transforms/windowing/IntervalWindow.java    |  4 +--
 .../org/apache/beam/sdk/util/WindowedValue.java | 10 +++----
 .../beam/sdk/values/TimestampedValue.java       |  4 +--
 .../beam/sdk/values/ValueInSingleWindow.java    | 12 ++++-----
 .../beam/sdk/values/ValueWithRecordId.java      |  4 +--
 .../beam/sdk/coders/SerializableCoderTest.java  | 28 ++++++++++----------
 .../apache/beam/sdk/transforms/CombineTest.java |  4 +--
 .../apache/beam/sdk/transforms/CreateTest.java  |  4 +--
 .../transforms/windowing/GlobalWindowTest.java  |  2 +-
 ...BufferedElementCountingOutputStreamTest.java |  5 ++--
 .../BeamFnDataBufferingOutboundObserver.java    |  2 +-
 .../harness/data/BeamFnDataInboundObserver.java |  2 +-
 ...BeamFnDataBufferingOutboundObserverTest.java |  2 +-
 .../data/BeamFnDataInboundObserverTest.java     |  2 +-
 .../sdk/io/gcp/bigquery/ShardedKeyCoder.java    | 11 ++++----
 .../io/gcp/bigquery/TableDestinationCoder.java  | 10 +++----
 .../sdk/io/gcp/bigquery/TableRowInfoCoder.java  |  4 +--
 .../io/gcp/bigquery/WriteBundlesToFiles.java    | 12 ++++-----
 .../PubsubMessageWithAttributesCoder.java       |  4 +--
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  | 16 +++++------
 .../io/gcp/pubsub/PubsubUnboundedSource.java    |  2 +-
 .../apache/beam/sdk/io/xml/JAXBCoderTest.java   |  8 +++---
 41 files changed, 145 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 1424b8b..ae28e3a 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -223,7 +223,7 @@ public class UnboundedReadFromBoundedSource<T> extends 
PTransform<PBegin, PColle
       @Override
       public void encode(Checkpoint<T> value, OutputStream outStream, Context 
context)
           throws CoderException, IOException {
-        elemsCoder.encode(value.residualElements, outStream, context.nested());
+        elemsCoder.encode(value.residualElements, outStream);
         sourceCoder.encode(value.residualSource, outStream, context);
       }
 
@@ -232,7 +232,7 @@ public class UnboundedReadFromBoundedSource<T> extends 
PTransform<PBegin, PColle
       public Checkpoint<T> decode(InputStream inStream, Context context)
           throws CoderException, IOException {
         return new Checkpoint<>(
-            elemsCoder.decode(inStream, context.nested()),
+            elemsCoder.decode(inStream),
             sourceCoder.decode(inStream, context));
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
index 83c4e62..5ddd865 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
@@ -55,14 +55,14 @@ public class ElementAndRestrictionCoder<ElementT, 
RestrictionT>
     if (value == null) {
       throw new CoderException("cannot encode a null ElementAndRestriction");
     }
-    elementCoder.encode(value.element(), outStream, context.nested());
+    elementCoder.encode(value.element(), outStream);
     restrictionCoder.encode(value.restriction(), outStream, context);
   }
 
   @Override
   public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream 
inStream, Context context)
       throws IOException {
-    ElementT key = elementCoder.decode(inStream, context.nested());
+    ElementT key = elementCoder.decode(inStream);
     RestrictionT value = restrictionCoder.decode(inStream, context);
     return ElementAndRestriction.of(key, value);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
index e1872b5..ac8a34c 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -72,8 +72,8 @@ public class KeyedWorkItemCoder<K, ElemT> extends 
StructuredCoder<KeyedWorkItem<
   public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream, 
Coder.Context context)
       throws CoderException, IOException {
     Coder.Context nestedContext = context.nested();
-    keyCoder.encode(value.key(), outStream, nestedContext);
-    timersCoder.encode(value.timersIterable(), outStream, nestedContext);
+    keyCoder.encode(value.key(), outStream);
+    timersCoder.encode(value.timersIterable(), outStream);
     elemsCoder.encode(value.elementsIterable(), outStream, context);
   }
 
@@ -81,8 +81,8 @@ public class KeyedWorkItemCoder<K, ElemT> extends 
StructuredCoder<KeyedWorkItem<
   public KeyedWorkItem<K, ElemT> decode(InputStream inStream, Coder.Context 
context)
       throws CoderException, IOException {
     Coder.Context nestedContext = context.nested();
-    K key = keyCoder.decode(inStream, nestedContext);
-    Iterable<TimerData> timers = timersCoder.decode(inStream, nestedContext);
+    K key = keyCoder.decode(inStream);
+    Iterable<TimerData> timers = timersCoder.decode(inStream);
     Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, 
context);
     return KeyedWorkItems.workItem(key, timers, elems);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index 888c11f..3607fdd 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -241,9 +241,9 @@ public interface TimerInternals {
     public void encode(TimerData timer, OutputStream outStream, Context 
context)
         throws CoderException, IOException {
       Context nestedContext = context.nested();
-      STRING_CODER.encode(timer.getTimerId(), outStream, nestedContext);
-      STRING_CODER.encode(timer.getNamespace().stringKey(), outStream, 
nestedContext);
-      INSTANT_CODER.encode(timer.getTimestamp(), outStream, nestedContext);
+      STRING_CODER.encode(timer.getTimerId(), outStream);
+      STRING_CODER.encode(timer.getNamespace().stringKey(), outStream);
+      INSTANT_CODER.encode(timer.getTimestamp(), outStream);
       STRING_CODER.encode(timer.getDomain().name(), outStream, context);
     }
 
@@ -251,10 +251,10 @@ public interface TimerInternals {
     public TimerData decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       Context nestedContext = context.nested();
-      String timerId = STRING_CODER.decode(inStream, nestedContext);
+      String timerId = STRING_CODER.decode(inStream);
       StateNamespace namespace =
-          StateNamespaces.fromString(STRING_CODER.decode(inStream, 
nestedContext), windowCoder);
-      Instant timestamp = INSTANT_CODER.decode(inStream, nestedContext);
+          StateNamespaces.fromString(STRING_CODER.decode(inStream), 
windowCoder);
+      Instant timestamp = INSTANT_CODER.decode(inStream);
       TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, 
context));
       return TimerData.of(timerId, namespace, timestamp, domain);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
index e210ed9..e003119 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -77,14 +77,14 @@ public class CoderTypeSerializer<T> extends 
TypeSerializer<T> {
   @Override
   public void serialize(T t, DataOutputView dataOutputView) throws IOException 
{
     DataOutputViewWrapper outputWrapper = new 
DataOutputViewWrapper(dataOutputView);
-    coder.encode(t, outputWrapper, Coder.Context.NESTED);
+    coder.encode(t, outputWrapper);
   }
 
   @Override
   public T deserialize(DataInputView dataInputView) throws IOException {
     try {
       DataInputViewWrapper inputWrapper = new 
DataInputViewWrapper(dataInputView);
-      return coder.decode(inputWrapper, Coder.Context.NESTED);
+      return coder.decode(inputWrapper);
     } catch (CoderException e) {
       Throwable cause = e.getCause();
       if (cause instanceof EOFException) {

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
index f218693..d7bae7e 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -70,14 +70,14 @@ public class SingletonKeyedWorkItemCoder<K, ElemT>
                      OutputStream outStream,
                      Context context)
       throws CoderException, IOException {
-    keyCoder.encode(value.key(), outStream, context.nested());
+    keyCoder.encode(value.key(), outStream);
     valueCoder.encode(value.value, outStream, context);
   }
 
   @Override
   public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream, Context 
context)
       throws CoderException, IOException {
-    K key = keyCoder.decode(inStream, context.nested());
+    K key = keyCoder.decode(inStream);
     WindowedValue<ElemT> value = valueCoder.decode(inStream, context);
     return new SingletonKeyedWorkItem<>(key, value);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
index d6af4f9..8d437d5 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -430,8 +430,8 @@ public class FlinkKeyGroupStateInternals<K> implements 
StateInternals {
       Map<String, ?> map = entry.getValue().f1;
       out.writeInt(map.size());
       for (Map.Entry<String, ?> entry1 : map.entrySet()) {
-        StringUtf8Coder.of().encode(entry1.getKey(), out, Context.NESTED);
-        coder.encode(entry1.getValue(), out, Context.NESTED);
+        StringUtf8Coder.of().encode(entry1.getKey(), out);
+        coder.encode(entry1.getValue(), out);
       }
     }
   }
@@ -463,8 +463,8 @@ public class FlinkKeyGroupStateInternals<K> implements 
StateInternals {
       Map<String, Object> map = (Map<String, Object>) tuple2.f1;
       int mapSize = in.readInt();
       for (int j = 0; j < mapSize; j++) {
-        String namespace = StringUtf8Coder.of().decode(in, Context.NESTED);
-        Object value = coder.decode(in, Context.NESTED);
+        String namespace = StringUtf8Coder.of().decode(in);
+        Object value = coder.decode(in);
         map.put(namespace, value);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index ecd0365..0e60fa0 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -1353,7 +1353,7 @@ class BatchViewOverrides {
     @Override
     public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream,
         Coder.Context context) throws CoderException, IOException {
-      transformCoder.encode(value.transform, outStream, context.nested());
+      transformCoder.encode(value.transform, outStream);
       originalMapCoder.encode(value.originalMap, outStream, context);
     }
 
@@ -1361,7 +1361,7 @@ class BatchViewOverrides {
     public TransformedMap<K, V1, V2> decode(
         InputStream inStream, Coder.Context context) throws CoderException, 
IOException {
       return new TransformedMap<>(
-          transformCoder.decode(inStream, context.nested()),
+          transformCoder.decode(inStream),
           originalMapCoder.decode(inStream, context));
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index 00e0c54..0f0cd4d 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -239,12 +239,12 @@ public class IsmFormat {
             keyComponentCoders.size(), value.getKeyComponents()));
       }
       for (int i = 0; i < keyComponentCoders.size(); ++i) {
-        getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream, 
context.nested());
+        getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream);
       }
       if (isMetadataKey(value.getKeyComponents())) {
-        ByteArrayCoder.of().encode(value.getMetadata(), outStream, 
context.nested());
+        ByteArrayCoder.of().encode(value.getMetadata(), outStream);
       } else {
-        valueCoder.encode(value.getValue(), outStream, context.nested());
+        valueCoder.encode(value.getValue(), outStream);
       }
     }
 
@@ -253,13 +253,13 @@ public class IsmFormat {
         throws CoderException, IOException {
       List<Object> keyComponents = new ArrayList<>(keyComponentCoders.size());
       for (Coder<?> keyCoder : keyComponentCoders) {
-        keyComponents.add(keyCoder.decode(inStream, context.nested()));
+        keyComponents.add(keyCoder.decode(inStream));
       }
       if (isMetadataKey(keyComponents)) {
         return IsmRecord.<V>meta(
-            keyComponents, ByteArrayCoder.of().decode(inStream, 
context.nested()));
+            keyComponents, ByteArrayCoder.of().decode(inStream));
       } else {
-        return IsmRecord.<V>of(keyComponents, valueCoder.decode(inStream, 
context.nested()));
+        return IsmRecord.<V>of(keyComponents, valueCoder.decode(inStream));
       }
     }
 
@@ -499,7 +499,7 @@ public class IsmFormat {
         outStream.write(0);
       } else {
         outStream.write(1);
-        keyCoder.encode(value, outStream, context.nested());
+        keyCoder.encode(value, outStream);
       }
     }
 
@@ -510,7 +510,7 @@ public class IsmFormat {
       if (marker == 0) {
         return (K) getMetadataKey();
       } else if (marker == 1) {
-        return keyCoder.decode(inStream, context.nested());
+        return keyCoder.decode(inStream);
       } else {
         throw new CoderException(String.format("Expected marker but got %s.", 
marker));
       }
@@ -626,8 +626,8 @@ public class IsmFormat {
       checkState(value.getIndexOffset() >= 0,
           "%s attempting to be written without index offset.",
           value);
-      VarIntCoder.of().encode(value.getId(), outStream, context.nested());
-      VarLongCoder.of().encode(value.getBlockOffset(), outStream, 
context.nested());
+      VarIntCoder.of().encode(value.getId(), outStream);
+      VarLongCoder.of().encode(value.getBlockOffset(), outStream);
       VarLongCoder.of().encode(value.getIndexOffset(), outStream, context);
     }
 
@@ -635,8 +635,8 @@ public class IsmFormat {
     public IsmShard decode(
         InputStream inStream, Coder.Context context) throws CoderException, 
IOException {
       return IsmShard.of(
-          VarIntCoder.of().decode(inStream, context.nested()),
-          VarLongCoder.of().decode(inStream, context.nested()),
+          VarIntCoder.of().decode(inStream),
+          VarLongCoder.of().decode(inStream),
           VarLongCoder.of().decode(inStream, context));
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index c836ca5..27f2ec8 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -207,7 +207,7 @@ public class NamedAggregators implements Serializable {
       oos.writeObject(inCoder);
       try {
         combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
-            .encode(state, oos, Coder.Context.NESTED);
+            .encode(state, oos);
       } catch (CannotProvideCoderException e) {
         throw new IllegalStateException("Could not determine coder for 
accumulator", e);
       }
@@ -220,7 +220,7 @@ public class NamedAggregators implements Serializable {
       inCoder = (Coder<InputT>) ois.readObject();
       try {
         state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
-            .decode(ois, Coder.Context.NESTED);
+            .decode(ois);
       } catch (CannotProvideCoderException e) {
         throw new IllegalStateException("Could not determine coder for 
accumulator", e);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
index 97559a9..e2166cf 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
@@ -51,14 +51,14 @@ public class BigDecimalCoder extends 
AtomicCoder<BigDecimal> {
   public void encode(BigDecimal value, OutputStream outStream, Context context)
       throws IOException, CoderException {
     checkNotNull(value, String.format("cannot encode a null %s", 
BigDecimal.class.getSimpleName()));
-    VAR_INT_CODER.encode(value.scale(), outStream, context.nested());
+    VAR_INT_CODER.encode(value.scale(), outStream);
     BIG_INT_CODER.encode(value.unscaledValue(), outStream, context);
   }
 
   @Override
   public BigDecimal decode(InputStream inStream, Context context)
       throws IOException, CoderException {
-    int scale = VAR_INT_CODER.decode(inStream, context.nested());
+    int scale = VAR_INT_CODER.decode(inStream);
     BigInteger bigInteger = BIG_INT_CODER.decode(inStream, context);
     return new BigDecimal(bigInteger, scale);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
index 9994b3f..59d5424 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
@@ -97,7 +97,7 @@ public abstract class IterableLikeCoder<T, IterableT extends 
Iterable<T>>
       Collection<T> collection = (Collection<T>) iterable;
       dataOutStream.writeInt(collection.size());
       for (T elem : collection) {
-        elementCoder.encode(elem, dataOutStream, nestedContext);
+        elementCoder.encode(elem, dataOutStream);
       }
     } else {
       // We don't know the size without traversing it so use a fixed size 
buffer
@@ -108,7 +108,7 @@ public abstract class IterableLikeCoder<T, IterableT 
extends Iterable<T>>
           new BufferedElementCountingOutputStream(dataOutStream);
       for (T elem : iterable) {
         countingOutputStream.markElementStart();
-        elementCoder.encode(elem, countingOutputStream, nestedContext);
+        elementCoder.encode(elem, countingOutputStream);
       }
       countingOutputStream.finish();
     }
@@ -125,7 +125,7 @@ public abstract class IterableLikeCoder<T, IterableT 
extends Iterable<T>>
     if (size >= 0) {
       List<T> elements = new ArrayList<>(size);
       for (int i = 0; i < size; i++) {
-        elements.add(elementCoder.decode(dataInStream, nestedContext));
+        elements.add(elementCoder.decode(dataInStream));
       }
       return decodeToIterable(elements);
     }
@@ -134,7 +134,7 @@ public abstract class IterableLikeCoder<T, IterableT 
extends Iterable<T>>
     // each block of elements.
     long count = VarInt.decodeLong(dataInStream);
     while (count > 0L) {
-      elements.add(elementCoder.decode(dataInStream, nestedContext));
+      elements.add(elementCoder.decode(dataInStream));
       --count;
       if (count == 0L) {
           count = VarInt.decodeLong(dataInStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index 1df4460..0bb53ec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -63,14 +63,14 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, 
V>> {
     if (kv == null) {
       throw new CoderException("cannot encode a null KV");
     }
-    keyCoder.encode(kv.getKey(), outStream, context.nested());
+    keyCoder.encode(kv.getKey(), outStream);
     valueCoder.encode(kv.getValue(), outStream, context);
   }
 
   @Override
   public KV<K, V> decode(InputStream inStream, Context context)
       throws IOException, CoderException {
-    K key = keyCoder.decode(inStream, context.nested());
+    K key = keyCoder.decode(inStream);
     V value = valueCoder.decode(inStream, context);
     return KV.of(key, value);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
index 7df9ca9..f20eb93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
@@ -89,12 +89,12 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, 
V>> {
     Iterator<Entry<K, V>> iterator = map.entrySet().iterator();
     Entry<K, V> entry = iterator.next();
     while (iterator.hasNext()) {
-      keyCoder.encode(entry.getKey(), outStream, context.nested());
-      valueCoder.encode(entry.getValue(), outStream, context.nested());
+      keyCoder.encode(entry.getKey(), outStream);
+      valueCoder.encode(entry.getValue(), outStream);
       entry = iterator.next();
     }
 
-    keyCoder.encode(entry.getKey(), outStream, context.nested());
+    keyCoder.encode(entry.getKey(), outStream);
     valueCoder.encode(entry.getValue(), outStream, context);
     // no flush needed as DataOutputStream does not buffer
   }
@@ -110,12 +110,12 @@ public class MapCoder<K, V> extends 
StructuredCoder<Map<K, V>> {
 
     Map<K, V> retval = Maps.newHashMapWithExpectedSize(size);
     for (int i = 0; i < size - 1; ++i) {
-      K key = keyCoder.decode(inStream, context.nested());
-      V value = valueCoder.decode(inStream, context.nested());
+      K key = keyCoder.decode(inStream);
+      V value = valueCoder.decode(inStream);
       retval.put(key, value);
     }
 
-    K key = keyCoder.decode(inStream, context.nested());
+    K key = keyCoder.decode(inStream);
     V value = valueCoder.decode(inStream, context);
     retval.put(key, value);
     return retval;

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 20fab9b..d8a98cd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -952,7 +952,7 @@ public abstract class FileBasedSink<T> implements 
Serializable, HasDisplayData {
       if (value == null) {
         throw new CoderException("cannot encode a null value");
       }
-      stringCoder.encode(value.getFilename().toString(), outStream, 
context.nested());
+      stringCoder.encode(value.getFilename().toString(), outStream);
       if (value.getDestinationFilename() == null) {
         stringCoder.encode(null, outStream, context);
       } else {
@@ -963,7 +963,7 @@ public abstract class FileBasedSink<T> implements 
Serializable, HasDisplayData {
     @Override
     public FileResult decode(InputStream inStream, Context context)
         throws IOException {
-      String filename = stringCoder.decode(inStream, context.nested());
+      String filename = stringCoder.decode(inStream);
       assert filename != null;  // fixes a compiler warning
       @Nullable String destinationFilename = stringCoder.decode(inStream, 
context);
       return new FileResult(

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
index 37d5a55..348cc5f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
@@ -682,10 +682,10 @@ public class ApproximateQuantiles {
         QuantileState<T, ComparatorT> state, OutputStream outStream, 
Coder.Context context)
         throws CoderException, IOException {
       Coder.Context nestedContext = context.nested();
-      intCoder.encode(state.numQuantiles, outStream, nestedContext);
-      intCoder.encode(state.bufferSize, outStream, nestedContext);
-      elementCoder.encode(state.min, outStream, nestedContext);
-      elementCoder.encode(state.max, outStream, nestedContext);
+      intCoder.encode(state.numQuantiles, outStream);
+      intCoder.encode(state.bufferSize, outStream);
+      elementCoder.encode(state.min, outStream);
+      elementCoder.encode(state.max, outStream);
       elementListCoder.encode(
           state.unbufferedElements, outStream, nestedContext);
       BigEndianIntegerCoder.of().encode(
@@ -699,14 +699,14 @@ public class ApproximateQuantiles {
     public QuantileState<T, ComparatorT> decode(InputStream inStream, 
Coder.Context context)
         throws CoderException, IOException {
       Coder.Context nestedContext = context.nested();
-      int numQuantiles = intCoder.decode(inStream, nestedContext);
-      int bufferSize = intCoder.decode(inStream, nestedContext);
-      T min = elementCoder.decode(inStream, nestedContext);
-      T max = elementCoder.decode(inStream, nestedContext);
+      int numQuantiles = intCoder.decode(inStream);
+      int bufferSize = intCoder.decode(inStream);
+      T min = elementCoder.decode(inStream);
+      T max = elementCoder.decode(inStream);
       List<T> unbufferedElements =
-          elementListCoder.decode(inStream, nestedContext);
+          elementListCoder.decode(inStream);
       int numBuffers =
-          BigEndianIntegerCoder.of().decode(inStream, nestedContext);
+          BigEndianIntegerCoder.of().decode(inStream);
       List<QuantileBuffer<T>> buffers = new ArrayList<>(numBuffers);
       for (int i = 0; i < numBuffers; i++) {
         buffers.add(decodeBuffer(inStream, nestedContext));

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index 0515ed5..c45df04 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -543,7 +543,7 @@ public class CombineFns {
       int lastIndex = codersCount - 1;
       Context nestedContext = context.nested();
       for (int i = 0; i < lastIndex; ++i) {
-        coders.get(i).encode(value[i], outStream, nestedContext);
+        coders.get(i).encode(value[i], outStream);
       }
       coders.get(lastIndex).encode(value[lastIndex], outStream, context);
     }
@@ -558,7 +558,7 @@ public class CombineFns {
       int lastIndex = codersCount - 1;
       Context nestedContext = context.nested();
       for (int i = 0; i < lastIndex; ++i) {
-        ret[i] = coders.get(i).decode(inStream, nestedContext);
+        ret[i] = coders.get(i).decode(inStream);
       }
       ret[lastIndex] = coders.get(lastIndex).decode(inStream, context);
       return ret;

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index a309954..a46a21f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -187,7 +187,7 @@ public class Mean {
      @Override
      public void encode(CountSum<NumT> value, OutputStream outStream, 
Coder.Context context)
          throws CoderException, IOException {
-       LONG_CODER.encode(value.count, outStream, context.nested());
+       LONG_CODER.encode(value.count, outStream);
        DOUBLE_CODER.encode(value.sum, outStream, context);
      }
 
@@ -195,7 +195,7 @@ public class Mean {
      public CountSum<NumT> decode(InputStream inStream, Coder.Context context)
          throws CoderException, IOException {
        return new CountSum<>(
-           LONG_CODER.decode(inStream, context.nested()),
+           LONG_CODER.decode(inStream),
            DOUBLE_CODER.decode(inStream, context));
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index e9a3571..bd669ef 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -253,7 +253,7 @@ public class CoGbkResult {
       }
       int lastIndex = schema.size() - 1;
       for (int unionTag = 0; unionTag < lastIndex; unionTag++) {
-        tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, 
context.nested());
+        tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream);
       }
       tagListCoder(lastIndex).encode(value.valueMap.get(lastIndex), outStream, 
context);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
index 46ece09..cb5a7cf 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
@@ -181,14 +181,14 @@ public class IntervalWindow extends BoundedWindow
     @Override
     public void encode(IntervalWindow window, OutputStream outStream, Context 
context)
         throws IOException, CoderException {
-      instantCoder.encode(window.end, outStream, context.nested());
+      instantCoder.encode(window.end, outStream);
       durationCoder.encode(new Duration(window.start, window.end), outStream, 
context);
     }
 
     @Override
     public IntervalWindow decode(InputStream inStream, Context context)
         throws IOException, CoderException {
-      Instant end = instantCoder.decode(inStream, context.nested());
+      Instant end = instantCoder.decode(inStream);
       ReadableDuration duration = durationCoder.decode(inStream, context);
       return new IntervalWindow(end.minus(duration), end);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 1b7e335..e3e61cf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -643,8 +643,8 @@ public abstract class WindowedValue<T> {
       Context nestedContext = context.nested();
       InstantCoder.of().encode(
           windowedElem.getTimestamp(), outStream, nestedContext);
-      windowsCoder.encode(windowedElem.getWindows(), outStream, nestedContext);
-      PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, 
nestedContext);
+      windowsCoder.encode(windowedElem.getWindows(), outStream);
+      PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream);
       valueCoder.encode(windowedElem.getValue(), outStream, context);
     }
 
@@ -652,10 +652,10 @@ public abstract class WindowedValue<T> {
     public WindowedValue<T> decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       Context nestedContext = context.nested();
-      Instant timestamp = InstantCoder.of().decode(inStream, nestedContext);
+      Instant timestamp = InstantCoder.of().decode(inStream);
       Collection<? extends BoundedWindow> windows =
-          windowsCoder.decode(inStream, nestedContext);
-      PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream, nestedContext);
+          windowsCoder.decode(inStream);
+      PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream);
       T value = valueCoder.decode(inStream, context);
       return WindowedValue.of(value, timestamp, windows, pane);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index c172885..89747a7 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -104,7 +104,7 @@ public class TimestampedValue<V> {
                        OutputStream outStream,
                        Context context)
         throws IOException {
-      valueCoder.encode(windowedElem.getValue(), outStream, context.nested());
+      valueCoder.encode(windowedElem.getValue(), outStream);
       InstantCoder.of().encode(
           windowedElem.getTimestamp(), outStream, context);
     }
@@ -112,7 +112,7 @@ public class TimestampedValue<V> {
     @Override
     public TimestampedValue<T> decode(InputStream inStream, Context context)
         throws IOException {
-      T value = valueCoder.decode(inStream, context.nested());
+      T value = valueCoder.decode(inStream);
       Instant timestamp = InstantCoder.of().decode(inStream, context);
       return TimestampedValue.of(value, timestamp);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
index 3ecbaa2..e8a2dfd 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
@@ -78,18 +78,18 @@ public abstract class ValueInSingleWindow<T> {
     public void encode(ValueInSingleWindow<T> windowedElem, OutputStream 
outStream, Context context)
         throws IOException {
       Context nestedContext = context.nested();
-      InstantCoder.of().encode(windowedElem.getTimestamp(), outStream, 
nestedContext);
-      windowCoder.encode(windowedElem.getWindow(), outStream, nestedContext);
-      PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), 
outStream, nestedContext);
+      InstantCoder.of().encode(windowedElem.getTimestamp(), outStream);
+      windowCoder.encode(windowedElem.getWindow(), outStream);
+      PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), 
outStream);
       valueCoder.encode(windowedElem.getValue(), outStream, context);
     }
 
     @Override
     public ValueInSingleWindow<T> decode(InputStream inStream, Context 
context) throws IOException {
       Context nestedContext = context.nested();
-      Instant timestamp = InstantCoder.of().decode(inStream, nestedContext);
-      BoundedWindow window = windowCoder.decode(inStream, nestedContext);
-      PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, 
nestedContext);
+      Instant timestamp = InstantCoder.of().decode(inStream);
+      BoundedWindow window = windowCoder.decode(inStream);
+      PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream);
       T value = valueCoder.decode(inStream, context);
       return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, 
pane);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
index 3f057e1..f06317b 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
@@ -103,7 +103,7 @@ public class ValueWithRecordId<ValueT> {
     @Override
     public void encode(ValueWithRecordId<ValueT> value, OutputStream 
outStream, Context context)
         throws IOException {
-      valueCoder.encode(value.value, outStream, context.nested());
+      valueCoder.encode(value.value, outStream);
       idCoder.encode(value.id, outStream, context);
     }
 
@@ -111,7 +111,7 @@ public class ValueWithRecordId<ValueT> {
     public ValueWithRecordId<ValueT> decode(InputStream inStream, Context 
context)
         throws IOException {
       return new ValueWithRecordId<ValueT>(
-          valueCoder.decode(inStream, context.nested()),
+          valueCoder.decode(inStream),
           idCoder.decode(inStream, context));
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
index d97eea6..adb6652 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
@@ -182,15 +182,15 @@ public class SerializableCoderTest implements 
Serializable {
     // Encode both strings into NESTED form.
     byte[] nestedEncoding;
     try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
-      coder.encode(source, os, Coder.Context.NESTED);
-      coder.encode(source2, os, Coder.Context.NESTED);
+      coder.encode(source, os);
+      coder.encode(source2, os);
       nestedEncoding = os.toByteArray();
     }
 
     // Decode from NESTED form.
     try (ByteArrayInputStream is = new ByteArrayInputStream(nestedEncoding)) {
-      assertEquals(source, coder.decode(is, Coder.Context.NESTED));
-      assertEquals(source2, coder.decode(is, Coder.Context.NESTED));
+      assertEquals(source, coder.decode(is));
+      assertEquals(source2, coder.decode(is));
       assertEquals(0, is.available());
     }
   }
@@ -207,20 +207,20 @@ public class SerializableCoderTest implements 
Serializable {
     Coder<String> coder = SerializableCoder.of(String.class);
     byte[] encodedBytes;
     try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
-      coder.encode(null, os, Coder.Context.NESTED);
-      coder.encode("TestValue", os, Coder.Context.NESTED);
-      coder.encode(null, os, Coder.Context.NESTED);
-      coder.encode("TestValue2", os, Coder.Context.NESTED);
-      coder.encode(null, os, Coder.Context.NESTED);
+      coder.encode(null, os);
+      coder.encode("TestValue", os);
+      coder.encode(null, os);
+      coder.encode("TestValue2", os);
+      coder.encode(null, os);
       encodedBytes = os.toByteArray();
     }
 
     try (ByteArrayInputStream is = new ByteArrayInputStream(encodedBytes)) {
-      assertNull(coder.decode(is, Coder.Context.NESTED));
-      assertEquals("TestValue", coder.decode(is,  Coder.Context.NESTED));
-      assertNull(coder.decode(is, Coder.Context.NESTED));
-      assertEquals("TestValue2", coder.decode(is,  Coder.Context.NESTED));
-      assertNull(coder.decode(is, Coder.Context.NESTED));
+      assertNull(coder.decode(is));
+      assertEquals("TestValue", coder.decode(is));
+      assertNull(coder.decode(is));
+      assertEquals("TestValue2", coder.decode(is));
+      assertNull(coder.decode(is));
       assertEquals(0, is.available());
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index a70af94..e4b016b 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -878,14 +878,14 @@ public class CombineTest implements Serializable {
       @Override
       public void encode(CountSum value, OutputStream outStream,
           Context context) throws CoderException, IOException {
-        LONG_CODER.encode(value.count, outStream, context.nested());
+        LONG_CODER.encode(value.count, outStream);
         DOUBLE_CODER.encode(value.sum, outStream, context);
       }
 
       @Override
       public CountSum decode(InputStream inStream, Coder.Context context)
           throws CoderException, IOException {
-        long count = LONG_CODER.decode(inStream, context.nested());
+        long count = LONG_CODER.decode(inStream);
         double sum = DOUBLE_CODER.decode(inStream, context);
         return new CountSum(count, sum);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index a458812..7e8a1dd 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -210,14 +210,14 @@ public class CreateTest {
           OutputStream outStream,
           org.apache.beam.sdk.coders.Coder.Context context)
           throws CoderException, IOException {
-        stringCoder.encode(value.myString, outStream, context.nested());
+        stringCoder.encode(value.myString, outStream);
       }
 
       @Override
       public UnserializableRecord decode(
           InputStream inStream, org.apache.beam.sdk.coders.Coder.Context 
context)
           throws CoderException, IOException {
-        return new UnserializableRecord(stringCoder.decode(inStream, 
context.nested()));
+        return new UnserializableRecord(stringCoder.decode(inStream));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
index 314b969..9ae5d68 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
@@ -35,7 +35,7 @@ public class GlobalWindowTest {
     CountingOutputStream out = new 
CountingOutputStream(ByteStreams.nullOutputStream());
     GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out, 
Context.OUTER);
     assertEquals(0, out.getCount());
-    GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out, 
Context.NESTED);
+    GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out);
     assertEquals(0, out.getCount());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java
index 36f7028..894d8a9 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BufferedElementCountingOutputStreamTest.java
@@ -32,7 +32,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder.Context;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Rule;
 import org.junit.Test;
@@ -180,7 +179,7 @@ public class BufferedElementCountingOutputStreamTest {
     do {
       count = VarInt.decodeLong(is);
       for (int i = 0; i < count; ++i) {
-        values.add(ByteArrayCoder.of().decode(is, Context.NESTED));
+        values.add(ByteArrayCoder.of().decode(is));
       }
     } while(count > 0);
 
@@ -198,7 +197,7 @@ public class BufferedElementCountingOutputStreamTest {
 
     for (byte[] value : values) {
       os.markElementStart();
-      ByteArrayCoder.of().encode(value, os, Context.NESTED);
+      ByteArrayCoder.of().encode(value, os);
     }
     return os;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
index 18e0d95..37745be 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
@@ -110,7 +110,7 @@ public class BeamFnDataBufferingOutboundObserver<T>
 
   @Override
   public void accept(WindowedValue<T> t) throws IOException {
-    coder.encode(t, bufferedElements, Context.NESTED);
+    coder.encode(t, bufferedElements);
     counter += 1;
     if (bufferedElements.size() >= bufferLimit) {
       outboundObserver.onNext(convertBufferForTransmission().build());

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
index 24365d8..ece87d2 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
@@ -71,7 +71,7 @@ public class BeamFnDataInboundObserver<T> implements 
Consumer<BeamFnApi.Elements
       InputStream inputStream = t.getData().newInput();
       while (inputStream.available() > 0) {
         counter += 1;
-        WindowedValue<T> value = coder.decode(inputStream, Context.NESTED);
+        WindowedValue<T> value = coder.decode(inputStream);
         consumer.accept(value);
       }
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
index 7cbf8eb..3f6ece7 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
@@ -135,7 +135,7 @@ public class BeamFnDataBufferingOutboundObserverTest {
   private static BeamFnApi.Elements messageWithData(byte[] ... datum) throws 
IOException {
     ByteString.Output output = ByteString.newOutput();
     for (byte[] data : datum) {
-      CODER.encode(valueInGlobalWindow(data), output, Context.NESTED);
+      CODER.encode(valueInGlobalWindow(data), output);
     }
     return BeamFnApi.Elements.newBuilder()
         .addData(BeamFnApi.Elements.Data.newBuilder()

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
index c53f99d..4b0bf0c 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
@@ -108,7 +108,7 @@ public class BeamFnDataInboundObserverTest {
             .setName("Test"));
     ByteString.Output output = ByteString.newOutput();
     for (String value : values) {
-      CODER.encode(valueInGlobalWindow(value), output, Context.NESTED);
+      CODER.encode(valueInGlobalWindow(value), output);
     }
     builder.setData(output.toByteString());
     return builder.build();

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
index 7aefcfa..c2b62b7 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
@@ -53,17 +53,18 @@ class ShardedKeyCoder<KeyT>
   }
 
   @Override
-  public void encode(ShardedKey<KeyT> key, OutputStream outStream, Context 
context)
+  public void encode(ShardedKey<KeyT> key, OutputStream outStream)
       throws IOException {
-    keyCoder.encode(key.getKey(), outStream, context.nested());
-    shardNumberCoder.encode(key.getShardNumber(), outStream, context);
+    keyCoder.encode(key.getKey(), outStream);
+    shardNumberCoder.encode(key.getShardNumber(), outStream);
   }
 
   @Override
-  public ShardedKey<KeyT> decode(InputStream inStream, Context context)
+  public ShardedKey<KeyT> decode(InputStream inStream)
       throws IOException {
     return new ShardedKey<>(
-        keyCoder.decode(inStream, context.nested()), 
shardNumberCoder.decode(inStream, context));
+        keyCoder.decode(inStream),
+        shardNumberCoder.decode(inStream));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
index 01bc558..33b9f77 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -43,14 +43,14 @@ public class TableDestinationCoder extends 
AtomicCoder<TableDestination> {
     if (value == null) {
       throw new CoderException("cannot encode a null value");
     }
-    tableSpecCoder.encode(value.getTableSpec(), outStream, context.nested());
-    tableDescriptionCoder.encode(value.getTableDescription(), outStream, 
context);
+    tableSpecCoder.encode(value.getTableSpec(), outStream);
+    tableDescriptionCoder.encode(value.getTableDescription(), outStream);
   }
 
   @Override
-  public TableDestination decode(InputStream inStream, Context context) throws 
IOException {
-    String tableSpec = tableSpecCoder.decode(inStream, context.nested());
-    String tableDescription = tableDescriptionCoder.decode(inStream, context);
+  public TableDestination decode(InputStream inStream) throws IOException {
+    String tableSpec = tableSpecCoder.decode(inStream);
+    String tableDescription = tableDescriptionCoder.decode(inStream);
     return new TableDestination(tableSpec, tableDescription);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
index 2b1988a..8ae75c5 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
@@ -43,7 +43,7 @@ class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
     if (value == null) {
       throw new CoderException("cannot encode a null value");
     }
-    tableRowCoder.encode(value.tableRow, outStream, context.nested());
+    tableRowCoder.encode(value.tableRow, outStream);
     idCoder.encode(value.uniqueId, outStream, context);
   }
 
@@ -51,7 +51,7 @@ class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
   public TableRowInfo decode(InputStream inStream, Context context)
       throws IOException {
     return new TableRowInfo(
-        tableRowCoder.decode(inStream, context.nested()),
+        tableRowCoder.decode(inStream),
         idCoder.decode(inStream, context));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index 890979b..9e83271 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -106,16 +106,16 @@ class WriteBundlesToFiles<DestinationT>
       if (value == null) {
         throw new CoderException("cannot encode a null value");
       }
-      stringCoder.encode(value.filename, outStream, context.nested());
-      longCoder.encode(value.fileByteSize, outStream, context.nested());
-      destinationCoder.encode(value.destination, outStream, context.nested());
+      stringCoder.encode(value.filename, outStream);
+      longCoder.encode(value.fileByteSize, outStream);
+      destinationCoder.encode(value.destination, outStream);
     }
 
     @Override
     public Result<DestinationT> decode(InputStream inStream, Context context) 
throws IOException {
-      String filename = stringCoder.decode(inStream, context.nested());
-      long fileByteSize = longCoder.decode(inStream, context.nested());
-      DestinationT destination = destinationCoder.decode(inStream, 
context.nested());
+      String filename = stringCoder.decode(inStream);
+      long fileByteSize = longCoder.decode(inStream);
+      DestinationT destination = destinationCoder.decode(inStream);
       return new Result<>(filename, fileByteSize, destination);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
index e061edc..5907c9e 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
@@ -47,13 +47,13 @@ public class PubsubMessageWithAttributesCoder extends 
CustomCoder<PubsubMessage>
 
   public void encode(PubsubMessage value, OutputStream outStream, Context 
context)
       throws IOException {
-    PAYLOAD_CODER.encode(value.getPayload(), outStream, context.nested());
+    PAYLOAD_CODER.encode(value.getPayload(), outStream);
     ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream, context);
   }
 
   @Override
   public PubsubMessage decode(InputStream inStream, Context context) throws 
IOException {
-    byte[] payload = PAYLOAD_CODER.decode(inStream, context.nested());
+    byte[] payload = PAYLOAD_CODER.decode(inStream);
     Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, 
context);
     return new PubsubMessage(payload, attributes);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 9f04a6c..ae320c7 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -110,19 +110,19 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
     public void encode(
         OutgoingMessage value, OutputStream outStream, Context context)
         throws CoderException, IOException {
-      ByteArrayCoder.of().encode(value.elementBytes, outStream, 
context.nested());
-      ATTRIBUTES_CODER.encode(value.attributes, outStream, context.nested());
-      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, 
context.nested());
-      RECORD_ID_CODER.encode(value.recordId, outStream, context.nested());
+      ByteArrayCoder.of().encode(value.elementBytes, outStream);
+      ATTRIBUTES_CODER.encode(value.attributes, outStream);
+      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream);
+      RECORD_ID_CODER.encode(value.recordId, outStream);
     }
 
     @Override
     public OutgoingMessage decode(
         InputStream inStream, Context context) throws CoderException, 
IOException {
-      byte[] elementBytes = ByteArrayCoder.of().decode(inStream, 
context.nested());
-      Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, 
context.nested());
-      long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, 
context.nested());
-      @Nullable String recordId = RECORD_ID_CODER.decode(inStream, 
context.nested());
+      byte[] elementBytes = ByteArrayCoder.of().decode(inStream);
+      Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream);
+      long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream);
+      @Nullable String recordId = RECORD_ID_CODER.decode(inStream);
       return new OutgoingMessage(elementBytes, attributes, 
timestampMsSinceEpoch, recordId);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index c16b8fb..e53976e 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -380,7 +380,7 @@ public class PubsubUnboundedSource extends 
PTransform<PBegin, PCollection<Pubsub
 
     @Override
     public PubsubCheckpoint decode(InputStream inStream, Context context) 
throws IOException {
-      String path = SUBSCRIPTION_PATH_CODER.decode(inStream, context.nested());
+      String path = SUBSCRIPTION_PATH_CODER.decode(inStream);
       List<String> notYetReadIds = LIST_CODER.decode(inStream, context);
       return new PubsubCheckpoint(path, null, null, notYetReadIds);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/27e9a060/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java 
b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
index 2b4503a..5386a61 100644
--- 
a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
+++ 
b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
@@ -181,8 +181,8 @@ public class JAXBCoderTest {
     public void encode(TestType value, OutputStream outStream, Context context)
         throws CoderException, IOException {
       Context nestedContext = context.nested();
-      VarIntCoder.of().encode(3, outStream, nestedContext);
-      jaxbCoder.encode(value, outStream, nestedContext);
+      VarIntCoder.of().encode(3, outStream);
+      jaxbCoder.encode(value, outStream);
       VarLongCoder.of().encode(22L, outStream, context);
     }
 
@@ -190,8 +190,8 @@ public class JAXBCoderTest {
     public TestType decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       Context nestedContext = context.nested();
-      VarIntCoder.of().decode(inStream, nestedContext);
-      TestType result = jaxbCoder.decode(inStream, nestedContext);
+      VarIntCoder.of().decode(inStream);
+      TestType result = jaxbCoder.decode(inStream);
       VarLongCoder.of().decode(inStream, context);
       return result;
     }

Reply via email to