Move InMemoryStateInternals to runners/core-java

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/af391b88
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/af391b88
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/af391b88

Branch: refs/heads/master
Commit: af391b88cfc38659f594799bb58b6090a7bcd3a4
Parents: 2b6698d
Author: Kenneth Knowles <k...@google.com>
Authored: Thu Jan 26 21:03:42 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Mon Feb 6 09:26:06 2017 -0800

----------------------------------------------------------------------
 .../runners/core/InMemoryStateInternals.java    | 442 +++++++++++++++++++
 .../core/TestInMemoryStateInternals.java        |  65 +++
 .../core/GroupAlsoByWindowsProperties.java      |   1 -
 .../core/InMemoryStateInternalsTest.java        | 359 +++++++++++++++
 .../core/MergingActiveWindowSetTest.java        |   1 -
 .../beam/runners/core/ReduceFnTester.java       |   1 -
 .../beam/runners/core/SideInputHandlerTest.java |   1 -
 .../beam/runners/core/SplittableParDoTest.java  |   1 -
 .../triggers/TriggerStateMachineTester.java     |   2 +-
 .../CopyOnAccessInMemoryStateInternals.java     |  12 +-
 .../spark/translation/TranslationUtils.java     |   3 +-
 .../sdk/util/state/InMemoryStateInternals.java  | 430 ------------------
 .../util/state/TestInMemoryStateInternals.java  |  61 ---
 .../util/state/InMemoryStateInternalsTest.java  | 348 ---------------
 14 files changed, 874 insertions(+), 853 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
