Repository: beam Updated Branches: refs/heads/master c528fb2f7 -> d5261d74b
[BEAM-2423] Port state internals tests to the new base class StateInternalsTest Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/581ee152 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/581ee152 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/581ee152 Branch: refs/heads/master Commit: 581ee1520e497fca95e8c4aa75f90050952523d0 Parents: c528fb2 Author: JingsongLi <lzljs3620...@aliyun.com> Authored: Tue Jun 13 11:26:38 2017 +0800 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri Jun 16 11:48:48 2017 +0200 ---------------------------------------------------------------------- runners/apex/pom.xml | 7 + .../utils/ApexStateInternalsTest.java | 411 ++++--------------- .../core/InMemoryStateInternalsTest.java | 46 ++- .../beam/runners/core/StateInternalsTest.java | 14 +- .../FlinkBroadcastStateInternalsTest.java | 242 +++-------- .../FlinkKeyGroupStateInternalsTest.java | 359 ++++++++-------- .../streaming/FlinkSplitStateInternalsTest.java | 132 +++--- runners/spark/pom.xml | 7 + .../spark/stateful/SparkStateInternalsTest.java | 66 +++ 9 files changed, 521 insertions(+), 763 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/apex/pom.xml ---------------------------------------------------------------------- diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml index 4a36bec..d3d4318 100644 --- a/runners/apex/pom.xml +++ b/runners/apex/pom.xml @@ -184,6 +184,13 @@ <type>test-jar</type> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-core-java</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java index a7e64af..87aa8c2 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java @@ -18,350 +18,109 @@ package org.apache.beam.runners.apex.translation.utils; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import com.datatorrent.lib.util.KryoCloneUtils; -import java.util.Arrays; -import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend; -import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateInternalsFactory; -import org.apache.beam.runners.core.StateMerging; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateInternalsTest; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaceForTest; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.state.BagState; -import org.apache.beam.sdk.state.CombiningState; -import org.apache.beam.sdk.state.GroupingState; -import org.apache.beam.sdk.state.ReadableState; import org.apache.beam.sdk.state.ValueState; -import org.apache.beam.sdk.state.WatermarkHoldState; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.hamcrest.Matchers; -import org.joda.time.Instant; -import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.junit.runners.Suite; /** * Tests for {@link ApexStateInternals}. This is based on the tests for - * {@code InMemoryStateInternals}. + * {@code StateInternalsTest}. */ +@RunWith(Suite.class) +@Suite.SuiteClasses({ + ApexStateInternalsTest.StandardStateInternalsTests.class, + ApexStateInternalsTest.OtherTests.class +}) public class ApexStateInternalsTest { - private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10)); - private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); - private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); - private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); - private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = - StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<CombiningState<Integer, int[], Integer>> - SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( - "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); - private static final StateTag<BagState<String>> STRING_BAG_ADDR = - StateTags.bag("stringBag", StringUtf8Coder.of()); - private static final StateTag<WatermarkHoldState> - WATERMARK_EARLIEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); - private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); - private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); - - private ApexStateInternals<String> underTest; - - @Before - public void initStateInternals() { - underTest = new ApexStateInternals.ApexStateBackend() + private static StateInternals newStateInternals() { + return new ApexStateInternals.ApexStateBackend() .newStateInternalsFactory(StringUtf8Coder.of()) - .stateInternalsForKey((String) null); - } - - @Test - public void testBag() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - - assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR))); - - assertThat(value.read(), Matchers.emptyIterable()); - value.add("hello"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello")); - - value.add("world"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world")); - - value.clear(); - assertThat(value.read(), Matchers.emptyIterable()); - assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value); - - } - - @Test - public void testBagIsEmpty() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add("hello"); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeBagIntoSource() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); - - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); - - StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1); - - // Reading the merged bag gets both the contents - assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); - assertThat(bag2.read(), Matchers.emptyIterable()); - } - - @Test - public void testMergeBagIntoNewNamespace() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); - BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR); - - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); - - StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3); - - // Reading the merged bag gets both the contents - assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); - assertThat(bag1.read(), Matchers.emptyIterable()); - assertThat(bag2.read(), Matchers.emptyIterable()); + .stateInternalsForKey("dummyKey"); + } + + /** + * A standard StateInternals test. Ignore set and map tests. + */ + @RunWith(JUnit4.class) + public static class StandardStateInternalsTests extends StateInternalsTest { + @Override + protected StateInternals createStateInternals() { + return newStateInternals(); + } + + @Override + @Ignore + public void testSet() {} + + @Override + @Ignore + public void testSetIsEmpty() {} + + @Override + @Ignore + public void testMergeSetIntoSource() {} + + @Override + @Ignore + public void testMergeSetIntoNewNamespace() {} + + @Override + @Ignore + public void testMap() {} + + @Override + @Ignore + public void testSetReadable() {} + + @Override + @Ignore + public void testMapReadable() {} + } + + /** + * A specific test of ApexStateInternalsTest. + */ + @RunWith(JUnit4.class) + public static class OtherTests { + + private static final StateNamespace NAMESPACE = new StateNamespaceForTest("ns"); + private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = + StateTags.value("stringValue", StringUtf8Coder.of()); + + @Test + public void testSerialization() throws Exception { + ApexStateInternals.ApexStateInternalsFactory<String> sif = + new ApexStateInternals.ApexStateBackend(). + newStateInternalsFactory(StringUtf8Coder.of()); + ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy"); + + ValueState<String> value = keyAndState.state(NAMESPACE, STRING_VALUE_ADDR); + assertEquals(keyAndState.state(NAMESPACE, STRING_VALUE_ADDR), value); + value.write("hello"); + + ApexStateInternals.ApexStateInternalsFactory<String> cloned; + assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif)); + ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy"); + + ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE, STRING_VALUE_ADDR); + assertThat(clonedValue.read(), Matchers.equalTo("hello")); + assertEquals(clonedKeyAndState.state(NAMESPACE, STRING_VALUE_ADDR), value); + } } - - @Test - public void testCombiningValue() throws Exception { - GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR))); - - assertThat(value.read(), Matchers.equalTo(0)); - value.add(2); - assertThat(value.read(), Matchers.equalTo(2)); - - value.add(3); - assertThat(value.read(), Matchers.equalTo(5)); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(0)); - assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value); - } - - @Test - public void testCombiningIsEmpty() throws Exception { - GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add(5); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeCombiningValueIntoSource() throws Exception { - CombiningState<Integer, int[], Integer> value1 = - underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value2 = - underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - - value1.add(5); - value2.add(10); - value1.add(6); - - assertThat(value1.read(), Matchers.equalTo(11)); - assertThat(value2.read(), Matchers.equalTo(10)); - - // Merging clears the old values and updates the result value. - StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1); - - assertThat(value1.read(), Matchers.equalTo(21)); - assertThat(value2.read(), Matchers.equalTo(0)); - } - - @Test - public void testMergeCombiningValueIntoNewNamespace() throws Exception { - CombiningState<Integer, int[], Integer> value1 = - underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value2 = - underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value3 = - underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); - - value1.add(5); - value2.add(10); - value1.add(6); - - StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3); - - // Merging clears the old values and updates the result value. - assertThat(value1.read(), Matchers.equalTo(0)); - assertThat(value2.read(), Matchers.equalTo(0)); - assertThat(value3.read(), Matchers.equalTo(21)); - } - - @Test - public void testWatermarkEarliestState() throws Exception { - WatermarkHoldState value = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); - - value.add(new Instant(3000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); - - value.add(new Instant(1000)); - assertThat(value.read(), Matchers.equalTo(new Instant(1000))); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(null)); - assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value); - } - - @Test - public void testWatermarkLatestState() throws Exception { - WatermarkHoldState value = - underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); - - value.add(new Instant(3000)); - assertThat(value.read(), Matchers.equalTo(new Instant(3000))); - - value.add(new Instant(1000)); - assertThat(value.read(), Matchers.equalTo(new Instant(3000))); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(null)); - assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value); - } - - @Test - public void testWatermarkEndOfWindowState() throws Exception { - WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(null)); - assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value); - } - - @Test - public void testWatermarkStateIsEmpty() throws Exception { - WatermarkHoldState value = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add(new Instant(1000)); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeEarliestWatermarkIntoSource() throws Exception { - WatermarkHoldState value1 = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - WatermarkHoldState value2 = - underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); - - value1.add(new Instant(3000)); - value2.add(new Instant(5000)); - value1.add(new Instant(4000)); - value2.add(new Instant(2000)); - - // Merging clears the old values and updates the merged value. - StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1); - - assertThat(value1.read(), Matchers.equalTo(new Instant(2000))); - assertThat(value2.read(), Matchers.equalTo(null)); - } - - @Test - public void testMergeLatestWatermarkIntoSource() throws Exception { - WatermarkHoldState value1 = - underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - WatermarkHoldState value2 = - underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); - WatermarkHoldState value3 = - underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); - - value1.add(new Instant(3000)); - value2.add(new Instant(5000)); - value1.add(new Instant(4000)); - value2.add(new Instant(2000)); - - // Merging clears the old values and updates the result value. - StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1); - - // Merging clears the old values and updates the result value. - assertThat(value3.read(), Matchers.equalTo(new Instant(5000))); - assertThat(value1.read(), Matchers.equalTo(null)); - assertThat(value2.read(), Matchers.equalTo(null)); - } - - @Test - public void testSerialization() throws Exception { - ApexStateInternalsFactory<String> sif = new ApexStateBackend(). - newStateInternalsFactory(StringUtf8Coder.of()); - ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy"); - - ValueState<String> value = keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR); - assertEquals(keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value); - value.write("hello"); - - ApexStateInternalsFactory<String> cloned; - assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif)); - ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy"); - - ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR); - assertThat(clonedValue.read(), Matchers.equalTo("hello")); - assertEquals(clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value); - } - } http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java index 335c2f8..1c6cd30 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java @@ -19,7 +19,17 @@ package org.apache.beam.runners.core; import static org.junit.Assert.assertThat; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.CombiningState; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.SetState; import org.apache.beam.sdk.state.State; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.state.WatermarkHoldState; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.hamcrest.Matchers; import org.junit.Test; import org.junit.runner.RunWith; @@ -53,21 +63,41 @@ public class InMemoryStateInternalsTest { @RunWith(JUnit4.class) public static class OtherTests { + private static final StateNamespace NAMESPACE = new StateNamespaceForTest("ns"); + + private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = + StateTags.value("stringValue", StringUtf8Coder.of()); + private static final StateTag<CombiningState<Integer, int[], Integer>> + SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( + "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); + private static final StateTag<BagState<String>> STRING_BAG_ADDR = + StateTags.bag("stringBag", StringUtf8Coder.of()); + private static final StateTag<SetState<String>> STRING_SET_ADDR = + StateTags.set("stringSet", StringUtf8Coder.of()); + private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR = + StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of()); + private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); + private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); + StateInternals underTest = new InMemoryStateInternals<>("dummyKey"); @Test public void testSameInstance() { - assertSameInstance(StateInternalsTest.STRING_VALUE_ADDR); - assertSameInstance(StateInternalsTest.SUM_INTEGER_ADDR); - assertSameInstance(StateInternalsTest.STRING_BAG_ADDR); - assertSameInstance(StateInternalsTest.STRING_SET_ADDR); - assertSameInstance(StateInternalsTest.STRING_MAP_ADDR); - assertSameInstance(StateInternalsTest.WATERMARK_EARLIEST_ADDR); + assertSameInstance(STRING_VALUE_ADDR); + assertSameInstance(SUM_INTEGER_ADDR); + assertSameInstance(STRING_BAG_ADDR); + assertSameInstance(STRING_SET_ADDR); + assertSameInstance(STRING_MAP_ADDR); + assertSameInstance(WATERMARK_EARLIEST_ADDR); } private <T extends State> void assertSameInstance(StateTag<T> address) { - assertThat(underTest.state(StateInternalsTest.NAMESPACE_1, address), - Matchers.sameInstance(underTest.state(StateInternalsTest.NAMESPACE_1, address))); + assertThat(underTest.state(NAMESPACE, address), + Matchers.sameInstance(underTest.state(NAMESPACE, address))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java index 6011fb4..ae07fe6 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java @@ -56,22 +56,22 @@ import org.junit.Test; public abstract class StateInternalsTest { private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10)); - static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); + private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); - static final StateTag<ValueState<String>> STRING_VALUE_ADDR = + private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = StateTags.value("stringValue", StringUtf8Coder.of()); - static final StateTag<CombiningState<Integer, int[], Integer>> + private static final StateTag<CombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); - static final StateTag<BagState<String>> STRING_BAG_ADDR = + private static final StateTag<BagState<String>> STRING_BAG_ADDR = StateTags.bag("stringBag", StringUtf8Coder.of()); - static final StateTag<SetState<String>> STRING_SET_ADDR = + private static final StateTag<SetState<String>> STRING_SET_ADDR = StateTags.set("stringSet", StringUtf8Coder.of()); - static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR = + private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR = StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of()); - static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR = + private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR = StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR = StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java index 2b96d91..3409d27 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java @@ -17,229 +17,87 @@ */ package org.apache.beam.runners.flink.streaming; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertThat; - -import java.util.Arrays; -import org.apache.beam.runners.core.StateMerging; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.core.StateNamespaceForTest; -import org.apache.beam.runners.core.StateTag; -import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateInternalsTest; import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.state.BagState; -import org.apache.beam.sdk.state.CombiningState; -import org.apache.beam.sdk.state.GroupingState; -import org.apache.beam.sdk.state.ReadableState; -import org.apache.beam.sdk.state.ValueState; -import org.apache.beam.sdk.transforms.Sum; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.hamcrest.Matchers; -import org.junit.Before; -import org.junit.Test; +import org.junit.Ignore; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** * Tests for {@link FlinkBroadcastStateInternals}. This is based on the tests for - * {@code InMemoryStateInternals}. + * {@code StateInternalsTest}. + * + * <p>Just test value, bag and combining. */ @RunWith(JUnit4.class) -public class FlinkBroadcastStateInternalsTest { - private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); - private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); - private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); - - private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = - StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<CombiningState<Integer, int[], Integer>> - SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( - "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); - private static final StateTag<BagState<String>> STRING_BAG_ADDR = - StateTags.bag("stringBag", StringUtf8Coder.of()); - - FlinkBroadcastStateInternals<String> underTest; - - @Before - public void initStateInternals() { +public class FlinkBroadcastStateInternalsTest extends StateInternalsTest { + + @Override + protected StateInternals createStateInternals() { MemoryStateBackend backend = new MemoryStateBackend(); try { OperatorStateBackend operatorStateBackend = backend.createOperatorStateBackend(new DummyEnvironment("test", 1, 0), ""); - underTest = new FlinkBroadcastStateInternals<>(1, operatorStateBackend); - + return new FlinkBroadcastStateInternals<>(1, operatorStateBackend); } catch (Exception e) { throw new RuntimeException(e); } } - @Test - public void testValue() throws Exception { - ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR); - - assertEquals(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), value); - assertNotEquals( - underTest.state(NAMESPACE_2, STRING_VALUE_ADDR), - value); - - assertThat(value.read(), Matchers.nullValue()); - value.write("hello"); - assertThat(value.read(), Matchers.equalTo("hello")); - value.write("world"); - assertThat(value.read(), Matchers.equalTo("world")); - - value.clear(); - assertThat(value.read(), Matchers.nullValue()); - assertEquals(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), value); - - } - - @Test - public void testBag() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + @Override + @Ignore + public void testSet() {} - assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR))); + @Override + @Ignore + public void testSetIsEmpty() {} - assertThat(value.read(), Matchers.emptyIterable()); - value.add("hello"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello")); + @Override + @Ignore + public void testMergeSetIntoSource() {} - value.add("world"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world")); + @Override + @Ignore + public void testMergeSetIntoNewNamespace() {} - value.clear(); - assertThat(value.read(), Matchers.emptyIterable()); - assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value); + @Override + @Ignore + public void testMap() {} - } - - @Test - public void testBagIsEmpty() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + @Override + @Ignore + public void testWatermarkEarliestState() {} - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add("hello"); - assertThat(readFuture.read(), Matchers.is(false)); + @Override + @Ignore + public void testWatermarkLatestState() {} - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } + @Override + @Ignore + public void testWatermarkEndOfWindowState() {} - @Test - public void testMergeBagIntoSource() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); + @Override + @Ignore + public void testWatermarkStateIsEmpty() {} - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); + @Override + @Ignore + public void testMergeEarliestWatermarkIntoSource() {} - StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1); - - // Reading the merged bag gets both the contents - assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); - assertThat(bag2.read(), Matchers.emptyIterable()); - } + @Override + @Ignore + public void testMergeLatestWatermarkIntoSource() {} - @Test - public void testMergeBagIntoNewNamespace() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); - BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR); - - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); - - StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3); - - // Reading the merged bag gets both the contents - assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); - assertThat(bag1.read(), Matchers.emptyIterable()); - assertThat(bag2.read(), Matchers.emptyIterable()); - } + @Override + @Ignore + public void testSetReadable() {} - @Test - public void testCombiningValue() throws Exception { - GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR))); - - assertThat(value.read(), Matchers.equalTo(0)); - value.add(2); - assertThat(value.read(), Matchers.equalTo(2)); - - value.add(3); - assertThat(value.read(), Matchers.equalTo(5)); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(0)); - assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value); - } - - @Test - public void testCombiningIsEmpty() throws Exception { - GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add(5); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeCombiningValueIntoSource() throws Exception { - CombiningState<Integer, int[], Integer> value1 = - underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value2 = - underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - - value1.add(5); - value2.add(10); - value1.add(6); - - assertThat(value1.read(), Matchers.equalTo(11)); - assertThat(value2.read(), Matchers.equalTo(10)); - - // Merging clears the old values and updates the result value. - StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1); - - assertThat(value1.read(), Matchers.equalTo(21)); - assertThat(value2.read(), Matchers.equalTo(0)); - } - - @Test - public void testMergeCombiningValueIntoNewNamespace() throws Exception { - CombiningState<Integer, int[], Integer> value1 = - underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value2 = - underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value3 = - underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); - - value1.add(5); - value2.add(10); - value1.add(6); - - StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3); - - // Merging clears the old values and updates the result value. - assertThat(value1.read(), Matchers.equalTo(0)); - assertThat(value2.read(), Matchers.equalTo(0)); - assertThat(value3.read(), Matchers.equalTo(21)); - } + @Override + @Ignore + public void testMapReadable() {} } http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java index 4012373..aed14f3 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.flink.streaming; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import java.io.ByteArrayInputStream; @@ -26,8 +24,8 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.nio.ByteBuffer; -import java.util.Arrays; -import org.apache.beam.runners.core.StateMerging; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateInternalsTest; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaceForTest; import org.apache.beam.runners.core.StateTag; @@ -35,7 +33,6 @@ import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.state.BagState; -import org.apache.beam.sdk.state.ReadableState; import org.apache.beam.sdk.util.CoderUtils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; @@ -47,215 +44,219 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.streaming.api.operators.KeyContext; import org.hamcrest.Matchers; -import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.junit.runners.Suite; /** * Tests for {@link FlinkKeyGroupStateInternals}. This is based on the tests for - * {@code InMemoryStateInternals}. + * {@code StateInternalsTest}. */ -@RunWith(JUnit4.class) +@RunWith(Suite.class) +@Suite.SuiteClasses({ + FlinkKeyGroupStateInternalsTest.StandardStateInternalsTests.class, + FlinkKeyGroupStateInternalsTest.OtherTests.class +}) public class FlinkKeyGroupStateInternalsTest { - private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); - private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); - private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); - private static final StateTag<BagState<String>> STRING_BAG_ADDR = - StateTags.bag("stringBag", StringUtf8Coder.of()); - - FlinkKeyGroupStateInternals<String> underTest; - private KeyedStateBackend keyedStateBackend; - - @Before - public void initStateInternals() { - try { - keyedStateBackend = getKeyedStateBackend(2, new KeyGroupRange(0, 1)); - underTest = new FlinkKeyGroupStateInternals<>(StringUtf8Coder.of(), keyedStateBackend); - } catch (Exception e) { - throw new RuntimeException(e); + /** + * A standard StateInternals test. Just test BagState. + */ + @RunWith(JUnit4.class) + public static class StandardStateInternalsTests extends StateInternalsTest { + @Override + protected StateInternals createStateInternals() { + KeyedStateBackend keyedStateBackend = + getKeyedStateBackend(2, new KeyGroupRange(0, 1)); + return new FlinkKeyGroupStateInternals<>(StringUtf8Coder.of(), keyedStateBackend); } - } - private KeyedStateBackend getKeyedStateBackend(int numberOfKeyGroups, - KeyGroupRange keyGroupRange) { - MemoryStateBackend backend = new MemoryStateBackend(); - try { - AbstractKeyedStateBackend<ByteBuffer> keyedStateBackend = backend.createKeyedStateBackend( - new DummyEnvironment("test", 1, 0), - new JobID(), - "test_op", - new GenericTypeInfo<>(ByteBuffer.class).createSerializer(new ExecutionConfig()), - numberOfKeyGroups, - keyGroupRange, - new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID())); - keyedStateBackend.setCurrentKey(ByteBuffer.wrap( - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "1"))); - return keyedStateBackend; - } catch (Exception e) { - throw new RuntimeException(e); - } - } + @Override + @Ignore + public void testValue() {} - @Test - public void testBag() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + @Override + @Ignore + public void testSet() {} - assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR))); + @Override + @Ignore + public void testSetIsEmpty() {} - assertThat(value.read(), Matchers.emptyIterable()); - value.add("hello"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello")); + @Override + @Ignore + public void testMergeSetIntoSource() {} - value.add("world"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world")); + @Override + @Ignore + public void testMergeSetIntoNewNamespace() {} - value.clear(); - assertThat(value.read(), Matchers.emptyIterable()); - assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value); + @Override + @Ignore + public void testMap() {} - } + @Override + @Ignore + public void testCombiningValue() {} - @Test - public void testBagIsEmpty() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + @Override + @Ignore + public void testCombiningIsEmpty() {} - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add("hello"); - assertThat(readFuture.read(), Matchers.is(false)); + @Override + @Ignore + public void testMergeCombiningValueIntoSource() {} - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } + @Override + @Ignore + public void testMergeCombiningValueIntoNewNamespace() {} - @Test - public void testMergeBagIntoSource() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); + @Override + @Ignore + public void testWatermarkEarliestState() {} - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); + @Override + @Ignore + public void testWatermarkLatestState() {} - StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1); + @Override + @Ignore + public void testWatermarkEndOfWindowState() {} - // Reading the merged bag gets both the contents - assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); - assertThat(bag2.read(), Matchers.emptyIterable()); - } + @Override + @Ignore + public void testWatermarkStateIsEmpty() {} - @Test - public void testMergeBagIntoNewNamespace() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); - BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR); + @Override + @Ignore + public void testMergeEarliestWatermarkIntoSource() {} - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); + @Override + @Ignore + public void testMergeLatestWatermarkIntoSource() {} - StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3); + @Override + @Ignore + public void testSetReadable() {} - // Reading the merged bag gets both the contents - assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); - assertThat(bag1.read(), Matchers.emptyIterable()); - assertThat(bag2.read(), Matchers.emptyIterable()); + @Override + @Ignore + public void testMapReadable() {} } - @Test - public void testKeyGroupAndCheckpoint() throws Exception { - // assign to keyGroup 0 - ByteBuffer key0 = ByteBuffer.wrap( - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "11111111")); - // assign to keyGroup 1 - ByteBuffer key1 = ByteBuffer.wrap( - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "22222222")); - FlinkKeyGroupStateInternals<String> allState; - { - KeyedStateBackend keyedStateBackend = getKeyedStateBackend(2, new KeyGroupRange(0, 1)); - allState = new FlinkKeyGroupStateInternals<>( - StringUtf8Coder.of(), keyedStateBackend); - BagState<String> valueForNamespace0 = allState.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> valueForNamespace1 = allState.state(NAMESPACE_2, STRING_BAG_ADDR); - keyedStateBackend.setCurrentKey(key0); - valueForNamespace0.add("0"); - valueForNamespace1.add("2"); - keyedStateBackend.setCurrentKey(key1); - valueForNamespace0.add("1"); - valueForNamespace1.add("3"); - assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("0", "1")); - assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("2", "3")); - } - - ClassLoader classLoader = FlinkKeyGroupStateInternalsTest.class.getClassLoader(); - - // 1. scale up - ByteArrayOutputStream out0 = new ByteArrayOutputStream(); - allState.snapshotKeyGroupState(0, new DataOutputStream(out0)); - DataInputStream in0 = new DataInputStream( - new ByteArrayInputStream(out0.toByteArray())); - { - KeyedStateBackend keyedStateBackend = getKeyedStateBackend(2, new KeyGroupRange(0, 0)); - FlinkKeyGroupStateInternals<String> state0 = - new FlinkKeyGroupStateInternals<>( - StringUtf8Coder.of(), keyedStateBackend); - state0.restoreKeyGroupState(0, in0, classLoader); - BagState<String> valueForNamespace0 = state0.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> valueForNamespace1 = state0.state(NAMESPACE_2, STRING_BAG_ADDR); - assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("0")); - assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("2")); - } - - ByteArrayOutputStream out1 = new ByteArrayOutputStream(); - allState.snapshotKeyGroupState(1, new DataOutputStream(out1)); - DataInputStream in1 = new DataInputStream( - new ByteArrayInputStream(out1.toByteArray())); - { - KeyedStateBackend keyedStateBackend = getKeyedStateBackend(2, new KeyGroupRange(1, 1)); - FlinkKeyGroupStateInternals<String> state1 = - new FlinkKeyGroupStateInternals<>( - StringUtf8Coder.of(), keyedStateBackend); - state1.restoreKeyGroupState(1, in1, classLoader); - BagState<String> valueForNamespace0 = state1.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> valueForNamespace1 = state1.state(NAMESPACE_2, STRING_BAG_ADDR); - assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("1")); - assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("3")); - } - - // 2. scale down - { - KeyedStateBackend keyedStateBackend = getKeyedStateBackend(2, new KeyGroupRange(0, 1)); - FlinkKeyGroupStateInternals<String> newAllState = new FlinkKeyGroupStateInternals<>( - StringUtf8Coder.of(), keyedStateBackend); - in0.reset(); - in1.reset(); - newAllState.restoreKeyGroupState(0, in0, classLoader); - newAllState.restoreKeyGroupState(1, in1, classLoader); - BagState<String> valueForNamespace0 = newAllState.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> valueForNamespace1 = newAllState.state(NAMESPACE_2, STRING_BAG_ADDR); - assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("0", "1")); - assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("2", "3")); + /** + * A specific test of FlinkKeyGroupStateInternalsTest. + */ + @RunWith(JUnit4.class) + public static class OtherTests { + + private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); + private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); + private static final StateTag<BagState<String>> STRING_BAG_ADDR = + StateTags.bag("stringBag", StringUtf8Coder.of()); + + @Test + public void testKeyGroupAndCheckpoint() throws Exception { + // assign to keyGroup 0 + ByteBuffer key0 = ByteBuffer.wrap( + CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "11111111")); + // assign to keyGroup 1 + ByteBuffer key1 = ByteBuffer.wrap( + CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "22222222")); + FlinkKeyGroupStateInternals<String> allState; + { + KeyedStateBackend<ByteBuffer> keyedStateBackend = + getKeyedStateBackend(2, new KeyGroupRange(0, 1)); + allState = new FlinkKeyGroupStateInternals<>( + StringUtf8Coder.of(), keyedStateBackend); + BagState<String> valueForNamespace0 = allState.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> valueForNamespace1 = allState.state(NAMESPACE_2, STRING_BAG_ADDR); + keyedStateBackend.setCurrentKey(key0); + valueForNamespace0.add("0"); + valueForNamespace1.add("2"); + keyedStateBackend.setCurrentKey(key1); + valueForNamespace0.add("1"); + valueForNamespace1.add("3"); + assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("0", "1")); + assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("2", "3")); + } + + ClassLoader classLoader = FlinkKeyGroupStateInternalsTest.class.getClassLoader(); + + // 1. scale up + ByteArrayOutputStream out0 = new ByteArrayOutputStream(); + allState.snapshotKeyGroupState(0, new DataOutputStream(out0)); + DataInputStream in0 = new DataInputStream( + new ByteArrayInputStream(out0.toByteArray())); + { + KeyedStateBackend<ByteBuffer> keyedStateBackend = + getKeyedStateBackend(2, new KeyGroupRange(0, 0)); + FlinkKeyGroupStateInternals<String> state0 = + new FlinkKeyGroupStateInternals<>( + StringUtf8Coder.of(), keyedStateBackend); + state0.restoreKeyGroupState(0, in0, classLoader); + BagState<String> valueForNamespace0 = state0.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> valueForNamespace1 = state0.state(NAMESPACE_2, STRING_BAG_ADDR); + assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("0")); + assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("2")); + } + + ByteArrayOutputStream out1 = new ByteArrayOutputStream(); + allState.snapshotKeyGroupState(1, new DataOutputStream(out1)); + DataInputStream in1 = new DataInputStream( + new ByteArrayInputStream(out1.toByteArray())); + { + KeyedStateBackend<ByteBuffer> keyedStateBackend = + getKeyedStateBackend(2, new KeyGroupRange(1, 1)); + FlinkKeyGroupStateInternals<String> state1 = + new FlinkKeyGroupStateInternals<>( + StringUtf8Coder.of(), keyedStateBackend); + state1.restoreKeyGroupState(1, in1, classLoader); + BagState<String> valueForNamespace0 = state1.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> valueForNamespace1 = state1.state(NAMESPACE_2, STRING_BAG_ADDR); + assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("1")); + assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("3")); + } + + // 2. scale down + { + KeyedStateBackend<ByteBuffer> keyedStateBackend = + getKeyedStateBackend(2, new KeyGroupRange(0, 1)); + FlinkKeyGroupStateInternals<String> newAllState = new FlinkKeyGroupStateInternals<>( + StringUtf8Coder.of(), keyedStateBackend); + in0.reset(); + in1.reset(); + newAllState.restoreKeyGroupState(0, in0, classLoader); + newAllState.restoreKeyGroupState(1, in1, classLoader); + BagState<String> valueForNamespace0 = newAllState.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> valueForNamespace1 = newAllState.state(NAMESPACE_2, STRING_BAG_ADDR); + assertThat(valueForNamespace0.read(), Matchers.containsInAnyOrder("0", "1")); + assertThat(valueForNamespace1.read(), Matchers.containsInAnyOrder("2", "3")); + } } } - private static class TestKeyContext implements KeyContext { - - private Object key; - - @Override - public void setCurrentKey(Object key) { - this.key = key; - } - - @Override - public Object getCurrentKey() { - return key; + private static KeyedStateBackend<ByteBuffer> getKeyedStateBackend(int numberOfKeyGroups, + KeyGroupRange keyGroupRange) { + MemoryStateBackend backend = new MemoryStateBackend(); + try { + AbstractKeyedStateBackend<ByteBuffer> keyedStateBackend = backend.createKeyedStateBackend( + new DummyEnvironment("test", 1, 0), + new JobID(), + "test_op", + new GenericTypeInfo<>(ByteBuffer.class).createSerializer(new ExecutionConfig()), + numberOfKeyGroups, + keyGroupRange, + new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID())); + keyedStateBackend.setCurrentKey(ByteBuffer.wrap( + CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "1"))); + return keyedStateBackend; + } catch (Exception e) { + throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java index 17cd3f5..667b5ba 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java @@ -17,85 +17,115 @@ */ package org.apache.beam.runners.flink.streaming; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; - -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.core.StateNamespaceForTest; -import org.apache.beam.runners.core.StateTag; -import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateInternalsTest; import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.state.BagState; -import org.apache.beam.sdk.state.ReadableState; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.hamcrest.Matchers; -import org.junit.Before; -import org.junit.Test; +import org.junit.Ignore; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** * Tests for {@link FlinkSplitStateInternals}. This is based on the tests for - * {@code InMemoryStateInternals}. + * {@code StateInternalsTest}. + * + * <p>Just test testBag and testBagIsEmpty. */ @RunWith(JUnit4.class) -public class FlinkSplitStateInternalsTest { - private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); - private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); - - private static final StateTag<BagState<String>> STRING_BAG_ADDR = - StateTags.bag("stringBag", StringUtf8Coder.of()); - - FlinkSplitStateInternals<String> underTest; +public class FlinkSplitStateInternalsTest extends StateInternalsTest { - @Before - public void initStateInternals() { + @Override + protected StateInternals createStateInternals() { MemoryStateBackend backend = new MemoryStateBackend(); try { OperatorStateBackend operatorStateBackend = backend.createOperatorStateBackend(new DummyEnvironment("test", 1, 0), ""); - underTest = new FlinkSplitStateInternals<>(operatorStateBackend); - + return new FlinkSplitStateInternals<>(operatorStateBackend); } catch (Exception e) { throw new RuntimeException(e); } } - @Test - public void testBag() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + @Override + @Ignore + public void testMergeBagIntoSource() {} - assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR))); + @Override + @Ignore + public void testMergeBagIntoNewNamespace() {} - assertThat(value.read(), Matchers.emptyIterable()); - value.add("hello"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello")); + @Override + @Ignore + public void testValue() {} - value.add("world"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world")); + @Override + @Ignore + public void testSet() {} - value.clear(); - assertThat(value.read(), Matchers.emptyIterable()); - assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value); + @Override + @Ignore + public void testSetIsEmpty() {} - } + @Override + @Ignore + public void testMergeSetIntoSource() {} - @Test - public void testBagIsEmpty() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + @Override + @Ignore + public void testMergeSetIntoNewNamespace() {} - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add("hello"); - assertThat(readFuture.read(), Matchers.is(false)); + @Override + @Ignore + public void testMap() {} - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } + @Override + @Ignore + public void testCombiningValue() {} + + @Override + @Ignore + public void testCombiningIsEmpty() {} + + @Override + @Ignore + public void testMergeCombiningValueIntoSource() {} + + @Override + @Ignore + public void testMergeCombiningValueIntoNewNamespace() {} + + @Override + @Ignore + public void testWatermarkEarliestState() {} + + @Override + @Ignore + public void testWatermarkLatestState() {} + + @Override + @Ignore + public void testWatermarkEndOfWindowState() {} + + @Override + @Ignore + public void testWatermarkStateIsEmpty() {} + + @Override + @Ignore + public void testMergeEarliestWatermarkIntoSource() {} + + @Override + @Ignore + public void testMergeLatestWatermarkIntoSource() {} + + @Override + @Ignore + public void testSetReadable() {} + + @Override + @Ignore + public void testMapReadable() {} } http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index ddb4aca..d1dba32 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -321,6 +321,13 @@ <type>test-jar</type> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-core-java</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/beam/blob/581ee152/runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java new file mode 100644 index 0000000..b4597f9 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.stateful; + +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateInternalsTest; +import org.junit.Ignore; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link SparkStateInternals}. This is based on {@link StateInternalsTest}. + * Ignore set and map tests. + */ +@RunWith(JUnit4.class) +public class SparkStateInternalsTest extends StateInternalsTest { + + @Override + protected StateInternals createStateInternals() { + return SparkStateInternals.forKey("dummyKey"); + } + + @Override + @Ignore + public void testSet() {} + + @Override + @Ignore + public void testSetIsEmpty() {} + + @Override + @Ignore + public void testMergeSetIntoSource() {} + + @Override + @Ignore + public void testMergeSetIntoNewNamespace() {} + + @Override + @Ignore + public void testMap() {} + + @Override + @Ignore + public void testSetReadable() {} + + @Override + @Ignore + public void testMapReadable() {} + +}