Repository: beam
Updated Branches:
  refs/heads/master c528fb2f7 -> d5261d74b


[BEAM-2423] Port state internals tests to the new base class StateInternalsTest


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/581ee152
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/581ee152
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/581ee152

Branch: refs/heads/master
Commit: 581ee1520e497fca95e8c4aa75f90050952523d0
Parents: c528fb2
Author: JingsongLi <lzljs3620...@aliyun.com>
Authored: Tue Jun 13 11:26:38 2017 +0800
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Jun 16 11:48:48 2017 +0200

----------------------------------------------------------------------
 runners/apex/pom.xml                            |   7 +
 .../utils/ApexStateInternalsTest.java           | 411 ++++---------------
 .../core/InMemoryStateInternalsTest.java        |  46 ++-
 .../beam/runners/core/StateInternalsTest.java   |  14 +-
 .../FlinkBroadcastStateInternalsTest.java       | 242 +++--------
 .../FlinkKeyGroupStateInternalsTest.java        | 359 ++++++++--------
 .../streaming/FlinkSplitStateInternalsTest.java | 132 +++---
 runners/spark/pom.xml                           |   7 +
 .../spark/stateful/SparkStateInternalsTest.java |  66 +++
 9 files changed, 521 insertions(+), 763 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 4a36bec..d3d4318 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -184,6 +184,13 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-java</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index a7e64af..87aa8c2 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -18,350 +18,109 @@
 package org.apache.beam.runners.apex.translation.utils;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
 import com.datatorrent.lib.util.KryoCloneUtils;
-import java.util.Arrays;
-import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
-import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateInternalsFactory;
-import org.apache.beam.runners.core.StateMerging;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsTest;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaceForTest;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.CombiningState;
-import org.apache.beam.sdk.state.GroupingState;
-import org.apache.beam.sdk.state.ReadableState;
 import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.Suite;
 
 /**
  * Tests for {@link ApexStateInternals}. This is based on the tests for
- * {@code InMemoryStateInternals}.
+ * {@code StateInternalsTest}.
  */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    ApexStateInternalsTest.StandardStateInternalsTests.class,
+    ApexStateInternalsTest.OtherTests.class
+})
 public class ApexStateInternalsTest {
-  private static final BoundedWindow WINDOW_1 = new IntervalWindow(new 
Instant(0), new Instant(10));
-  private static final StateNamespace NAMESPACE_1 = new 
StateNamespaceForTest("ns1");
-  private static final StateNamespace NAMESPACE_2 = new 
StateNamespaceForTest("ns2");
-  private static final StateNamespace NAMESPACE_3 = new 
StateNamespaceForTest("ns3");
 
-  private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
-      StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<CombiningState<Integer, int[], Integer>>
-      SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
-          "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
-  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
-      StateTags.bag("stringBag", StringUtf8Coder.of());
-  private static final StateTag<WatermarkHoldState>
-      WATERMARK_EARLIEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", 
TimestampCombiner.EARLIEST);
-  private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
-  private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
-      StateTags.watermarkStateInternal("watermark", 
TimestampCombiner.END_OF_WINDOW);
-
-  private ApexStateInternals<String> underTest;
-
-  @Before
-  public void initStateInternals() {
-    underTest = new ApexStateInternals.ApexStateBackend()
+  private static StateInternals newStateInternals() {
+    return new ApexStateInternals.ApexStateBackend()
         .newStateInternalsFactory(StringUtf8Coder.of())
-        .stateInternalsForKey((String) null);
-  }
-
-  @Test
-  public void testBag() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
-    assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
-
-    assertThat(value.read(), Matchers.emptyIterable());
-    value.add("hello");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
-
-    value.add("world");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
-
-    value.clear();
-    assertThat(value.read(), Matchers.emptyIterable());
-    assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
-
-  }
-
-  @Test
-  public void testBagIsEmpty() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add("hello");
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeBagIntoSource() throws Exception {
-    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-
-    bag1.add("Hello");
-    bag2.add("World");
-    bag1.add("!");
-
-    StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
-
-    // Reading the merged bag gets both the contents
-    assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", 
"!"));
-    assertThat(bag2.read(), Matchers.emptyIterable());
-  }
-
-  @Test
-  public void testMergeBagIntoNewNamespace() throws Exception {
-    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-    BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
-
-    bag1.add("Hello");
-    bag2.add("World");
-    bag1.add("!");
-
-    StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
-
-    // Reading the merged bag gets both the contents
-    assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", 
"!"));
-    assertThat(bag1.read(), Matchers.emptyIterable());
-    assertThat(bag2.read(), Matchers.emptyIterable());
+        .stateInternalsForKey("dummyKey");
+  }
+
+  /**
+   * A standard StateInternals test. Ignore set and map tests.
+   */
+  @RunWith(JUnit4.class)
+  public static class StandardStateInternalsTests extends StateInternalsTest {
+    @Override
+    protected StateInternals createStateInternals() {
+      return newStateInternals();
+    }
+
+    @Override
+    @Ignore
+    public void testSet() {}
+
+    @Override
+    @Ignore
+    public void testSetIsEmpty() {}
+
+    @Override
+    @Ignore
+    public void testMergeSetIntoSource() {}
+
+    @Override
+    @Ignore
+    public void testMergeSetIntoNewNamespace() {}
+
+    @Override
+    @Ignore
+    public void testMap() {}
+
+    @Override
+    @Ignore
+    public void testSetReadable() {}
+
+    @Override
+    @Ignore
+    public void testMapReadable() {}
+  }
+
+  /**
+   * A specific test of ApexStateInternalsTest.
+   */
+  @RunWith(JUnit4.class)
+  public static class OtherTests {
+
+    private static final StateNamespace NAMESPACE = new StateNamespaceForTest("ns");
+    private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
+        StateTags.value("stringValue", StringUtf8Coder.of());
+
+    @Test
+    public void testSerialization() throws Exception {
+      ApexStateInternals.ApexStateInternalsFactory<String> sif =
+          new ApexStateInternals.ApexStateBackend().
+          newStateInternalsFactory(StringUtf8Coder.of());
+      ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy");
+
+      ValueState<String> value = keyAndState.state(NAMESPACE, STRING_VALUE_ADDR);
+      assertEquals(keyAndState.state(NAMESPACE, STRING_VALUE_ADDR), value);
+      value.write("hello");
+
+      ApexStateInternals.ApexStateInternalsFactory<String> cloned;
+      assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif));
+      ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy");
+
+      ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE, STRING_VALUE_ADDR);
+      assertThat(clonedValue.read(), Matchers.equalTo("hello"));
+      assertEquals(clonedKeyAndState.state(NAMESPACE, STRING_VALUE_ADDR), value);
+    }
   }
