Repository: beam Updated Branches: refs/heads/master 1597f3ca6 -> f9d51aa5c
[BEAM-2423] Abstract StateInternalsTest for the different state internals Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8362bdb9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8362bdb9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8362bdb9 Branch: refs/heads/master Commit: 8362bdb9cd35cc02ed179b3a64fd72f1264a99be Parents: 1597f3c Author: JingsongLi <lzljs3620...@aliyun.com> Authored: Thu Jun 8 01:31:34 2017 +0800 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Mon Jun 12 11:51:29 2017 +0200 ---------------------------------------------------------------------- pom.xml | 7 + .../core/InMemoryStateInternalsTest.java | 555 ++---------------- .../beam/runners/core/StateInternalsTest.java | 573 +++++++++++++++++++ runners/flink/pom.xml | 8 + .../streaming/FlinkStateInternalsTest.java | 348 +---------- 5 files changed, 641 insertions(+), 850 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8362bdb9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 805a8d6..9373a40 100644 --- a/pom.xml +++ b/pom.xml @@ -511,6 +511,13 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-core-java</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/8362bdb9/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java index b526305..335c2f8 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java @@ -17,545 +17,58 @@ */ package org.apache.beam.runners.core; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItems; -import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import java.util.Arrays; -import java.util.Map; -import java.util.Objects; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.state.BagState; -import org.apache.beam.sdk.state.CombiningState; -import org.apache.beam.sdk.state.GroupingState; -import org.apache.beam.sdk.state.MapState; -import org.apache.beam.sdk.state.ReadableState; -import org.apache.beam.sdk.state.SetState; -import org.apache.beam.sdk.state.ValueState; -import org.apache.beam.sdk.state.WatermarkHoldState; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.state.State; import org.hamcrest.Matchers; -import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.junit.runners.Suite; /** - * Tests for {@link InMemoryStateInternals}. + * Tests for {@link InMemoryStateInternals}. This is based on {@link StateInternalsTest}. */ -@RunWith(JUnit4.class) +@RunWith(Suite.class) +@Suite.SuiteClasses({ + InMemoryStateInternalsTest.StandardStateInternalsTests.class, + InMemoryStateInternalsTest.OtherTests.class +}) public class InMemoryStateInternalsTest { - private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10)); - private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); - private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); - private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); - private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = - StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<CombiningState<Integer, int[], Integer>> - SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( - "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); - private static final StateTag<BagState<String>> STRING_BAG_ADDR = - StateTags.bag("stringBag", StringUtf8Coder.of()); - private static final StateTag<SetState<String>> STRING_SET_ADDR = - StateTags.set("stringSet", StringUtf8Coder.of()); - private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR = - StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of()); - private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); - private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); - private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); - - InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey"); - - @Test - public void testValue() throws Exception { - ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR); - - // State instances are cached, but depend on the namespace. - assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), Matchers.sameInstance(value)); - assertThat( - underTest.state(NAMESPACE_2, STRING_VALUE_ADDR), - Matchers.not(Matchers.sameInstance(value))); - - assertThat(value.read(), Matchers.nullValue()); - value.write("hello"); - assertThat(value.read(), equalTo("hello")); - value.write("world"); - assertThat(value.read(), equalTo("world")); - - value.clear(); - assertThat(value.read(), Matchers.nullValue()); - assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), Matchers.sameInstance(value)); - } - - @Test - public void testBag() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - - // State instances are cached, but depend on the namespace. - assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_BAG_ADDR))); - assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)))); - - assertThat(value.read(), Matchers.emptyIterable()); - value.add("hello"); - assertThat(value.read(), containsInAnyOrder("hello")); - - value.add("world"); - assertThat(value.read(), containsInAnyOrder("hello", "world")); - - value.clear(); - assertThat(value.read(), Matchers.emptyIterable()); - assertThat(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), Matchers.sameInstance(value)); - } - - @Test - public void testBagIsEmpty() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add("hello"); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeBagIntoSource() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); - - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); - - StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1); - - // Reading the merged bag gets both the contents - assertThat(bag1.read(), containsInAnyOrder("Hello", "World", "!")); - assertThat(bag2.read(), Matchers.emptyIterable()); - } - - @Test - public void testMergeBagIntoNewNamespace() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); - BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR); - - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); - - StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3); - - // Reading the merged bag gets both the contents - assertThat(bag3.read(), containsInAnyOrder("Hello", "World", "!")); - assertThat(bag1.read(), Matchers.emptyIterable()); - assertThat(bag2.read(), Matchers.emptyIterable()); - } - - @Test - public void testSet() throws Exception { - SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR); - - // State instances are cached, but depend on the namespace. - assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_SET_ADDR))); - assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_SET_ADDR)))); - - // empty - assertThat(value.read(), Matchers.emptyIterable()); - assertFalse(value.contains("A").read()); - - // add - value.add("A"); - value.add("B"); - value.add("A"); - assertFalse(value.addIfAbsent("B").read()); - assertThat(value.read(), containsInAnyOrder("A", "B")); - - // remove - value.remove("A"); - assertThat(value.read(), containsInAnyOrder("B")); - value.remove("C"); - assertThat(value.read(), containsInAnyOrder("B")); - - // contains - assertFalse(value.contains("A").read()); - assertTrue(value.contains("B").read()); - value.add("C"); - value.add("D"); - - // readLater - assertThat(value.readLater().read(), containsInAnyOrder("B", "C", "D")); - SetState<String> later = value.readLater(); - assertThat(later.read(), hasItems("C", "D")); - assertFalse(later.contains("A").read()); - - // clear - value.clear(); - assertThat(value.read(), Matchers.emptyIterable()); - assertThat(underTest.state(NAMESPACE_1, STRING_SET_ADDR), Matchers.sameInstance(value)); - - } - - @Test - public void testSetIsEmpty() throws Exception { - SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add("hello"); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeSetIntoSource() throws Exception { - SetState<String> set1 = underTest.state(NAMESPACE_1, STRING_SET_ADDR); - SetState<String> set2 = underTest.state(NAMESPACE_2, STRING_SET_ADDR); - - set1.add("Hello"); - set2.add("Hello"); - set2.add("World"); - set1.add("!"); - - StateMerging.mergeSets(Arrays.asList(set1, set2), set1); - - // Reading the merged set gets both the contents - assertThat(set1.read(), containsInAnyOrder("Hello", "World", "!")); - assertThat(set2.read(), Matchers.emptyIterable()); - } - - @Test - public void testMergeSetIntoNewNamespace() throws Exception { - SetState<String> set1 = underTest.state(NAMESPACE_1, STRING_SET_ADDR); - SetState<String> set2 = underTest.state(NAMESPACE_2, STRING_SET_ADDR); - SetState<String> set3 = underTest.state(NAMESPACE_3, STRING_SET_ADDR); - - set1.add("Hello"); - set2.add("Hello"); - set2.add("World"); - set1.add("!"); - - StateMerging.mergeSets(Arrays.asList(set1, set2, set3), set3); - - // Reading the merged set gets both the contents - assertThat(set3.read(), containsInAnyOrder("Hello", "World", "!")); - assertThat(set1.read(), Matchers.emptyIterable()); - assertThat(set2.read(), Matchers.emptyIterable()); - } - - // for testMap - private static class MapEntry<K, V> implements Map.Entry<K, V> { - private K key; - private V value; - - private MapEntry(K key, V value) { - this.key = key; - this.value = value; - } - - static <K, V> Map.Entry<K, V> of(K k, V v) { - return new MapEntry<>(k, v); + /** + * A standard StateInternals test. + */ + @RunWith(JUnit4.class) + public static class StandardStateInternalsTests extends StateInternalsTest { + @Override + protected StateInternals createStateInternals() { + return new InMemoryStateInternals<>("dummyKey"); } + } - public final K getKey() { - return key; - } - public final V getValue() { - return value; - } + /** + * A specific test of InMemoryStateInternals. + */ + @RunWith(JUnit4.class) + public static class OtherTests { - public final String toString() { - return key + "=" + value; - } + StateInternals underTest = new InMemoryStateInternals<>("dummyKey"); - public final int hashCode() { - return Objects.hashCode(key) ^ Objects.hashCode(value); + @Test + public void testSameInstance() { + assertSameInstance(StateInternalsTest.STRING_VALUE_ADDR); + assertSameInstance(StateInternalsTest.SUM_INTEGER_ADDR); + assertSameInstance(StateInternalsTest.STRING_BAG_ADDR); + assertSameInstance(StateInternalsTest.STRING_SET_ADDR); + assertSameInstance(StateInternalsTest.STRING_MAP_ADDR); + assertSameInstance(StateInternalsTest.WATERMARK_EARLIEST_ADDR); } - public final V setValue(V newValue) { - V oldValue = value; - value = newValue; - return oldValue; + private <T extends State> void assertSameInstance(StateTag<T> address) { + assertThat(underTest.state(StateInternalsTest.NAMESPACE_1, address), + Matchers.sameInstance(underTest.state(StateInternalsTest.NAMESPACE_1, address))); } - - public final boolean equals(Object o) { - if (o == this) { - return true; - } - if (o instanceof Map.Entry) { - Map.Entry<?, ?> e = (Map.Entry<?, ?>) o; - if (Objects.equals(key, e.getKey()) - && Objects.equals(value, e.getValue())) { - return true; - } - } - return false; - } - } - - @Test - public void testMap() throws Exception { - MapState<String, Integer> value = underTest.state(NAMESPACE_1, STRING_MAP_ADDR); - - // State instances are cached, but depend on the namespace. - assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_MAP_ADDR))); - assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_MAP_ADDR)))); - - // put - assertThat(value.entries().read(), Matchers.emptyIterable()); - value.put("A", 1); - value.put("B", 2); - value.put("A", 11); - assertThat(value.putIfAbsent("B", 22).read(), equalTo(2)); - assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("A", 11), - MapEntry.of("B", 2))); - - // remove - value.remove("A"); - assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2))); - value.remove("C"); - assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2))); - - // get - assertNull(value.get("A").read()); - assertThat(value.get("B").read(), equalTo(2)); - value.put("C", 3); - value.put("D", 4); - assertThat(value.get("C").read(), equalTo(3)); - - // iterate - value.put("E", 5); - value.remove("C"); - assertThat(value.keys().read(), containsInAnyOrder("B", "D", "E")); - assertThat(value.values().read(), containsInAnyOrder(2, 4, 5)); - assertThat( - value.entries().read(), - containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5))); - - // readLater - assertThat(value.get("B").readLater().read(), equalTo(2)); - assertNull(value.get("A").readLater().read()); - assertThat( - value.entries().readLater().read(), - containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5))); - - // clear - value.clear(); - assertThat(value.entries().read(), Matchers.emptyIterable()); - assertThat(underTest.state(NAMESPACE_1, STRING_MAP_ADDR), Matchers.sameInstance(value)); } - @Test - public void testCombiningValue() throws Exception { - GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR))); - - assertThat(value.read(), equalTo(0)); - value.add(2); - assertThat(value.read(), equalTo(2)); - - value.add(3); - assertThat(value.read(), equalTo(5)); - - value.clear(); - assertThat(value.read(), equalTo(0)); - assertThat(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), Matchers.sameInstance(value)); - } - - @Test - public void testCombiningIsEmpty() throws Exception { - GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add(5); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeCombiningValueIntoSource() throws Exception { - CombiningState<Integer, int[], Integer> value1 = - underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value2 = - underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - - value1.add(5); - value2.add(10); - value1.add(6); - - assertThat(value1.read(), equalTo(11)); - assertThat(value2.read(), equalTo(10)); - - // Merging clears the old values and updates the result value. - StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1); - - assertThat(value1.read(), equalTo(21)); - assertThat(value2.read(), equalTo(0)); - } - - @Test - public void testMergeCombiningValueIntoNewNamespace() throws Exception { - CombiningState<Integer, int[], Integer> value1 = - underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value2 = - underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value3 = - underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); - - value1.add(5); - value2.add(10); - value1.add(6); - - StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3); - - // Merging clears the old values and updates the result value. - assertThat(value1.read(), equalTo(0)); - assertThat(value2.read(), equalTo(0)); - assertThat(value3.read(), equalTo(21)); - } - - @Test - public void testWatermarkEarliestState() throws Exception { - WatermarkHoldState value = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), equalTo(new Instant(2000))); - - value.add(new Instant(3000)); - assertThat(value.read(), equalTo(new Instant(2000))); - - value.add(new Instant(1000)); - assertThat(value.read(), equalTo(new Instant(1000))); - - value.clear(); - assertThat(value.read(), equalTo(null)); - assertThat(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), Matchers.sameInstance(value)); - } - - @Test - public void testWatermarkLatestState() throws Exception { - WatermarkHoldState value = - underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), equalTo(new Instant(2000))); - - value.add(new Instant(3000)); - assertThat(value.read(), equalTo(new Instant(3000))); - - value.add(new Instant(1000)); - assertThat(value.read(), equalTo(new Instant(3000))); - - value.clear(); - assertThat(value.read(), equalTo(null)); - assertThat(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), Matchers.sameInstance(value)); - } - - @Test - public void testWatermarkEndOfWindowState() throws Exception { - WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), equalTo(new Instant(2000))); - - value.clear(); - assertThat(value.read(), equalTo(null)); - assertThat(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), Matchers.sameInstance(value)); - } - - @Test - public void testWatermarkStateIsEmpty() throws Exception { - WatermarkHoldState value = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add(new Instant(1000)); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeEarliestWatermarkIntoSource() throws Exception { - WatermarkHoldState value1 = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - WatermarkHoldState value2 = - underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); - - value1.add(new Instant(3000)); - value2.add(new Instant(5000)); - value1.add(new Instant(4000)); - value2.add(new Instant(2000)); - - // Merging clears the old values and updates the merged value. - StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1); - - assertThat(value1.read(), equalTo(new Instant(2000))); - assertThat(value2.read(), equalTo(null)); - } - - @Test - public void testMergeLatestWatermarkIntoSource() throws Exception { - WatermarkHoldState value1 = - underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - WatermarkHoldState value2 = - underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); - WatermarkHoldState value3 = - underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); - - value1.add(new Instant(3000)); - value2.add(new Instant(5000)); - value1.add(new Instant(4000)); - value2.add(new Instant(2000)); - - // Merging clears the old values and updates the result value. - StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1); - - // Merging clears the old values and updates the result value. - assertThat(value3.read(), equalTo(new Instant(5000))); - assertThat(value1.read(), equalTo(null)); - assertThat(value2.read(), equalTo(null)); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/8362bdb9/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java new file mode 100644 index 0000000..bf3156a --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java @@ -0,0 +1,573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.CombiningState; +import org.apache.beam.sdk.state.GroupingState; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.ReadableState; +import org.apache.beam.sdk.state.SetState; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.state.WatermarkHoldState; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for {@link StateInternals}. + */ +public abstract class StateInternalsTest { + + private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10)); + static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); + private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); + private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); + + static final StateTag<ValueState<String>> STRING_VALUE_ADDR = + StateTags.value("stringValue", StringUtf8Coder.of()); + static final StateTag<CombiningState<Integer, int[], Integer>> + SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( + "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); + static final StateTag<BagState<String>> STRING_BAG_ADDR = + StateTags.bag("stringBag", StringUtf8Coder.of()); + static final StateTag<SetState<String>> STRING_SET_ADDR = + StateTags.set("stringSet", StringUtf8Coder.of()); + static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR = + StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of()); + static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); + private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); + + private StateInternals underTest; + + @Before + public void setUp() { + this.underTest = createStateInternals(); + } + + protected abstract StateInternals createStateInternals(); + + @Test + public void testValue() throws Exception { + ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR); + + // State instances are cached, but depend on the namespace. + assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), equalTo(value)); + assertThat( + underTest.state(NAMESPACE_2, STRING_VALUE_ADDR), + Matchers.not(equalTo(value))); + + assertThat(value.read(), Matchers.nullValue()); + value.write("hello"); + assertThat(value.read(), equalTo("hello")); + value.write("world"); + assertThat(value.read(), equalTo("world")); + + value.clear(); + assertThat(value.read(), Matchers.nullValue()); + assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), equalTo(value)); + } + + @Test + public void testBag() throws Exception { + BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + + // State instances are cached, but depend on the namespace. + assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_BAG_ADDR))); + assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)))); + + assertThat(value.read(), Matchers.emptyIterable()); + value.add("hello"); + assertThat(value.read(), containsInAnyOrder("hello")); + + value.add("world"); + assertThat(value.read(), containsInAnyOrder("hello", "world")); + + value.clear(); + assertThat(value.read(), Matchers.emptyIterable()); + assertThat(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), equalTo(value)); + } + + @Test + public void testBagIsEmpty() throws Exception { + BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add("hello"); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeBagIntoSource() throws Exception { + BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); + + bag1.add("Hello"); + bag2.add("World"); + bag1.add("!"); + + StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1); + + // Reading the merged bag gets both the contents + assertThat(bag1.read(), containsInAnyOrder("Hello", "World", "!")); + assertThat(bag2.read(), Matchers.emptyIterable()); + } + + @Test + public void testMergeBagIntoNewNamespace() throws Exception { + BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); + BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR); + + bag1.add("Hello"); + bag2.add("World"); + bag1.add("!"); + + StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3); + + // Reading the merged bag gets both the contents + assertThat(bag3.read(), containsInAnyOrder("Hello", "World", "!")); + assertThat(bag1.read(), Matchers.emptyIterable()); + assertThat(bag2.read(), Matchers.emptyIterable()); + } + + @Test + public void testSet() throws Exception { + + SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR); + + // State instances are cached, but depend on the namespace. + assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_SET_ADDR))); + assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_SET_ADDR)))); + + // empty + assertThat(value.read(), Matchers.emptyIterable()); + assertFalse(value.contains("A").read()); + + // add + value.add("A"); + value.add("B"); + value.add("A"); + assertFalse(value.addIfAbsent("B").read()); + assertThat(value.read(), containsInAnyOrder("A", "B")); + + // remove + value.remove("A"); + assertThat(value.read(), containsInAnyOrder("B")); + value.remove("C"); + assertThat(value.read(), containsInAnyOrder("B")); + + // contains + assertFalse(value.contains("A").read()); + assertTrue(value.contains("B").read()); + value.add("C"); + value.add("D"); + + // readLater + assertThat(value.readLater().read(), containsInAnyOrder("B", "C", "D")); + SetState<String> later = value.readLater(); + assertThat(later.read(), hasItems("C", "D")); + assertFalse(later.contains("A").read()); + + // clear + value.clear(); + assertThat(value.read(), Matchers.emptyIterable()); + assertThat(underTest.state(NAMESPACE_1, STRING_SET_ADDR), equalTo(value)); + + } + + @Test + public void testSetIsEmpty() throws Exception { + + SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add("hello"); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeSetIntoSource() throws Exception { + + SetState<String> set1 = underTest.state(NAMESPACE_1, STRING_SET_ADDR); + SetState<String> set2 = underTest.state(NAMESPACE_2, STRING_SET_ADDR); + + set1.add("Hello"); + set2.add("Hello"); + set2.add("World"); + set1.add("!"); + + StateMerging.mergeSets(Arrays.asList(set1, set2), set1); + + // Reading the merged set gets both the contents + assertThat(set1.read(), containsInAnyOrder("Hello", "World", "!")); + assertThat(set2.read(), Matchers.emptyIterable()); + } + + @Test + public void testMergeSetIntoNewNamespace() throws Exception { + + SetState<String> set1 = underTest.state(NAMESPACE_1, STRING_SET_ADDR); + SetState<String> set2 = underTest.state(NAMESPACE_2, STRING_SET_ADDR); + SetState<String> set3 = underTest.state(NAMESPACE_3, STRING_SET_ADDR); + + set1.add("Hello"); + set2.add("Hello"); + set2.add("World"); + set1.add("!"); + + StateMerging.mergeSets(Arrays.asList(set1, set2, set3), set3); + + // Reading the merged set gets both the contents + assertThat(set3.read(), containsInAnyOrder("Hello", "World", "!")); + assertThat(set1.read(), Matchers.emptyIterable()); + assertThat(set2.read(), Matchers.emptyIterable()); + } + + // for testMap + private static class MapEntry<K, V> implements Map.Entry<K, V> { + private K key; + private V value; + + private MapEntry(K key, V value) { + this.key = key; + this.value = value; + } + + static <K, V> Map.Entry<K, V> of(K k, V v) { + return new MapEntry<>(k, v); + } + + public final K getKey() { + return key; + } + public final V getValue() { + return value; + } + + public final String toString() { + return key + "=" + value; + } + + public final int hashCode() { + return Objects.hashCode(key) ^ Objects.hashCode(value); + } + + public final V setValue(V newValue) { + V oldValue = value; + value = newValue; + return oldValue; + } + + public final boolean equals(Object o) { + if (o == this) { + return true; + } + if (o instanceof Map.Entry) { + Map.Entry<?, ?> e = (Map.Entry<?, ?>) o; + if (Objects.equals(key, e.getKey()) + && Objects.equals(value, e.getValue())) { + return true; + } + } + return false; + } + } + + @Test + public void testMap() throws Exception { + + MapState<String, Integer> value = underTest.state(NAMESPACE_1, STRING_MAP_ADDR); + + // State instances are cached, but depend on the namespace. + assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_MAP_ADDR))); + assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_MAP_ADDR)))); + + // put + assertThat(value.entries().read(), Matchers.emptyIterable()); + value.put("A", 1); + value.put("B", 2); + value.put("A", 11); + assertThat(value.putIfAbsent("B", 22).read(), equalTo(2)); + assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("A", 11), + MapEntry.of("B", 2))); + + // remove + value.remove("A"); + assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2))); + value.remove("C"); + assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2))); + + // get + assertNull(value.get("A").read()); + assertThat(value.get("B").read(), equalTo(2)); + value.put("C", 3); + value.put("D", 4); + assertThat(value.get("C").read(), equalTo(3)); + + // iterate + value.put("E", 5); + value.remove("C"); + assertThat(value.keys().read(), containsInAnyOrder("B", "D", "E")); + assertThat(value.values().read(), containsInAnyOrder(2, 4, 5)); + assertThat( + value.entries().read(), + containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5))); + + // readLater + assertThat(value.get("B").readLater().read(), equalTo(2)); + assertNull(value.get("A").readLater().read()); + assertThat( + value.entries().readLater().read(), + containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5))); + + // clear + value.clear(); + assertThat(value.entries().read(), Matchers.emptyIterable()); + assertThat(underTest.state(NAMESPACE_1, STRING_MAP_ADDR), equalTo(value)); + } + + @Test + public void testCombiningValue() throws Exception { + + GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR))); + + assertThat(value.read(), equalTo(0)); + value.add(2); + assertThat(value.read(), equalTo(2)); + + value.add(3); + assertThat(value.read(), equalTo(5)); + + value.clear(); + assertThat(value.read(), equalTo(0)); + assertThat(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), equalTo(value)); + } + + @Test + public void testCombiningIsEmpty() throws Exception { + GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add(5); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeCombiningValueIntoSource() throws Exception { + CombiningState<Integer, int[], Integer> value1 = + underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + CombiningState<Integer, int[], Integer> value2 = + underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); + + value1.add(5); + value2.add(10); + value1.add(6); + + assertThat(value1.read(), equalTo(11)); + assertThat(value2.read(), equalTo(10)); + + // Merging clears the old values and updates the result value. + StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1); + + assertThat(value1.read(), equalTo(21)); + assertThat(value2.read(), equalTo(0)); + } + + @Test + public void testMergeCombiningValueIntoNewNamespace() throws Exception { + CombiningState<Integer, int[], Integer> value1 = + underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + CombiningState<Integer, int[], Integer> value2 = + underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); + CombiningState<Integer, int[], Integer> value3 = + underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); + + value1.add(5); + value2.add(10); + value1.add(6); + + StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3); + + // Merging clears the old values and updates the result value. + assertThat(value1.read(), equalTo(0)); + assertThat(value2.read(), equalTo(0)); + assertThat(value3.read(), equalTo(21)); + } + + @Test + public void testWatermarkEarliestState() throws Exception { + WatermarkHoldState value = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), equalTo(new Instant(2000))); + + value.add(new Instant(3000)); + assertThat(value.read(), equalTo(new Instant(2000))); + + value.add(new Instant(1000)); + assertThat(value.read(), equalTo(new Instant(1000))); + + value.clear(); + assertThat(value.read(), equalTo(null)); + assertThat(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), equalTo(value)); + } + + @Test + public void testWatermarkLatestState() throws Exception { + WatermarkHoldState value = + underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), equalTo(new Instant(2000))); + + value.add(new Instant(3000)); + assertThat(value.read(), equalTo(new Instant(3000))); + + value.add(new Instant(1000)); + assertThat(value.read(), equalTo(new Instant(3000))); + + value.clear(); + assertThat(value.read(), equalTo(null)); + assertThat(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), equalTo(value)); + } + + @Test + public void testWatermarkEndOfWindowState() throws Exception { + WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), equalTo(new Instant(2000))); + + value.clear(); + assertThat(value.read(), equalTo(null)); + assertThat(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), equalTo(value)); + } + + @Test + public void testWatermarkStateIsEmpty() throws Exception { + WatermarkHoldState value = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add(new Instant(1000)); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeEarliestWatermarkIntoSource() throws Exception { + WatermarkHoldState value1 = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + WatermarkHoldState value2 = + underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); + + value1.add(new Instant(3000)); + value2.add(new Instant(5000)); + value1.add(new Instant(4000)); + value2.add(new Instant(2000)); + + // Merging clears the old values and updates the merged value. + StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1); + + assertThat(value1.read(), equalTo(new Instant(2000))); + assertThat(value2.read(), equalTo(null)); + } + + @Test + public void testMergeLatestWatermarkIntoSource() throws Exception { + WatermarkHoldState value1 = + underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); + WatermarkHoldState value2 = + underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); + WatermarkHoldState value3 = + underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); + + value1.add(new Instant(3000)); + value2.add(new Instant(5000)); + value1.add(new Instant(4000)); + value2.add(new Instant(2000)); + + // Merging clears the old values and updates the result value. + StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1); + + // Merging clears the old values and updates the result value. + assertThat(value3.read(), equalTo(new Instant(5000))); + assertThat(value1.read(), equalTo(null)); + assertThat(value2.read(), equalTo(null)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8362bdb9/runners/flink/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index c4c6b55..a5b8203 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -381,5 +381,13 @@ <type>test-jar</type> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-core-java</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/beam/blob/8362bdb9/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java index 35d2b78..e7564ec 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java @@ -17,31 +17,11 @@ */ package org.apache.beam.runners.flink.streaming; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertThat; - import java.nio.ByteBuffer; -import java.util.Arrays; -import org.apache.beam.runners.core.StateMerging; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.core.StateNamespaceForTest; -import org.apache.beam.runners.core.StateTag; -import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateInternalsTest; import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.state.BagState; -import org.apache.beam.sdk.state.CombiningState; -import org.apache.beam.sdk.state.GroupingState; -import org.apache.beam.sdk.state.ReadableState; -import org.apache.beam.sdk.state.ValueState; -import org.apache.beam.sdk.state.WatermarkHoldState; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.CoderUtils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; @@ -52,42 +32,17 @@ import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.hamcrest.Matchers; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** - * Tests for {@link FlinkStateInternals}. This is based on the tests for - * {@code InMemoryStateInternals}. + * Tests for {@link FlinkStateInternals}. This is based on {@link StateInternalsTest}. */ @RunWith(JUnit4.class) -public class FlinkStateInternalsTest { - private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10)); - private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); - private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); - private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); - - private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = - StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<CombiningState<Integer, int[], Integer>> - SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( - "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); - private static final StateTag<BagState<String>> STRING_BAG_ADDR = - StateTags.bag("stringBag", StringUtf8Coder.of()); - private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); - private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); - private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); - - FlinkStateInternals<String> underTest; +public class FlinkStateInternalsTest extends StateInternalsTest { - @Before - public void initStateInternals() { + @Override + protected StateInternals createStateInternals() { MemoryStateBackend backend = new MemoryStateBackend(); try { AbstractKeyedStateBackend<ByteBuffer> keyedStateBackend = backend.createKeyedStateBackend( @@ -98,296 +53,31 @@ public class FlinkStateInternalsTest { 1, new KeyGroupRange(0, 0), new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID())); - underTest = new FlinkStateInternals<>(keyedStateBackend, StringUtf8Coder.of()); keyedStateBackend.setCurrentKey( ByteBuffer.wrap(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Hello"))); + + return new FlinkStateInternals<>(keyedStateBackend, StringUtf8Coder.of()); } catch (Exception e) { throw new RuntimeException(e); } } - @Test - public void testValue() throws Exception { - ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR); - - assertEquals(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), value); - assertNotEquals( - underTest.state(NAMESPACE_2, STRING_VALUE_ADDR), - value); - - assertThat(value.read(), Matchers.nullValue()); - value.write("hello"); - assertThat(value.read(), Matchers.equalTo("hello")); - value.write("world"); - assertThat(value.read(), Matchers.equalTo("world")); - - value.clear(); - assertThat(value.read(), Matchers.nullValue()); - assertEquals(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), value); - - } - - @Test - public void testBag() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - - assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR))); - - assertThat(value.read(), Matchers.emptyIterable()); - value.add("hello"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello")); - - value.add("world"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world")); - - value.clear(); - assertThat(value.read(), Matchers.emptyIterable()); - assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value); - - } - - @Test - public void testBagIsEmpty() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add("hello"); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeBagIntoSource() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); - - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); - - StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1); - - // Reading the merged bag gets both the contents - assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); - assertThat(bag2.read(), Matchers.emptyIterable()); - } - - @Test - public void testMergeBagIntoNewNamespace() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); - BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR); - - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); - - StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3); - - // Reading the merged bag gets both the contents - assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); - assertThat(bag1.read(), Matchers.emptyIterable()); - assertThat(bag2.read(), Matchers.emptyIterable()); - } - - @Test - public void testCombiningValue() throws Exception { - GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR))); - - assertThat(value.read(), Matchers.equalTo(0)); - value.add(2); - assertThat(value.read(), Matchers.equalTo(2)); - - value.add(3); - assertThat(value.read(), Matchers.equalTo(5)); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(0)); - assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value); - } - - @Test - public void testCombiningIsEmpty() throws Exception { - GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add(5); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeCombiningValueIntoSource() throws Exception { - CombiningState<Integer, int[], Integer> value1 = - underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value2 = - underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - - value1.add(5); - value2.add(10); - value1.add(6); - - assertThat(value1.read(), Matchers.equalTo(11)); - assertThat(value2.read(), Matchers.equalTo(10)); - - // Merging clears the old values and updates the result value. - StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1); - - assertThat(value1.read(), Matchers.equalTo(21)); - assertThat(value2.read(), Matchers.equalTo(0)); - } - - @Test - public void testMergeCombiningValueIntoNewNamespace() throws Exception { - CombiningState<Integer, int[], Integer> value1 = - underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value2 = - underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value3 = - underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); - - value1.add(5); - value2.add(10); - value1.add(6); - - StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3); - - // Merging clears the old values and updates the result value. - assertThat(value1.read(), Matchers.equalTo(0)); - assertThat(value2.read(), Matchers.equalTo(0)); - assertThat(value3.read(), Matchers.equalTo(21)); - } - - @Test - public void testWatermarkEarliestState() throws Exception { - WatermarkHoldState value = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + ///////////////////////// Unsupported tests \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\ - value.add(new Instant(3000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + @Override + public void testSet() {} - value.add(new Instant(1000)); - assertThat(value.read(), Matchers.equalTo(new Instant(1000))); + @Override + public void testSetIsEmpty() {} - value.clear(); - assertThat(value.read(), Matchers.equalTo(null)); - assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value); - } - - @Test - public void testWatermarkLatestState() throws Exception { - WatermarkHoldState value = - underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR))); + @Override + public void testMergeSetIntoSource() {} - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + @Override + public void testMergeSetIntoNewNamespace() {} - value.add(new Instant(3000)); - assertThat(value.read(), Matchers.equalTo(new Instant(3000))); + @Override + public void testMap() {} - value.add(new Instant(1000)); - assertThat(value.read(), Matchers.equalTo(new Instant(3000))); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(null)); - assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value); - } - - @Test - public void testWatermarkEndOfWindowState() throws Exception { - WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(null)); - assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value); - } - - @Test - public void testWatermarkStateIsEmpty() throws Exception { - WatermarkHoldState value = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add(new Instant(1000)); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeEarliestWatermarkIntoSource() throws Exception { - WatermarkHoldState value1 = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - WatermarkHoldState value2 = - underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); - - value1.add(new Instant(3000)); - value2.add(new Instant(5000)); - value1.add(new Instant(4000)); - value2.add(new Instant(2000)); - - // Merging clears the old values and updates the merged value. - StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1); - - assertThat(value1.read(), Matchers.equalTo(new Instant(2000))); - assertThat(value2.read(), Matchers.equalTo(null)); - } - - @Test - public void testMergeLatestWatermarkIntoSource() throws Exception { - WatermarkHoldState value1 = - underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - WatermarkHoldState value2 = - underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); - WatermarkHoldState value3 = - underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); - - value1.add(new Instant(3000)); - value2.add(new Instant(5000)); - value1.add(new Instant(4000)); - value2.add(new Instant(2000)); - - // Merging clears the old values and updates the result value. - StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1); - - // Merging clears the old values and updates the result value. - assertThat(value3.read(), Matchers.equalTo(new Instant(5000))); - assertThat(value1.read(), Matchers.equalTo(null)); - assertThat(value2.read(), Matchers.equalTo(null)); - } }