new file mode 100644
index 0000000..059e32d
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.CombineFnUtil;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateContexts;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateTable;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTag.StateBinder;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.joda.time.Instant;
+
+/**
+ * In-memory implementation of {@link StateInternals}. Used in {@code BatchModeExecutionContext}
+ * and for running tests that need state.
+ */
+@Experimental(Kind.STATE)
+public class InMemoryStateInternals<K> implements StateInternals<K> {
+
+  /**
+   * Static factory: builds a fresh {@link InMemoryStateInternals} bound to {@code key}.
+   */
+  public static <K> InMemoryStateInternals<K> forKey(K key) {
+    InMemoryStateInternals<K> internals = new InMemoryStateInternals<>(key);
+    return internals;
+  }
+
+  private final K key;
+
+  /** Stores {@code key} as the value later returned by {@link #getKey()}. */
+  protected InMemoryStateInternals(K key) {
+    this.key = key;
+  }
+
+  /** Returns the key supplied when this instance was constructed. */
+  @Override
+  public K getKey() {
+    return this.key;
+  }
+
+  /**
+   * Interface common to all in-memory state cells. Includes ability to see whether a cell has been
+   * cleared and the ability to create a clone of the contents.
+   *
+   * @param <T> the concrete cell type, which is also the type returned by {@link #copy()}
+   */
+  public interface InMemoryState<T extends InMemoryState<T>> {
+    /** Returns whether this cell is currently in its cleared state. */
+    boolean isCleared();
+    /** Returns a new cell of the same concrete type holding this cell's current contents. */
+    T copy();
+  }
+
+  /**
+   * Backing table of state cells. New cells are bound through an {@link InMemoryStateBinder}
+   * built from this instance's key and the {@link StateContext} passed in by the table.
+   */
+  protected final StateTable<K> inMemoryState = new StateTable<K>() {
+    @Override
+    protected StateBinder<K> binderForNamespace(StateNamespace namespace, StateContext<?> c) {
+      // The namespace is not consulted here: the same kind of binder serves every namespace.
+      return new InMemoryStateBinder<K>(key, c);
+    }
+  };
+
+  /** Clears the backing state table by delegating to {@code inMemoryState.clear()}. */
+  public void clear() {
+    inMemoryState.clear();
+  }
+
+  /**
+   * Reports whether {@code state} is currently cleared. The test framework relies on this to
+   * make sure that the state has been properly cleaned up.
+   *
+   * <p>{@code state} is cast to {@link InMemoryState} without a prior {@code instanceof} check.
+   */
+  protected boolean isEmptyForTesting(State state) {
+    InMemoryState<?> cell = (InMemoryState<?>) state;
+    return cell.isCleared();
+  }
+
+  /**
+   * Looks up the state cell for {@code address} within {@code namespace}, passing
+   * {@link StateContexts#nullContext()} as the state context.
+   */
+  @Override
+  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+    return inMemoryState.get(namespace, address, StateContexts.nullContext());
+  }
+
+  /**
+   * Looks up the state cell for {@code address} within {@code namespace}, forwarding the
+   * caller-supplied state context {@code c} to the backing table.
+   */
+  @Override
+  public <T extends State> T state(
+      StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) {
+    return inMemoryState.get(namespace, address, c);
+  }
+
+  /**
+   * A {@link StateBinder} that returns In Memory {@link State} objects.
+   *
+   * <p>None of the {@code bind*} methods consult the {@code address} or coder arguments when
+   * constructing a cell; only the combine function / output-time function, the key and (for
+   * context-aware combiners) the {@link StateContext} are used.
+   */
+  public static class InMemoryStateBinder<K> implements StateBinder<K> {
+    private final K key;
+    private final StateContext<?> c;
+
+    /**
+     * @param key the key handed to every combining cell this binder creates
+     * @param c the context used to bind {@link KeyedCombineFnWithContext} instances
+     */
+    public InMemoryStateBinder(K key, StateContext<?> c) {
+      this.key = key;
+      this.c = c;
+    }
+
+    /** Returns a new, cleared {@link InMemoryValue}. */
+    @Override
+    public <T> ValueState<T> bindValue(
+        StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+      return new InMemoryValue<T>();
+    }
+
+    /** Returns a new, empty {@link InMemoryBag}. */
+    @Override
+    public <T> BagState<T> bindBag(
+        final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+      return new InMemoryBag<T>();
+    }
+
+    /**
+     * Returns a new {@link InMemoryCombiningValue} for this binder's key, adapting the unkeyed
+     * {@code combineFn} through {@code asKeyedFn()}.
+     */
+    @Override
+    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+        bindCombiningValue(
+            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            Coder<AccumT> accumCoder,
+            final CombineFn<InputT, AccumT, OutputT> combineFn) {
+      return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn.<K>asKeyedFn());
+    }
+
+    /** Returns a new, empty {@link InMemoryWatermarkHold} that combines with {@code outputTimeFn}. */
+    @Override
+    public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+        StateTag<? super K, WatermarkHoldState<W>> address,
+        OutputTimeFn<? super W> outputTimeFn) {
+      return new InMemoryWatermarkHold<W>(outputTimeFn);
+    }
+
+    /** Returns a new {@link InMemoryCombiningValue} for this binder's key and {@code combineFn}. */
+    @Override
+    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+        bindKeyedCombiningValue(
+            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            Coder<AccumT> accumCoder,
+            KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+      return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn);
+    }
+
+    /**
+     * Binds {@code combineFn} to this binder's {@link StateContext} via
+     * {@link CombineFnUtil#bindContext} and then delegates to {@link #bindKeyedCombiningValue}.
+     */
+    @Override
+    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+        bindKeyedCombiningValueWithContext(
+            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            Coder<AccumT> accumCoder,
+            KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
+      return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
+    }
+  }
+
+  /**
+   * An {@link InMemoryState} implementation of {@link ValueState}.
+   */
+  public static final class InMemoryValue<T>
+      implements ValueState<T>, InMemoryState<InMemoryValue<T>> {
+    // Tracked separately from value: write(null) leaves the cell non-cleared.
+    private boolean isCleared = true;
+    private T value = null;
+
+    /** Drops the stored value and marks the cell as cleared. */
+    @Override
+    public void clear() {
+      // Even though we're clearing we can't remove this from the in-memory state map, since
+      // other users may already have a handle on this Value.
+      value = null;
+      isCleared = true;
+    }
+
+    /** Returns {@code this}; there is nothing to prefetch for an in-memory cell. */
+    @Override
+    public InMemoryValue<T> readLater() {
+      return this;
+    }
+
+    /** Returns the stored value, which is {@code null} while the cell is cleared. */
+    @Override
+    public T read() {
+      return value;
+    }
+
+    /** Stores {@code input} and marks the cell as no longer cleared. */
+    @Override
+    public void write(T input) {
+      isCleared = false;
+      this.value = input;
+    }
+
+    /**
+     * Returns a new cell with the same cleared flag and the same value reference; the value
+     * object itself is not duplicated.
+     */
+    @Override
+    public InMemoryValue<T> copy() {
+      InMemoryValue<T> that = new InMemoryValue<>();
+      // A freshly constructed cell is already in the cleared state, so only copy when written.
+      if (!this.isCleared) {
+        that.isCleared = this.isCleared;
+        that.value = this.value;
+      }
+      return that;
+    }
+
+    @Override
+    public boolean isCleared() {
+      return isCleared;
+    }
+  }
+
+  /**
+   * An {@link InMemoryState} implementation of {@link WatermarkHoldState}.
+   *
+   * <p>The cell keeps a single combined hold; {@code null} means that no hold is set.
+   */
+  public static final class InMemoryWatermarkHold<W extends BoundedWindow>
+      implements WatermarkHoldState<W>, InMemoryState<InMemoryWatermarkHold<W>> {
+
+    private final OutputTimeFn<? super W> outputTimeFn;
+
+    @Nullable
+    private Instant combinedHold = null;
+
+    /** @param outputTimeFn used to combine each newly added hold with the existing one */
+    public InMemoryWatermarkHold(OutputTimeFn<? super W> outputTimeFn) {
+      this.outputTimeFn = outputTimeFn;
+    }
+
+    /** Returns {@code this}; there is nothing to prefetch for an in-memory cell. */
+    @Override
+    public InMemoryWatermarkHold<W> readLater() {
+      return this;
+    }
+
+    /** Removes the hold, returning the cell to its empty state. */
+    @Override
+    public void clear() {
+      // Even though we're clearing we can't remove this from the in-memory state map, since
+      // other users may already have a handle on this WatermarkHold.
+      combinedHold = null;
+    }
+
+    /** Returns the combined hold, or {@code null} when no hold is set. */
+    @Override
+    public Instant read() {
+      return combinedHold;
+    }
+
+    /**
+     * Adds a hold. The first hold is stored as-is; later holds are folded into the existing one
+     * with {@code outputTimeFn.combine}.
+     */
+    @Override
+    public void add(Instant outputTime) {
+      combinedHold = combinedHold == null ? outputTime
+          : outputTimeFn.combine(combinedHold, outputTime);
+    }
+
+    @Override
+    public boolean isCleared() {
+      return combinedHold == null;
+    }
+
+    /**
+     * Returns a view whose {@code read()} re-evaluates emptiness on every call, so it reflects
+     * holds added or cleared after the view was obtained.
+     */
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+        @Override
+        public Boolean read() {
+          return combinedHold == null;
+        }
+      };
+    }
+
+    @Override
+    public OutputTimeFn<? super W> getOutputTimeFn() {
+      return outputTimeFn;
+    }
+
+    /** Returns the string form of the combined hold ({@code "null"} when empty). */
+    @Override
+    public String toString() {
+      return Objects.toString(combinedHold);
+    }
+
+    /** Returns a new cell sharing the same {@code outputTimeFn} and carrying the same hold. */
+    @Override
+    public InMemoryWatermarkHold<W> copy() {
+      InMemoryWatermarkHold<W> that =
+          new InMemoryWatermarkHold<>(outputTimeFn);
+      that.combinedHold = this.combinedHold;
+      return that;
+    }
+  }
+
+  /**
+   * An {@link InMemoryState} implementation of {@link AccumulatorCombiningState}.
+   *
+   * <p>The cell always holds an accumulator: one is created in the constructor and a fresh one
+   * replaces it on {@link #clear()}. Emptiness is tracked by a separate flag.
+   */
+  public static final class InMemoryCombiningValue<K, InputT, AccumT, OutputT>
+      implements AccumulatorCombiningState<InputT, AccumT, OutputT>,
+          InMemoryState<InMemoryCombiningValue<K, InputT, AccumT, OutputT>> {
+    private final K key;
+    private boolean isCleared = true;
+    private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
+    private AccumT accum;
+
+    /**
+     * @param key the key passed to every {@code combineFn} call
+     * @param combineFn creates, updates, merges and extracts the accumulator
+     */
+    public InMemoryCombiningValue(
+        K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+      this.key = key;
+      this.combineFn = combineFn;
+      accum = combineFn.createAccumulator(key);
+    }
+
+    /** Returns {@code this}; there is nothing to prefetch for an in-memory cell. */
+    @Override
+    public InMemoryCombiningValue<K, InputT, AccumT, OutputT> readLater() {
+      return this;
+    }
+
+    /** Replaces the accumulator with a newly created one and marks the cell as cleared. */
+    @Override
+    public void clear() {
+      // Even though we're clearing we can't remove this from the in-memory state map, since
+      // other users may already have a handle on this CombiningValue.
+      accum = combineFn.createAccumulator(key);
+      isCleared = true;
+    }
+
+    /** Returns the output extracted from the current accumulator. */
+    @Override
+    public OutputT read() {
+      return combineFn.extractOutput(key, accum);
+    }
+
+    /** Folds {@code input} into the accumulator and marks the cell as non-empty. */
+    @Override
+    public void add(InputT input) {
+      isCleared = false;
+      accum = combineFn.addInput(key, accum, input);
+    }
+
+    /** Returns the current accumulator object itself, not a copy. */
+    @Override
+    public AccumT getAccum() {
+      return accum;
+    }
+
+    /**
+     * Returns a view whose {@code read()} re-evaluates the cleared flag on every call, so it
+     * reflects additions and clears made after the view was obtained.
+     */
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+        @Override
+        public Boolean read() {
+          return isCleared;
+        }
+      };
+    }
+
+    /** Merges {@code accum} into the current accumulator and marks the cell as non-empty. */
+    @Override
+    public void addAccum(AccumT accum) {
+      isCleared = false;
+      this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum));
+    }
+
+    /** Merges {@code accumulators} with {@code combineFn}; this cell's accumulator is not updated. */
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return combineFn.mergeAccumulators(key, accumulators);
+    }
+
+    @Override
+    public boolean isCleared() {
+      return isCleared;
+    }
+
+    /**
+     * Returns a new cell for the same key and combine function. When this cell is non-empty,
+     * its accumulator is merged into the new cell's fresh accumulator through
+     * {@link #addAccum}.
+     */
+    @Override
+    public InMemoryCombiningValue<K, InputT, AccumT, OutputT> copy() {
+      InMemoryCombiningValue<K, InputT, AccumT, OutputT> that =
+          new InMemoryCombiningValue<>(key, combineFn);
+      if (!this.isCleared) {
+        that.isCleared = this.isCleared;
+        that.addAccum(accum);
+      }
+      return that;
+    }
+  }
+
+  /**
+   * An {@link InMemoryState} implementation of {@link BagState}.
+   *
+   * <p>{@link #read()} hands out the backing list itself. {@link #clear()} swaps in a new list
+   * rather than emptying the old one, whereas {@link #add} appends to the current list and is
+   * therefore visible through a previously returned {@code Iterable}.
+   */
+  public static final class InMemoryBag<T> implements BagState<T>, InMemoryState<InMemoryBag<T>> {
+    private List<T> contents = new ArrayList<>();
+
+    /** Empties the bag by replacing the backing list with a new empty one. */
+    @Override
+    public void clear() {
+      // Even though we're clearing we can't remove this from the in-memory state map, since
+      // other users may already have a handle on this Bag.
+      // The result of get/read below must be stable for the lifetime of the bundle within which it
+      // was generated. In batch and direct runners the bundle lifetime can be
+      // greater than the window lifetime, in which case this method can be called while
+      // the result is still in use. We protect against this by hot-swapping instead of
+      // clearing the contents.
+      contents = new ArrayList<>();
+    }
+
+    /** Returns {@code this}; there is nothing to prefetch for an in-memory cell. */
+    @Override
+    public InMemoryBag<T> readLater() {
+      return this;
+    }
+
+    /** Returns the current backing list (not a copy). */
+    @Override
+    public Iterable<T> read() {
+      return contents;
+    }
+
+    /** Appends {@code input} to the current backing list. */
+    @Override
+    public void add(T input) {
+      contents.add(input);
+    }
+
+    /** A bag counts as cleared whenever it holds no elements. */
+    @Override
+    public boolean isCleared() {
+      return contents.isEmpty();
+    }
+
+    /**
+     * Returns a view whose {@code read()} checks the current backing list on every call, so it
+     * reflects additions and clears made after the view was obtained.
+     */
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+
+        @Override
+        public Boolean read() {
+          return contents.isEmpty();
+        }
+      };
+    }
+
+    /** Returns a new bag with its own list containing the same element references. */
+    @Override
+    public InMemoryBag<T> copy() {
+      InMemoryBag<T> that = new InMemoryBag<>();
+      that.contents.addAll(this.contents);
+      return that;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
new file mode 100644
index 0000000..60754e2
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.joda.time.Instant;
+
+/**
+ * An {@link InMemoryStateInternals} that behaves like its parent and layers a few extra
+ * inspection helpers on top.
+ */
+public class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> {
+  public TestInMemoryStateInternals(K key) {
+    super(key);
+  }
+
+  /**
+   * Collects the tags in {@code namespace} whose state cell is not reported empty by
+   * {@link #isEmptyForTesting}.
+   */
+  public Set<StateTag<? super K, ?>> getTagsInUse(StateNamespace namespace) {
+    Set<StateTag<? super K, ?>> nonEmptyTags = new HashSet<>();
+    for (Map.Entry<StateTag<? super K, ?>, State> tagAndCell :
+        inMemoryState.getTagsInUse(namespace).entrySet()) {
+      if (isEmptyForTesting(tagAndCell.getValue())) {
+        continue;
+      }
+      nonEmptyTags.add(tagAndCell.getKey());
+    }
+    return nonEmptyTags;
+  }
+
+  /** Returns the namespaces reported as in use by the backing state table. */
+  public Set<StateNamespace> getNamespacesInUse() {
+    return inMemoryState.getNamespacesInUse();
+  }
+
+  /** Return the earliest output watermark hold in state, or null if none. */
+  public Instant earliestWatermarkHold() {
+    Instant earliest = null;
+    for (State cell : inMemoryState.values()) {
+      if (!(cell instanceof WatermarkHoldState)) {
+        continue;
+      }
+      Instant candidate = ((WatermarkHoldState<?>) cell).read();
+      // Take the candidate when nothing has been chosen yet, or when it is strictly earlier.
+      boolean replaces =
+          earliest == null || (candidate != null && candidate.isBefore(earliest));
+      if (replaces) {
+        earliest = candidate;
+      }
+    }
+    return earliest;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index ef01106..fedc4ca 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -51,7 +51,6 @@ import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
new file mode 100644
index 0000000..c75399c
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateMerging;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaceForTest;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link InMemoryStateInternals}.
+ */
+@RunWith(JUnit4.class)
+public class InMemoryStateInternalsTest {
+  private static final BoundedWindow WINDOW_1 = new IntervalWindow(new 
Instant(0), new Instant(10));
+  private static final StateNamespace NAMESPACE_1 = new 
StateNamespaceForTest("ns1");
+  private static final StateNamespace NAMESPACE_2 = new 
StateNamespaceForTest("ns2");
+  private static final StateNamespace NAMESPACE_3 = new 
StateNamespaceForTest("ns3");
+
+  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+      StateTags.value("stringValue", StringUtf8Coder.of());
+  private static final StateTag<Object, AccumulatorCombiningState<Integer, 
int[], Integer>>
+      SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
+          "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
+  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+      StateTags.bag("stringBag", StringUtf8Coder.of());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+      WATERMARK_EARLIEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", 
OutputTimeFns.outputAtEarliestInputTimestamp());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+      WATERMARK_LATEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", 
OutputTimeFns.outputAtLatestInputTimestamp());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> 
WATERMARK_EOW_ADDR =
+      StateTags.watermarkStateInternal("watermark", 
OutputTimeFns.outputAtEndOfWindow());
+
+  InMemoryStateInternals<String> underTest = 
InMemoryStateInternals.forKey("dummyKey");
+
+  @Test
+  public void testValue() throws Exception {
+    ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), 
Matchers.sameInstance(value));
+    assertThat(
+        underTest.state(NAMESPACE_2, STRING_VALUE_ADDR),
+        Matchers.not(Matchers.sameInstance(value)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.write("hello");
+    assertThat(value.read(), Matchers.equalTo("hello"));
+    value.write("world");
+    assertThat(value.read(), Matchers.equalTo("world"));
+
+    value.clear();
+    assertThat(value.read(), Matchers.nullValue());
+    assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), 
Matchers.sameInstance(value));
+  }
+
+  @Test
+  public void testBag() throws Exception {
+    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
+
+    assertThat(value.read(), Matchers.emptyIterable());
+    value.add("hello");
+    assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
+
+    value.add("world");
+    assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
+
+    value.clear();
+    assertThat(value.read(), Matchers.emptyIterable());
+    assertThat(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), 
Matchers.sameInstance(value));
+  }
+
+  @Test
+  public void testBagIsEmpty() throws Exception {
+    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add("hello");
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeBagIntoSource() throws Exception {
+    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+
+    bag1.add("Hello");
+    bag2.add("World");
+    bag1.add("!");
+
+    StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
+
+    // Reading the merged bag gets both the contents
+    assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", 
"!"));
+    assertThat(bag2.read(), Matchers.emptyIterable());
+  }
+
+  @Test
+  public void testMergeBagIntoNewNamespace() throws Exception {
+    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+    BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
+
+    bag1.add("Hello");
+    bag2.add("World");
+    bag1.add("!");
+
+    StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
+
+    // Reading the merged bag gets both the contents
+    assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", 
"!"));
+    assertThat(bag1.read(), Matchers.emptyIterable());
+    assertThat(bag2.read(), Matchers.emptyIterable());
+  }
+
+  @Test
+  public void testCombiningValue() throws Exception {
+    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, 
SUM_INTEGER_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
+
+    assertThat(value.read(), Matchers.equalTo(0));
+    value.add(2);
+    assertThat(value.read(), Matchers.equalTo(2));
+
+    value.add(3);
+    assertThat(value.read(), Matchers.equalTo(5));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(0));
+    assertThat(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), 
Matchers.sameInstance(value));
+  }
+
+  @Test
+  public void testCombiningIsEmpty() throws Exception {
+    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, 
SUM_INTEGER_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add(5);
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeCombiningValueIntoSource() throws Exception {
+    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+
+    value1.add(5);
+    value2.add(10);
+    value1.add(6);
+
+    assertThat(value1.read(), Matchers.equalTo(11));
+    assertThat(value2.read(), Matchers.equalTo(10));
+
+    // Merging clears the old values and updates the result value.
+    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
+
+    assertThat(value1.read(), Matchers.equalTo(21));
+    assertThat(value2.read(), Matchers.equalTo(0));
+  }
+
+  @Test
+  public void testMergeCombiningValueIntoNewNamespace() throws Exception {
+    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+    AccumulatorCombiningState<Integer, int[], Integer> value3 =
+        underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
+
+    value1.add(5);
+    value2.add(10);
+    value1.add(6);
+
+    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
+
+    // Merging clears the old values and updates the result value.
+    assertThat(value1.read(), Matchers.equalTo(0));
+    assertThat(value2.read(), Matchers.equalTo(0));
+    assertThat(value3.read(), Matchers.equalTo(21));
+  }
+
+  @Test
+  public void testWatermarkEarliestState() throws Exception {
+    WatermarkHoldState<BoundedWindow> value =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, 
WATERMARK_EARLIEST_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(3000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(1000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(1000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertThat(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), 
Matchers.sameInstance(value));
+  }
+
+  @Test
+  public void testWatermarkLatestState() throws Exception {
+    WatermarkHoldState<BoundedWindow> value =
+        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(3000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
+
+    value.add(new Instant(1000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertThat(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), Matchers.sameInstance(value));
+  }
+
+  @Test
+  public void testWatermarkEndOfWindowState() throws Exception {
+    WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertThat(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), Matchers.sameInstance(value));
+  }
+
+  @Test
+  public void testWatermarkStateIsEmpty() throws Exception {
+    WatermarkHoldState<BoundedWindow> value =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add(new Instant(1000));
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeEarliestWatermarkIntoSource() throws Exception {
+    WatermarkHoldState<BoundedWindow> value1 =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+    WatermarkHoldState<BoundedWindow> value2 =
+        underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
+
+    value1.add(new Instant(3000));
+    value2.add(new Instant(5000));
+    value1.add(new Instant(4000));
+    value2.add(new Instant(2000));
+
+    // Merging clears the old values and updates the merged value.
+    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
+
+    assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
+    assertThat(value2.read(), Matchers.equalTo(null));
+  }
+
+  @Test
+  public void testMergeLatestWatermarkIntoSource() throws Exception {
+    WatermarkHoldState<BoundedWindow> value1 =
+        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+    WatermarkHoldState<BoundedWindow> value2 =
+        underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
+    WatermarkHoldState<BoundedWindow> value3 =
+        underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
+
+    value1.add(new Instant(3000));
+    value2.add(new Instant(5000));
+    value1.add(new Instant(4000));
+    value2.add(new Instant(2000));
+
+    // Merging clears the old values and updates the result value.
+    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
+
+    // Merging clears the old values and updates the result value.
+    assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
+    assertThat(value1.read(), Matchers.equalTo(null));
+    assertThat(value2.read(), Matchers.equalTo(null));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
index a4928e3..6cccdae 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
@@ -31,7 +31,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.joda.time.Duration;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 226f5f0..d396a08 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -76,7 +76,6 @@ import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
index 0bf5e90..3a5d346 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
@@ -30,7 +30,6 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Duration;

http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index ad0b01d..4bddd51 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -54,7 +54,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.util.state.TimerInternalsFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
index 2a626d4..c00cc48 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
@@ -37,6 +37,7 @@ import org.apache.beam.runners.core.ActiveWindowSet.MergeCallback;
 import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.MergingActiveWindowSet;
 import org.apache.beam.runners.core.NonMergingActiveWindowSet;
+import org.apache.beam.runners.core.TestInMemoryStateInternals;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -52,7 +53,6 @@ import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
-import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index e486a75..1e12e16 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -25,6 +25,12 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryBag;
+import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryCombiningValue;
+import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryState;
+import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryStateBinder;
+import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryValue;
+import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryWatermarkHold;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
@@ -34,12 +40,6 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryBag;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryCombiningValue;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryState;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryStateBinder;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryValue;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryWatermarkHold;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateContext;
 import org.apache.beam.sdk.util.state.StateContexts;

http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index 965330c..8a9317e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.InMemoryStateInternals;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -35,7 +36,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
@@ -47,7 +47,6 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
-
 import scala.Tuple2;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java
deleted file mode 100644
index 6611837..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java
+++ /dev/null
@@ -1,430 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.state.StateTag.StateBinder;
-import org.joda.time.Instant;
-
-/**
- * In-memory implementation of {@link StateInternals}. Used in {@code BatchModeExecutionContext}
- * and for running tests that need state.
- */
-@Experimental(Kind.STATE)
-public class InMemoryStateInternals<K> implements StateInternals<K> {
-
-  public static <K> InMemoryStateInternals<K> forKey(K key) {
-    return new InMemoryStateInternals<>(key);
-  }
-
-  private final K key;
-
-  protected InMemoryStateInternals(K key) {
-    this.key = key;
-  }
-
-  @Override
-  public K getKey() {
-    return key;
-  }
-
-  /**
-   * Interface common to all in-memory state cells. Includes ability to see whether a cell has been
-   * cleared and the ability to create a clone of the contents.
-   */
-  public interface InMemoryState<T extends InMemoryState<T>> {
-    boolean isCleared();
-    T copy();
-  }
-
-  protected final StateTable<K> inMemoryState = new StateTable<K>() {
-    @Override
-    protected StateBinder<K> binderForNamespace(StateNamespace namespace, StateContext<?> c) {
-      return new InMemoryStateBinder<K>(key, c);
-    }
-  };
-
-  public void clear() {
-    inMemoryState.clear();
-  }
-
-  /**
-   * Return true if the given state is empty. This is used by the test framework to make sure
-   * that the state has been properly cleaned up.
-   */
-  protected boolean isEmptyForTesting(State state) {
-    return ((InMemoryState<?>) state).isCleared();
-  }
-
-  @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
-    return inMemoryState.get(namespace, address, StateContexts.nullContext());
-  }
-
-  @Override
-  public <T extends State> T state(
-      StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) {
-    return inMemoryState.get(namespace, address, c);
-  }
-
-  /**
-   * A {@link StateBinder} that returns In Memory {@link State} objects.
-   */
-  public static class InMemoryStateBinder<K> implements StateBinder<K> {
-    private final K key;
-    private final StateContext<?> c;
-
-    public InMemoryStateBinder(K key, StateContext<?> c) {
-      this.key = key;
-      this.c = c;
-    }
-
-    @Override
-    public <T> ValueState<T> bindValue(
-        StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
-      return new InMemoryValue<T>();
-    }
-
-    @Override
-    public <T> BagState<T> bindBag(
-        final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
-      return new InMemoryBag<T>();
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-        bindCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            final CombineFn<InputT, AccumT, OutputT> combineFn) {
-      return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn.<K>asKeyedFn());
-    }
-
-    @Override
-    public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-        StateTag<? super K, WatermarkHoldState<W>> address,
-        OutputTimeFn<? super W> outputTimeFn) {
-      return new InMemoryWatermarkHold<W>(outputTimeFn);
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-        bindKeyedCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-      return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn);
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-        bindKeyedCombiningValueWithContext(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
-      return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
-    }
-  }
-
-  /**
-   * An {@link InMemoryState} implementation of {@link ValueState}.
-   */
-  public static final class InMemoryValue<T>
-      implements ValueState<T>, InMemoryState<InMemoryValue<T>> {
-    private boolean isCleared = true;
-    private T value = null;
-
-    @Override
-    public void clear() {
-      // Even though we're clearing we can't remove this from the in-memory state map, since
-      // other users may already have a handle on this Value.
-      value = null;
-      isCleared = true;
-    }
-
-    @Override
-    public InMemoryValue<T> readLater() {
-      return this;
-    }
-
-    @Override
-    public T read() {
-      return value;
-    }
-
-    @Override
-    public void write(T input) {
-      isCleared = false;
-      this.value = input;
-    }
-
-    @Override
-    public InMemoryValue<T> copy() {
-      InMemoryValue<T> that = new InMemoryValue<>();
-      if (!this.isCleared) {
-        that.isCleared = this.isCleared;
-        that.value = this.value;
-      }
-      return that;
-    }
-
-    @Override
-    public boolean isCleared() {
-      return isCleared;
-    }
-  }
-
-  /**
-   * An {@link InMemoryState} implementation of {@link WatermarkHoldState}.
-   */
-  public static final class InMemoryWatermarkHold<W extends BoundedWindow>
-      implements WatermarkHoldState<W>, InMemoryState<InMemoryWatermarkHold<W>> {
-
-    private final OutputTimeFn<? super W> outputTimeFn;
-
-    @Nullable
-    private Instant combinedHold = null;
-
-    public InMemoryWatermarkHold(OutputTimeFn<? super W> outputTimeFn) {
-      this.outputTimeFn = outputTimeFn;
-    }
-
-    @Override
-    public InMemoryWatermarkHold<W> readLater() {
-      return this;
-    }
-
-    @Override
-    public void clear() {
-      // Even though we're clearing we can't remove this from the in-memory state map, since
-      // other users may already have a handle on this WatermarkBagInternal.
-      combinedHold = null;
-    }
-
-    @Override
-    public Instant read() {
-      return combinedHold;
-    }
-
-    @Override
-    public void add(Instant outputTime) {
-      combinedHold = combinedHold == null ? outputTime
-          : outputTimeFn.combine(combinedHold, outputTime);
-    }
-
-    @Override
-    public boolean isCleared() {
-      return combinedHold == null;
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-        @Override
-        public Boolean read() {
-          return combinedHold == null;
-        }
-      };
-    }
-
-    @Override
-    public OutputTimeFn<? super W> getOutputTimeFn() {
-      return outputTimeFn;
-    }
-
-    @Override
-    public String toString() {
-      return Objects.toString(combinedHold);
-    }
-
-    @Override
-    public InMemoryWatermarkHold<W> copy() {
-      InMemoryWatermarkHold<W> that =
-          new InMemoryWatermarkHold<>(outputTimeFn);
-      that.combinedHold = this.combinedHold;
-      return that;
-    }
-  }
-
-  /**
-   * An {@link InMemoryState} implementation of {@link AccumulatorCombiningState}.
-   */
-  public static final class InMemoryCombiningValue<K, InputT, AccumT, OutputT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT>,
-          InMemoryState<InMemoryCombiningValue<K, InputT, AccumT, OutputT>> {
-    private final K key;
-    private boolean isCleared = true;
-    private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
-    private AccumT accum;
-
-    public InMemoryCombiningValue(
-        K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-      this.key = key;
-      this.combineFn = combineFn;
-      accum = combineFn.createAccumulator(key);
-    }
-
-    @Override
-    public InMemoryCombiningValue<K, InputT, AccumT, OutputT> readLater() {
-      return this;
-    }
-
-    @Override
-    public void clear() {
-      // Even though we're clearing we can't remove this from the in-memory state map, since
-      // other users may already have a handle on this CombiningValue.
-      accum = combineFn.createAccumulator(key);
-      isCleared = true;
-    }
-
-    @Override
-    public OutputT read() {
-      return combineFn.extractOutput(key, accum);
-    }
-
-    @Override
-    public void add(InputT input) {
-      isCleared = false;
-      accum = combineFn.addInput(key, accum, input);
-    }
-
-    @Override
-    public AccumT getAccum() {
-      return accum;
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-        @Override
-        public Boolean read() {
-          return isCleared;
-        }
-      };
-    }
-
-    @Override
-    public void addAccum(AccumT accum) {
-      isCleared = false;
-      this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum));
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(key, accumulators);
-    }
-
-    @Override
-    public boolean isCleared() {
-      return isCleared;
-    }
-
-    @Override
-    public InMemoryCombiningValue<K, InputT, AccumT, OutputT> copy() {
-      InMemoryCombiningValue<K, InputT, AccumT, OutputT> that =
-          new InMemoryCombiningValue<>(key, combineFn);
-      if (!this.isCleared) {
-        that.isCleared = this.isCleared;
-        that.addAccum(accum);
-      }
-      return that;
-    }
-  }
-
-  /**
-   * An {@link InMemoryState} implementation of {@link BagState}.
-   */
-  public static final class InMemoryBag<T> implements BagState<T>, InMemoryState<InMemoryBag<T>> {
-    private List<T> contents = new ArrayList<>();
-
-    @Override
-    public void clear() {
-      // Even though we're clearing we can't remove this from the in-memory state map, since
-      // other users may already have a handle on this Bag.
-      // The result of get/read below must be stable for the lifetime of the bundle within which it
-      // was generated. In batch and direct runners the bundle lifetime can be
-      // greater than the window lifetime, in which case this method can be called while
-      // the result is still in use. We protect against this by hot-swapping instead of
-      // clearing the contents.
-      contents = new ArrayList<>();
-    }
-
-    @Override
-    public InMemoryBag<T> readLater() {
-      return this;
-    }
-
-    @Override
-    public Iterable<T> read() {
-      return contents;
-    }
-
-    @Override
-    public void add(T input) {
-      contents.add(input);
-    }
-
-    @Override
-    public boolean isCleared() {
-      return contents.isEmpty();
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-
-        @Override
-        public Boolean read() {
-          return contents.isEmpty();
-        }
-      };
-    }
-
-    @Override
-    public InMemoryBag<T> copy() {
-      InMemoryBag<T> that = new InMemoryBag<>();
-      that.contents.addAll(this.contents);
-      return that;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java
deleted file mode 100644
index 7b6ee68..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.joda.time.Instant;
-
-/**
- * Simulates state like {@link InMemoryStateInternals} and provides some extra helper methods.
- */
-public class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> {
-  public TestInMemoryStateInternals(K key) {
-    super(key);
-  }
-
-  public Set<StateTag<? super K, ?>> getTagsInUse(StateNamespace namespace) {
-    Set<StateTag<? super K, ?>> inUse = new HashSet<>();
-    for (Map.Entry<StateTag<? super K, ?>, State> entry :
-      inMemoryState.getTagsInUse(namespace).entrySet()) {
-      if (!isEmptyForTesting(entry.getValue())) {
-        inUse.add(entry.getKey());
-      }
-    }
-    return inUse;
-  }
-
-  public Set<StateNamespace> getNamespacesInUse() {
-    return inMemoryState.getNamespacesInUse();
-  }
-
-  /** Return the earliest output watermark hold in state, or null if none. */
-  public Instant earliestWatermarkHold() {
-    Instant minimum = null;
-    for (State storage : inMemoryState.values()) {
-      if (storage instanceof WatermarkHoldState) {
-        Instant hold = ((WatermarkHoldState<?>) storage).read();
-        if (minimum == null || (hold != null && hold.isBefore(minimum))) {
-          minimum = hold;
-        }
-      }
-    }
-    return minimum;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java
deleted file mode 100644
index e43ad36..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-
-import java.util.Arrays;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link InMemoryStateInternals}.
- */
-@RunWith(JUnit4.class)
-public class InMemoryStateInternalsTest {
-  private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
-  private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
-  private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
-  private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
-
-  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
-      StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
-      SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
-          "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
-  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
-      StateTags.bag("stringBag", StringUtf8Coder.of());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
-      WATERMARK_EARLIEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
-      WATERMARK_LATEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
-      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
-
-  InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey");
-
-  @Test
-  public void testValue() throws Exception {
-    ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), Matchers.sameInstance(value));
-    assertThat(
-        underTest.state(NAMESPACE_2, STRING_VALUE_ADDR),
-        Matchers.not(Matchers.sameInstance(value)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.write("hello");
-    assertThat(value.read(), Matchers.equalTo("hello"));
-    value.write("world");
-    assertThat(value.read(), Matchers.equalTo("world"));
-
-    value.clear();
-    assertThat(value.read(), Matchers.nullValue());
-    assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), Matchers.sameInstance(value));
-  }
-
-  @Test
-  public void testBag() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
-
-    assertThat(value.read(), Matchers.emptyIterable());
-    value.add("hello");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
-
-    value.add("world");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
-
-    value.clear();
-    assertThat(value.read(), Matchers.emptyIterable());
-    assertThat(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), Matchers.sameInstance(value));
-  }
-
-  @Test
-  public void testBagIsEmpty() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add("hello");
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeBagIntoSource() throws Exception {
-    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-
-    bag1.add("Hello");
-    bag2.add("World");
-    bag1.add("!");
-
-    StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
-
-    // Reading the merged bag gets both the contents
-    assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
-    assertThat(bag2.read(), Matchers.emptyIterable());
-  }
-
-  @Test
-  public void testMergeBagIntoNewNamespace() throws Exception {
-    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-    BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
-
-    bag1.add("Hello");
-    bag2.add("World");
-    bag1.add("!");
-
-    StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
-
-    // Reading the merged bag gets both the contents
-    assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
-    assertThat(bag1.read(), Matchers.emptyIterable());
-    assertThat(bag2.read(), Matchers.emptyIterable());
-  }
-
-  @Test
-  public void testCombiningValue() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
-
-    assertThat(value.read(), Matchers.equalTo(0));
-    value.add(2);
-    assertThat(value.read(), Matchers.equalTo(2));
-
-    value.add(3);
-    assertThat(value.read(), Matchers.equalTo(5));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(0));
-    assertThat(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), Matchers.sameInstance(value));
-  }
-
-  @Test
-  public void testCombiningIsEmpty() throws Exception {
-    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add(5);
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeCombiningValueIntoSource() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
-        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
-        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-
-    value1.add(5);
-    value2.add(10);
-    value1.add(6);
-
-    assertThat(value1.read(), Matchers.equalTo(11));
-    assertThat(value2.read(), Matchers.equalTo(10));
-
-    // Merging clears the old values and updates the result value.
-    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
-
-    assertThat(value1.read(), Matchers.equalTo(21));
-    assertThat(value2.read(), Matchers.equalTo(0));
-  }
-
-  @Test
-  public void testMergeCombiningValueIntoNewNamespace() throws Exception {
-    AccumulatorCombiningState<Integer, int[], Integer> value1 =
-        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value2 =
-        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> value3 =
-        underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
-
-    value1.add(5);
-    value2.add(10);
-    value1.add(6);
-
-    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
-
-    // Merging clears the old values and updates the result value.
-    assertThat(value1.read(), Matchers.equalTo(0));
-    assertThat(value2.read(), Matchers.equalTo(0));
-    assertThat(value3.read(), Matchers.equalTo(21));
-  }
-
-  @Test
-  public void testWatermarkEarliestState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.add(new Instant(3000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.add(new Instant(1000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(1000)));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(null));
-    assertThat(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), Matchers.sameInstance(value));
-  }
-
-  @Test
-  public void testWatermarkLatestState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
-        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.add(new Instant(3000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
-
-    value.add(new Instant(1000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(null));
-    assertThat(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), Matchers.sameInstance(value));
-  }
-
-  @Test
-  public void testWatermarkEndOfWindowState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(null));
-    assertThat(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), Matchers.sameInstance(value));
-  }
-
-  @Test
-  public void testWatermarkStateIsEmpty() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add(new Instant(1000));
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeEarliestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState<BoundedWindow> value1 =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value2 =
-        underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
-
-    value1.add(new Instant(3000));
-    value2.add(new Instant(5000));
-    value1.add(new Instant(4000));
-    value2.add(new Instant(2000));
-
-    // Merging clears the old values and updates the merged value.
-    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
-
-    assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
-    assertThat(value2.read(), Matchers.equalTo(null));
-  }
-
-  @Test
-  public void testMergeLatestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState<BoundedWindow> value1 =
-        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value2 =
-        underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value3 =
-        underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
-
-    value1.add(new Instant(3000));
-    value2.add(new Instant(5000));
-    value1.add(new Instant(4000));
-    value2.add(new Instant(2000));
-
-    // Merging clears the old values and updates the result value.
-    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
-
-    // Merging clears the old values and updates the result value.
-    assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
-    assertThat(value1.read(), Matchers.equalTo(null));
-    assertThat(value2.read(), Matchers.equalTo(null));
-  }
-}

Reply via email to