-
-  @Test
-  public void testCombiningValue() throws Exception {
-    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, 
SUM_INTEGER_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
-
-    assertThat(value.read(), Matchers.equalTo(0));
-    value.add(2);
-    assertThat(value.read(), Matchers.equalTo(2));
-
-    value.add(3);
-    assertThat(value.read(), Matchers.equalTo(5));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(0));
-    assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
-  }
-
-  @Test
-  public void testCombiningIsEmpty() throws Exception {
-    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, 
SUM_INTEGER_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add(5);
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeCombiningValueIntoSource() throws Exception {
-    CombiningState<Integer, int[], Integer> value1 =
-        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    CombiningState<Integer, int[], Integer> value2 =
-        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-
-    value1.add(5);
-    value2.add(10);
-    value1.add(6);
-
-    assertThat(value1.read(), Matchers.equalTo(11));
-    assertThat(value2.read(), Matchers.equalTo(10));
-
-    // Merging clears the old values and updates the result value.
-    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
-
-    assertThat(value1.read(), Matchers.equalTo(21));
-    assertThat(value2.read(), Matchers.equalTo(0));
-  }
-
-  @Test
-  public void testMergeCombiningValueIntoNewNamespace() throws Exception {
-    CombiningState<Integer, int[], Integer> value1 =
-        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    CombiningState<Integer, int[], Integer> value2 =
-        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-    CombiningState<Integer, int[], Integer> value3 =
-        underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
-
-    value1.add(5);
-    value2.add(10);
-    value1.add(6);
-
-    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
-
-    // Merging clears the old values and updates the result value.
-    assertThat(value1.read(), Matchers.equalTo(0));
-    assertThat(value2.read(), Matchers.equalTo(0));
-    assertThat(value3.read(), Matchers.equalTo(21));
-  }
-
-  @Test
-  public void testWatermarkEarliestState() throws Exception {
-    WatermarkHoldState value =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, 
WATERMARK_EARLIEST_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.add(new Instant(3000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.add(new Instant(1000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(1000)));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(null));
-    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value);
-  }
-
-  @Test
-  public void testWatermarkLatestState() throws Exception {
-    WatermarkHoldState value =
-        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, 
WATERMARK_LATEST_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.add(new Instant(3000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
-
-    value.add(new Instant(1000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(null));
-    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value);
-  }
-
-  @Test
-  public void testWatermarkEndOfWindowState() throws Exception {
-    WatermarkHoldState value = underTest.state(NAMESPACE_1, 
WATERMARK_EOW_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, 
WATERMARK_EOW_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(null));
-    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value);
-  }
-
-  @Test
-  public void testWatermarkStateIsEmpty() throws Exception {
-    WatermarkHoldState value =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add(new Instant(1000));
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeEarliestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState value1 =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-    WatermarkHoldState value2 =
-        underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
-
-    value1.add(new Instant(3000));
-    value2.add(new Instant(5000));
-    value1.add(new Instant(4000));
-    value2.add(new Instant(2000));
-
-    // Merging clears the old values and updates the merged value.
-    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, 
WINDOW_1);
-
-    assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
-    assertThat(value2.read(), Matchers.equalTo(null));
-  }
-
-  @Test
-  public void testMergeLatestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState value1 =
-        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState value2 =
-        underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState value3 =
-        underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
-
-    value1.add(new Instant(3000));
-    value2.add(new Instant(5000));
-    value1.add(new Instant(4000));
-    value2.add(new Instant(2000));
-
-    // Merging clears the old values and updates the result value.
-    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, 
WINDOW_1);
-
-    // Merging clears the old values and updates the result value.
-    assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
-    assertThat(value1.read(), Matchers.equalTo(null));
-    assertThat(value2.read(), Matchers.equalTo(null));
-  }
-
-  @Test
-  public void testSerialization() throws Exception {
-    ApexStateInternalsFactory<String> sif = new ApexStateBackend().
-        newStateInternalsFactory(StringUtf8Coder.of());
-    ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy");
-
-    ValueState<String> value = keyAndState.state(NAMESPACE_1, 
STRING_VALUE_ADDR);
-    assertEquals(keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
-    value.write("hello");
-
-    ApexStateInternalsFactory<String> cloned;
-    assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif));
-    ApexStateInternals<String> clonedKeyAndState = 
cloned.stateInternalsForKey("dummy");
-
-    ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE_1, 
STRING_VALUE_ADDR);
-    assertThat(clonedValue.read(), Matchers.equalTo("hello"));
-    assertEquals(clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), 
value);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 335c2f8..1c6cd30 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -19,7 +19,17 @@ package org.apache.beam.runners.core;
 
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.SetState;
 import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -53,21 +63,41 @@ public class InMemoryStateInternalsTest {
   @RunWith(JUnit4.class)
   public static class OtherTests {
 
+    private static final StateNamespace NAMESPACE = new StateNamespaceForTest("ns");
+
+    private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
+        StateTags.value("stringValue", StringUtf8Coder.of());
+    private static final StateTag<CombiningState<Integer, int[], Integer>>
+        SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
+        "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
+    private static final StateTag<BagState<String>> STRING_BAG_ADDR =
+        StateTags.bag("stringBag", StringUtf8Coder.of());
+    private static final StateTag<SetState<String>> STRING_SET_ADDR =
+        StateTags.set("stringSet", StringUtf8Coder.of());
+    private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR =
+        StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
+    private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+        StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+    private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
+        StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
+    private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
+        StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
+
     StateInternals underTest = new InMemoryStateInternals<>("dummyKey");
 
     @Test
     public void testSameInstance() {
-      assertSameInstance(StateInternalsTest.STRING_VALUE_ADDR);
-      assertSameInstance(StateInternalsTest.SUM_INTEGER_ADDR);
-      assertSameInstance(StateInternalsTest.STRING_BAG_ADDR);
-      assertSameInstance(StateInternalsTest.STRING_SET_ADDR);
-      assertSameInstance(StateInternalsTest.STRING_MAP_ADDR);
-      assertSameInstance(StateInternalsTest.WATERMARK_EARLIEST_ADDR);
+      assertSameInstance(STRING_VALUE_ADDR);
+      assertSameInstance(SUM_INTEGER_ADDR);
+      assertSameInstance(STRING_BAG_ADDR);
+      assertSameInstance(STRING_SET_ADDR);
+      assertSameInstance(STRING_MAP_ADDR);
+      assertSameInstance(WATERMARK_EARLIEST_ADDR);
     }
 
     private <T extends State> void assertSameInstance(StateTag<T> address) {
-      assertThat(underTest.state(StateInternalsTest.NAMESPACE_1, address),
-          Matchers.sameInstance(underTest.state(StateInternalsTest.NAMESPACE_1, address)));
+      assertThat(underTest.state(NAMESPACE, address),
+          Matchers.sameInstance(underTest.state(NAMESPACE, address)));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
index 6011fb4..ae07fe6 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
@@ -56,22 +56,22 @@ import org.junit.Test;
 public abstract class StateInternalsTest {
 
   private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
-  static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
+  private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
   private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
   private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
 
-  static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
+  private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
       StateTags.value("stringValue", StringUtf8Coder.of());
-  static final StateTag<CombiningState<Integer, int[], Integer>>
+  private static final StateTag<CombiningState<Integer, int[], Integer>>
       SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
           "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
-  static final StateTag<BagState<String>> STRING_BAG_ADDR =
+  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
       StateTags.bag("stringBag", StringUtf8Coder.of());
-  static final StateTag<SetState<String>> STRING_SET_ADDR =
+  private static final StateTag<SetState<String>> STRING_SET_ADDR =
       StateTags.set("stringSet", StringUtf8Coder.of());
-  static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR =
+  private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR =
       StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
-  static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+  private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
   private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
       StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);

http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
index 2b96d91..3409d27 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java
@@ -17,229 +17,87 @@
  */
 package org.apache.beam.runners.flink.streaming;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
-
-import java.util.Arrays;
-import org.apache.beam.runners.core.StateMerging;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaceForTest;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsTest;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.CombiningState;
-import org.apache.beam.sdk.state.GroupingState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.Ignore;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
  * Tests for {@link FlinkBroadcastStateInternals}. This is based on the tests for
- * {@code InMemoryStateInternals}.
+ * {@code StateInternalsTest}.
+ *
+ * <p>Only value, bag and combining state are tested; set, map and watermark tests are ignored.
  */
 @RunWith(JUnit4.class)
-public class FlinkBroadcastStateInternalsTest {
-  private static final StateNamespace NAMESPACE_1 = new 
StateNamespaceForTest("ns1");
-  private static final StateNamespace NAMESPACE_2 = new 
StateNamespaceForTest("ns2");
-  private static final StateNamespace NAMESPACE_3 = new 
StateNamespaceForTest("ns3");
-
-  private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
-      StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<CombiningState<Integer, int[], Integer>>
-      SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
-          "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
-  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
-      StateTags.bag("stringBag", StringUtf8Coder.of());
-
-  FlinkBroadcastStateInternals<String> underTest;
-
-  @Before
-  public void initStateInternals() {
+public class FlinkBroadcastStateInternalsTest extends StateInternalsTest {
+
+  @Override
+  protected StateInternals createStateInternals() {
     MemoryStateBackend backend = new MemoryStateBackend();
     try {
       OperatorStateBackend operatorStateBackend =
           backend.createOperatorStateBackend(new DummyEnvironment("test", 1, 
0), "");
-      underTest = new FlinkBroadcastStateInternals<>(1, operatorStateBackend);
-
+      return new FlinkBroadcastStateInternals<>(1, operatorStateBackend);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
-  @Test
-  public void testValue() throws Exception {
-    ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR);
-
-    assertEquals(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
-    assertNotEquals(
-        underTest.state(NAMESPACE_2, STRING_VALUE_ADDR),
-        value);
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.write("hello");
-    assertThat(value.read(), Matchers.equalTo("hello"));
-    value.write("world");
-    assertThat(value.read(), Matchers.equalTo("world"));
-
-    value.clear();
-    assertThat(value.read(), Matchers.nullValue());
-    assertEquals(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
-
-  }
-
-  @Test
-  public void testBag() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+  @Override
+  @Ignore
+  public void testSet() {}
 
-    assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
+  @Override
+  @Ignore
+  public void testSetIsEmpty() {}
 
-    assertThat(value.read(), Matchers.emptyIterable());
-    value.add("hello");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
+  @Override
+  @Ignore
+  public void testMergeSetIntoSource() {}
 
-    value.add("world");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
+  @Override
+  @Ignore
+  public void testMergeSetIntoNewNamespace() {}
 
-    value.clear();
-    assertThat(value.read(), Matchers.emptyIterable());
-    assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
+  @Override
+  @Ignore
+  public void testMap() {}
 
-  }
-
-  @Test
-  public void testBagIsEmpty() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+  @Override
+  @Ignore
+  public void testWatermarkEarliestState() {}
 
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add("hello");
-    assertThat(readFuture.read(), Matchers.is(false));
+  @Override
+  @Ignore
+  public void testWatermarkLatestState() {}
 
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
+  @Override
+  @Ignore
+  public void testWatermarkEndOfWindowState() {}
 
-  @Test
-  public void testMergeBagIntoSource() throws Exception {
-    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+  @Override
+  @Ignore
+  public void testWatermarkStateIsEmpty() {}
 
-    bag1.add("Hello");
-    bag2.add("World");
-    bag1.add("!");
+  @Override
+  @Ignore
+  public void testMergeEarliestWatermarkIntoSource() {}
 
-    StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
-
-    // Reading the merged bag gets both the contents
-    assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", 
"!"));
-    assertThat(bag2.read(), Matchers.emptyIterable());
-  }
+  @Override
+  @Ignore
+  public void testMergeLatestWatermarkIntoSource() {}
 
-  @Test
-  public void testMergeBagIntoNewNamespace() throws Exception {
-    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-    BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
-
-    bag1.add("Hello");
-    bag2.add("World");
-    bag1.add("!");
-
-    StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
-
-    // Reading the merged bag gets both the contents
-    assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", 
"!"));
-    assertThat(bag1.read(), Matchers.emptyIterable());
-    assertThat(bag2.read(), Matchers.emptyIterable());
-  }
+  @Override
+  @Ignore
+  public void testSetReadable() {}
 
-  @Test
-  public void testCombiningValue() throws Exception {
-    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, 
SUM_INTEGER_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
-
-    assertThat(value.read(), Matchers.equalTo(0));
-    value.add(2);
-    assertThat(value.read(), Matchers.equalTo(2));
-
-    value.add(3);
-    assertThat(value.read(), Matchers.equalTo(5));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(0));
-    assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
-  }
-
-  @Test
-  public void testCombiningIsEmpty() throws Exception {
-    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, 
SUM_INTEGER_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add(5);
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeCombiningValueIntoSource() throws Exception {
-    CombiningState<Integer, int[], Integer> value1 =
-        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    CombiningState<Integer, int[], Integer> value2 =
-        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-
-    value1.add(5);
-    value2.add(10);
-    value1.add(6);
-
-    assertThat(value1.read(), Matchers.equalTo(11));
-    assertThat(value2.read(), Matchers.equalTo(10));
-
-    // Merging clears the old values and updates the result value.
-    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
-
-    assertThat(value1.read(), Matchers.equalTo(21));
-    assertThat(value2.read(), Matchers.equalTo(0));
-  }
-
-  @Test
-  public void testMergeCombiningValueIntoNewNamespace() throws Exception {
-    CombiningState<Integer, int[], Integer> value1 =
-        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    CombiningState<Integer, int[], Integer> value2 =
-        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-    CombiningState<Integer, int[], Integer> value3 =
-        underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
-
-    value1.add(5);
-    value2.add(10);
-    value1.add(6);
-
-    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
-
-    // Merging clears the old values and updates the result value.
-    assertThat(value1.read(), Matchers.equalTo(0));
-    assertThat(value2.read(), Matchers.equalTo(0));
-    assertThat(value3.read(), Matchers.equalTo(21));
-  }
+  @Override
+  @Ignore
+  public void testMapReadable() {}
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
index 4012373..aed14f3 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.flink.streaming;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 
 import java.io.ByteArrayInputStream;
@@ -26,8 +24,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.apache.beam.runners.core.StateMerging;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsTest;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaceForTest;
 import org.apache.beam.runners.core.StateTag;
@@ -35,7 +33,6 @@ import org.apache.beam.runners.core.StateTags;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.ReadableState;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
@@ -47,215 +44,219 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.operators.KeyContext;
 import org.hamcrest.Matchers;
-import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.junit.runners.Suite;
 
 /**
  * Tests for {@link FlinkKeyGroupStateInternals}. This is based on the tests for
- * {@code InMemoryStateInternals}.
+ * {@code StateInternalsTest}.
  */
-@RunWith(JUnit4.class)
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    FlinkKeyGroupStateInternalsTest.StandardStateInternalsTests.class,
+    FlinkKeyGroupStateInternalsTest.OtherTests.class
+})
 public class FlinkKeyGroupStateInternalsTest {
-  private static final StateNamespace NAMESPACE_1 = new 
StateNamespaceForTest("ns1");
-  private static final StateNamespace NAMESPACE_2 = new 
StateNamespaceForTest("ns2");
-  private static final StateNamespace NAMESPACE_3 = new 
StateNamespaceForTest("ns3");
 
-  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
-      StateTags.bag("stringBag", StringUtf8Coder.of());
-
-  FlinkKeyGroupStateInternals<String> underTest;
-  private KeyedStateBackend keyedStateBackend;
-
-  @Before
-  public void initStateInternals() {
-    try {
-      keyedStateBackend = getKeyedStateBackend(2, new KeyGroupRange(0, 1));
-      underTest = new FlinkKeyGroupStateInternals<>(StringUtf8Coder.of(), 
keyedStateBackend);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+  /**
+   * Runs the standard {@link StateInternalsTest} suite; only the BagState tests are enabled.
+   */
+  @RunWith(JUnit4.class)
+  public static class StandardStateInternalsTests extends StateInternalsTest {
+    @Override
+    protected StateInternals createStateInternals() {
+      KeyedStateBackend keyedStateBackend =
+          getKeyedStateBackend(2, new KeyGroupRange(0, 1));
+      return new FlinkKeyGroupStateInternals<>(StringUtf8Coder.of(), 
keyedStateBackend);
     }
-  }
 
-  private KeyedStateBackend getKeyedStateBackend(int numberOfKeyGroups,
-                                                   KeyGroupRange 
keyGroupRange) {
-    MemoryStateBackend backend = new MemoryStateBackend();
-    try {
-      AbstractKeyedStateBackend<ByteBuffer> keyedStateBackend = 
backend.createKeyedStateBackend(
-          new DummyEnvironment("test", 1, 0),
-          new JobID(),
-          "test_op",
-          new GenericTypeInfo<>(ByteBuffer.class).createSerializer(new 
ExecutionConfig()),
-          numberOfKeyGroups,
-          keyGroupRange,
-          new KvStateRegistry().createTaskRegistry(new JobID(), new 
JobVertexID()));
-      keyedStateBackend.setCurrentKey(ByteBuffer.wrap(
-          CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "1")));
-      return keyedStateBackend;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
+    @Override
+    @Ignore
+    public void testValue() {}
 
-  @Test
-  public void testBag() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+    @Override
+    @Ignore
+    public void testSet() {}
 
-    assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
+    @Override
+    @Ignore
+    public void testSetIsEmpty() {}
 
-    assertThat(value.read(), Matchers.emptyIterable());
-    value.add("hello");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
+    @Override
+    @Ignore
+    public void testMergeSetIntoSource() {}
 
-    value.add("world");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
+    @Override
+    @Ignore
+    public void testMergeSetIntoNewNamespace() {}
 
-    value.clear();
-    assertThat(value.read(), Matchers.emptyIterable());
-    assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
+    @Override
+    @Ignore
+    public void testMap() {}
 
-  }
+    @Override
+    @Ignore
+    public void testCombiningValue() {}
 
-  @Test
-  public void testBagIsEmpty() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+    @Override
+    @Ignore
+    public void testCombiningIsEmpty() {}
 
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add("hello");
-    assertThat(readFuture.read(), Matchers.is(false));
+    @Override
+    @Ignore
+    public void testMergeCombiningValueIntoSource() {}
 
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
+    @Override
+    @Ignore
+    public void testMergeCombiningValueIntoNewNamespace() {}
 
-  @Test
-  public void testMergeBagIntoSource() throws Exception {
-    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+    @Override
+    @Ignore
+    public void testWatermarkEarliestState() {}
 
-    bag1.add("Hello");
-    bag2.add("World");
-    bag1.add("!");
+    @Override
+    @Ignore
+    public void testWatermarkLatestState() {}
 
-    StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
+    @Override
+    @Ignore
+    public void testWatermarkEndOfWindowState() {}
 
-    // Reading the merged bag gets both the contents
-    assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", 
"!"));
-    assertThat(bag2.read(), Matchers.emptyIterable());
-  }
+    @Override
+    @Ignore
+    public void testWatermarkStateIsEmpty() {}
 
-  @Test
-  public void testMergeBagIntoNewNamespace() throws Exception {
-    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-    BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
+    @Override
+    @Ignore
+    public void testMergeEarliestWatermarkIntoSource() {}
 
-    bag1.add("Hello");
-    bag2.add("World");
-    bag1.add("!");
+    @Override
+    @Ignore
+    public void testMergeLatestWatermarkIntoSource() {}
 
-    StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
+    @Override
+    @Ignore
+    public void testSetReadable() {}
 
-    // Reading the merged bag gets both the contents
-    assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", 
"!"));
-    assertThat(bag1.read(), Matchers.emptyIterable());
-    assertThat(bag2.read(), Matchers.emptyIterable());
+    @Override
+    @Ignore
+    public void testMapReadable() {}
   }
 
-  @Test
-  public void testKeyGroupAndCheckpoint() throws Exception {
-    // assign to keyGroup 0
-    ByteBuffer key0 = ByteBuffer.wrap(
-        CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "11111111"));
-    // assign to keyGroup 1
-    ByteBuffer key1 = ByteBuffer.wrap(
-        CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "22222222"));
-    FlinkKeyGroupStateInternals<String> allState;
-    {
-      KeyedStateBackend keyedStateBackend = getKeyedStateBackend(2, new 
KeyGroupRange(0, 1));
-      allState = new FlinkKeyGroupStateInternals<>(
-          StringUtf8Coder.of(), keyedStateBackend);
-      BagState<String> valueForNamespace0 = allState.state(NAMESPACE_1, 
STRING_BAG_ADDR);
-      BagState<String> valueForNamespace1 = allState.state(NAMESPACE_2, 
STRING_BAG_ADDR);
-      keyedStateBackend.setCurrentKey(key0);
-      valueForNamespace0.add("0");
-      valueForNamespace1.add("2");
-      keyedStateBackend.setCurrentKey(key1);
-      valueForNamespace0.add("1");
-      valueForNamespace1.add("3");
-      assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("0", 
"1"));
-      assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("2", 
"3"));
-    }
-
-    ClassLoader classLoader = 
FlinkKeyGroupStateInternalsTest.class.getClassLoader();
-
-    // 1. scale up
-    ByteArrayOutputStream out0 = new ByteArrayOutputStream();
-    allState.snapshotKeyGroupState(0, new DataOutputStream(out0));
-    DataInputStream in0 = new DataInputStream(
-        new ByteArrayInputStream(out0.toByteArray()));
-    {
-      KeyedStateBackend keyedStateBackend = getKeyedStateBackend(2, new 
KeyGroupRange(0, 0));
-      FlinkKeyGroupStateInternals<String> state0 =
-          new FlinkKeyGroupStateInternals<>(
-              StringUtf8Coder.of(), keyedStateBackend);
-      state0.restoreKeyGroupState(0, in0, classLoader);
-      BagState<String> valueForNamespace0 = state0.state(NAMESPACE_1, 
STRING_BAG_ADDR);
-      BagState<String> valueForNamespace1 = state0.state(NAMESPACE_2, 
STRING_BAG_ADDR);
-      assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("0"));
-      assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("2"));
-    }
-
-    ByteArrayOutputStream out1 = new ByteArrayOutputStream();
-    allState.snapshotKeyGroupState(1, new DataOutputStream(out1));
-    DataInputStream in1 = new DataInputStream(
-        new ByteArrayInputStream(out1.toByteArray()));
-    {
-      KeyedStateBackend keyedStateBackend = getKeyedStateBackend(2, new 
KeyGroupRange(1, 1));
-      FlinkKeyGroupStateInternals<String> state1 =
-          new FlinkKeyGroupStateInternals<>(
-              StringUtf8Coder.of(), keyedStateBackend);
-      state1.restoreKeyGroupState(1, in1, classLoader);
-      BagState<String> valueForNamespace0 = state1.state(NAMESPACE_1, 
STRING_BAG_ADDR);
-      BagState<String> valueForNamespace1 = state1.state(NAMESPACE_2, 
STRING_BAG_ADDR);
-      assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("1"));
-      assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("3"));
-    }
-
-    // 2. scale down
-    {
-      KeyedStateBackend keyedStateBackend = getKeyedStateBackend(2, new 
KeyGroupRange(0, 1));
-      FlinkKeyGroupStateInternals<String> newAllState = new 
FlinkKeyGroupStateInternals<>(
-          StringUtf8Coder.of(), keyedStateBackend);
-      in0.reset();
-      in1.reset();
-      newAllState.restoreKeyGroupState(0, in0, classLoader);
-      newAllState.restoreKeyGroupState(1, in1, classLoader);
-      BagState<String> valueForNamespace0 = newAllState.state(NAMESPACE_1, 
STRING_BAG_ADDR);
-      BagState<String> valueForNamespace1 = newAllState.state(NAMESPACE_2, 
STRING_BAG_ADDR);
-      assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("0", 
"1"));
-      assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("2", 
"3"));
+  /**
+   * Tests specific to {@link FlinkKeyGroupStateInternals}: key-group snapshot and restore.
+   */
+  @RunWith(JUnit4.class)
+  public static class OtherTests {
+
+    private static final StateNamespace NAMESPACE_1 = new 
StateNamespaceForTest("ns1");
+    private static final StateNamespace NAMESPACE_2 = new 
StateNamespaceForTest("ns2");
+    private static final StateTag<BagState<String>> STRING_BAG_ADDR =
+        StateTags.bag("stringBag", StringUtf8Coder.of());
+
+    @Test
+    public void testKeyGroupAndCheckpoint() throws Exception {
+      // assign to keyGroup 0
+      ByteBuffer key0 = ByteBuffer.wrap(
+          CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "11111111"));
+      // assign to keyGroup 1
+      ByteBuffer key1 = ByteBuffer.wrap(
+          CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "22222222"));
+      FlinkKeyGroupStateInternals<String> allState;
+      {
+        KeyedStateBackend<ByteBuffer> keyedStateBackend =
+            getKeyedStateBackend(2, new KeyGroupRange(0, 1));
+        allState = new FlinkKeyGroupStateInternals<>(
+            StringUtf8Coder.of(), keyedStateBackend);
+        BagState<String> valueForNamespace0 = allState.state(NAMESPACE_1, 
STRING_BAG_ADDR);
+        BagState<String> valueForNamespace1 = allState.state(NAMESPACE_2, 
STRING_BAG_ADDR);
+        keyedStateBackend.setCurrentKey(key0);
+        valueForNamespace0.add("0");
+        valueForNamespace1.add("2");
+        keyedStateBackend.setCurrentKey(key1);
+        valueForNamespace0.add("1");
+        valueForNamespace1.add("3");
+        assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("0", 
"1"));
+        assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("2", 
"3"));
+      }
+
+      ClassLoader classLoader = 
FlinkKeyGroupStateInternalsTest.class.getClassLoader();
+
+      // 1. scale up
+      ByteArrayOutputStream out0 = new ByteArrayOutputStream();
+      allState.snapshotKeyGroupState(0, new DataOutputStream(out0));
+      DataInputStream in0 = new DataInputStream(
+          new ByteArrayInputStream(out0.toByteArray()));
+      {
+        KeyedStateBackend<ByteBuffer> keyedStateBackend =
+            getKeyedStateBackend(2, new KeyGroupRange(0, 0));
+        FlinkKeyGroupStateInternals<String> state0 =
+            new FlinkKeyGroupStateInternals<>(
+                StringUtf8Coder.of(), keyedStateBackend);
+        state0.restoreKeyGroupState(0, in0, classLoader);
+        BagState<String> valueForNamespace0 = state0.state(NAMESPACE_1, 
STRING_BAG_ADDR);
+        BagState<String> valueForNamespace1 = state0.state(NAMESPACE_2, 
STRING_BAG_ADDR);
+        assertThat(valueForNamespace0.read(), 
Matchers.containsInAnyOrder("0"));
+        assertThat(valueForNamespace1.read(), 
Matchers.containsInAnyOrder("2"));
+      }
+
+      ByteArrayOutputStream out1 = new ByteArrayOutputStream();
+      allState.snapshotKeyGroupState(1, new DataOutputStream(out1));
+      DataInputStream in1 = new DataInputStream(
+          new ByteArrayInputStream(out1.toByteArray()));
+      {
+        KeyedStateBackend<ByteBuffer> keyedStateBackend =
+            getKeyedStateBackend(2, new KeyGroupRange(1, 1));
+        FlinkKeyGroupStateInternals<String> state1 =
+            new FlinkKeyGroupStateInternals<>(
+                StringUtf8Coder.of(), keyedStateBackend);
+        state1.restoreKeyGroupState(1, in1, classLoader);
+        BagState<String> valueForNamespace0 = state1.state(NAMESPACE_1, 
STRING_BAG_ADDR);
+        BagState<String> valueForNamespace1 = state1.state(NAMESPACE_2, 
STRING_BAG_ADDR);
+        assertThat(valueForNamespace0.read(), 
Matchers.containsInAnyOrder("1"));
+        assertThat(valueForNamespace1.read(), 
Matchers.containsInAnyOrder("3"));
+      }
+
+      // 2. scale down
+      {
+        KeyedStateBackend<ByteBuffer> keyedStateBackend =
+            getKeyedStateBackend(2, new KeyGroupRange(0, 1));
+        FlinkKeyGroupStateInternals<String> newAllState = new 
FlinkKeyGroupStateInternals<>(
+            StringUtf8Coder.of(), keyedStateBackend);
+        in0.reset();
+        in1.reset();
+        newAllState.restoreKeyGroupState(0, in0, classLoader);
+        newAllState.restoreKeyGroupState(1, in1, classLoader);
+        BagState<String> valueForNamespace0 = newAllState.state(NAMESPACE_1, 
STRING_BAG_ADDR);
+        BagState<String> valueForNamespace1 = newAllState.state(NAMESPACE_2, 
STRING_BAG_ADDR);
+        assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("0", 
"1"));
+        assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("2", 
"3"));
+      }
     }
 
   }
 
-  private static class TestKeyContext implements KeyContext {
-
-    private Object key;
-
-    @Override
-    public void setCurrentKey(Object key) {
-      this.key = key;
-    }
-
-    @Override
-    public Object getCurrentKey() {
-      return key;
+  private static KeyedStateBackend<ByteBuffer> getKeyedStateBackend(int 
numberOfKeyGroups,
+                                                   KeyGroupRange 
keyGroupRange) {
+    MemoryStateBackend backend = new MemoryStateBackend();
+    try {
+      AbstractKeyedStateBackend<ByteBuffer> keyedStateBackend = 
backend.createKeyedStateBackend(
+          new DummyEnvironment("test", 1, 0),
+          new JobID(),
+          "test_op",
+          new GenericTypeInfo<>(ByteBuffer.class).createSerializer(new 
ExecutionConfig()),
+          numberOfKeyGroups,
+          keyGroupRange,
+          new KvStateRegistry().createTaskRegistry(new JobID(), new 
JobVertexID()));
+      keyedStateBackend.setCurrentKey(ByteBuffer.wrap(
+          CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "1")));
+      return keyedStateBackend;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
index 17cd3f5..667b5ba 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
@@ -17,85 +17,115 @@
  */
 package org.apache.beam.runners.flink.streaming;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaceForTest;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsTest;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.ReadableState;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.Ignore;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
  * Tests for {@link FlinkSplitStateInternals}. This is based on the tests for
- * {@code InMemoryStateInternals}.
+ * {@code StateInternalsTest}.
+ *
+ * <p>Just test testBag and testBagIsEmpty.
  */
 @RunWith(JUnit4.class)
-public class FlinkSplitStateInternalsTest {
-  private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
-  private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
-
-  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
-      StateTags.bag("stringBag", StringUtf8Coder.of());
-
-  FlinkSplitStateInternals<String> underTest;
+public class FlinkSplitStateInternalsTest extends StateInternalsTest {
 
-  @Before
-  public void initStateInternals() {
+  @Override
+  protected StateInternals createStateInternals() {
     MemoryStateBackend backend = new MemoryStateBackend();
     try {
       OperatorStateBackend operatorStateBackend =
           backend.createOperatorStateBackend(new DummyEnvironment("test", 1, 0), "");
-      underTest = new FlinkSplitStateInternals<>(operatorStateBackend);
-
+      return new FlinkSplitStateInternals<>(operatorStateBackend);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
-  @Test
-  public void testBag() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+  @Override
+  @Ignore
+  public void testMergeBagIntoSource() {}
 
-    assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
+  @Override
+  @Ignore
+  public void testMergeBagIntoNewNamespace() {}
 
-    assertThat(value.read(), Matchers.emptyIterable());
-    value.add("hello");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
+  @Override
+  @Ignore
+  public void testValue() {}
 
-    value.add("world");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
+  @Override
+  @Ignore
+  public void testSet() {}
 
-    value.clear();
-    assertThat(value.read(), Matchers.emptyIterable());
-    assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
+  @Override
+  @Ignore
+  public void testSetIsEmpty() {}
 
-  }
+  @Override
+  @Ignore
+  public void testMergeSetIntoSource() {}
 
-  @Test
-  public void testBagIsEmpty() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+  @Override
+  @Ignore
+  public void testMergeSetIntoNewNamespace() {}
 
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add("hello");
-    assertThat(readFuture.read(), Matchers.is(false));
+  @Override
+  @Ignore
+  public void testMap() {}
 
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
+  @Override
+  @Ignore
+  public void testCombiningValue() {}
+
+  @Override
+  @Ignore
+  public void testCombiningIsEmpty() {}
+
+  @Override
+  @Ignore
+  public void testMergeCombiningValueIntoSource() {}
+
+  @Override
+  @Ignore
+  public void testMergeCombiningValueIntoNewNamespace() {}
+
+  @Override
+  @Ignore
+  public void testWatermarkEarliestState() {}
+
+  @Override
+  @Ignore
+  public void testWatermarkLatestState() {}
+
+  @Override
+  @Ignore
+  public void testWatermarkEndOfWindowState() {}
+
+  @Override
+  @Ignore
+  public void testWatermarkStateIsEmpty() {}
+
+  @Override
+  @Ignore
+  public void testMergeEarliestWatermarkIntoSource() {}
+
+  @Override
+  @Ignore
+  public void testMergeLatestWatermarkIntoSource() {}
+
+  @Override
+  @Ignore
+  public void testSetReadable() {}
+
+  @Override
+  @Ignore
+  public void testMapReadable() {}
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index ddb4aca..d1dba32 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -321,6 +321,13 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-java</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java
new file mode 100644
index 0000000..b4597f9
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.stateful;
+
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsTest;
+import org.junit.Ignore;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link SparkStateInternals}. This is based on {@link StateInternalsTest}.
+ * Ignore set and map tests.
+ */
+@RunWith(JUnit4.class)
+public class SparkStateInternalsTest extends StateInternalsTest {
+
+  @Override
+  protected StateInternals createStateInternals() {
+    return SparkStateInternals.forKey("dummyKey");
+  }
+
+  @Override
+  @Ignore
+  public void testSet() {}
+
+  @Override
+  @Ignore
+  public void testSetIsEmpty() {}
+
+  @Override
+  @Ignore
+  public void testMergeSetIntoSource() {}
+
+  @Override
+  @Ignore
+  public void testMergeSetIntoNewNamespace() {}
+
+  @Override
+  @Ignore
+  public void testMap() {}
+
+  @Override
+  @Ignore
+  public void testSetReadable() {}
+
+  @Override
+  @Ignore
+  public void testMapReadable() {}
+
+}

Reply via email to