Repository: beam Updated Branches: refs/heads/master fc55d2f81 -> 202aae9d3
BEAM-2022 fix triggering for processing time timers Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eb860388 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eb860388 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eb860388 Branch: refs/heads/master Commit: eb860388a8626837655e82171c8480421384e419 Parents: 2b6cb8c Author: Thomas Weise <t...@apache.org> Authored: Sat Apr 29 01:17:22 2017 -0700 Committer: Thomas Weise <t...@apache.org> Committed: Sat Apr 29 01:17:22 2017 -0700 ---------------------------------------------------------------------- .../operators/ApexGroupByKeyOperator.java | 41 ++- .../operators/ApexTimerInternals.java | 155 +++++--- .../translation/ApexStateInternalsTest.java | 368 ------------------- .../operators/ApexTimerInternalsTest.java | 78 +++- .../utils/ApexStateInternalsTest.java | 367 ++++++++++++++++++ 5 files changed, 567 insertions(+), 442 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/eb860388/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index f8b6653..3c9f5ab 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -25,11 +25,9 @@ import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.Operator; import com.datatorrent.api.StreamCodec; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import 
com.datatorrent.netlet.util.Slice; import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.google.common.base.Throwables; -import com.google.common.collect.Multimap; import java.util.Collection; import java.util.Collections; import org.apache.beam.runners.apex.ApexPipelineOptions; @@ -41,6 +39,7 @@ import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.core.construction.Triggers; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; @@ -49,8 +48,8 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.NullSideInputReader; +import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; @@ -68,7 +67,8 @@ import org.slf4j.LoggerFactory; * @param <K> key type * @param <V> value type */ -public class ApexGroupByKeyOperator<K, V> implements Operator { +public class ApexGroupByKeyOperator<K, V> implements Operator, + ApexTimerInternals.TimerProcessor<K> { private static final Logger LOG = LoggerFactory.getLogger(ApexGroupByKeyOperator.class); private boolean traceTuples = true; @@ -106,7 +106,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator { } processElement(t.getValue()); } catch (Exception e) { - Throwables.propagateIfPossible(e); + Throwables.throwIfUnchecked(e); throw new 
RuntimeException(e); } } @@ -143,6 +143,8 @@ public class ApexGroupByKeyOperator<K, V> implements Operator { @Override public void endWindow() { + timerInternals.fireReadyTimers(timerInternals.currentProcessingTime().getMillis(), + this, TimeDomain.PROCESSING_TIME); } @Override @@ -195,7 +197,6 @@ public class ApexGroupByKeyOperator<K, V> implements Operator { serializedOptions.get()); } - private void processElement(WindowedValue<KV<K, V>> windowedValue) throws Exception { final KV<K, V> kv = windowedValue.getValue(); final WindowedValue<V> updatedWindowedValue = WindowedValue.of(kv.getValue(), @@ -209,19 +210,23 @@ public class ApexGroupByKeyOperator<K, V> implements Operator { reduceFnRunner.persist(); } - private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception { - this.inputWatermark = new Instant(mark.getTimestamp()); - Multimap<Slice, TimerInternals.TimerData> timers = timerInternals.getTimersReadyToProcess( - mark.getTimestamp()); - if (!timers.isEmpty()) { - for (Slice keyBytes : timers.keySet()) { - K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.buffer); - timerInternals.setContext(key, keyCoder, inputWatermark); - ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = newReduceFnRunner(key); - reduceFnRunner.onTimers(timers.get(keyBytes)); - reduceFnRunner.persist(); - } + @Override + public void fireTimer(K key, Collection<TimerData> timerData) { + timerInternals.setContext(key, keyCoder, inputWatermark); + ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = newReduceFnRunner(key); + try { + reduceFnRunner.onTimers(timerData); + } catch (Exception e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } + reduceFnRunner.persist(); + } + + private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) { + this.inputWatermark = new Instant(mark.getTimestamp()); + timerInternals.fireReadyTimers(this.inputWatermark.getMillis(), + this, TimeDomain.EVENT_TIME); } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/eb860388/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java index b142095..15ccbee 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java @@ -20,9 +20,12 @@ package org.apache.beam.runners.apex.translation.operators; import com.datatorrent.netlet.util.Slice; import com.esotericsoftware.kryo.DefaultSerializer; import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ComparisonChain; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import java.io.Serializable; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -47,14 +50,16 @@ import org.joda.time.Instant; @DefaultSerializer(JavaSerializer.class) class ApexTimerInternals<K> implements TimerInternals, Serializable { - private Map<Slice, Set<Slice>> activeTimers = new HashMap<>(); - private TimerDataCoder timerDataCoder; + private final TimerSet eventTimeTimeTimers; + private final TimerSet processingTimeTimers; + private transient K currentKey; private transient Instant currentInputWatermark; private transient Coder<K> keyCoder; public ApexTimerInternals(TimerDataCoder timerDataCoder) { - this.timerDataCoder = timerDataCoder; + this.eventTimeTimeTimers = new TimerSet(timerDataCoder); + this.processingTimeTimers = new TimerSet(timerDataCoder); } public void setContext(K key, 
Coder<K> keyCoder, Instant inputWatermark) { @@ -63,31 +68,37 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable { this.currentInputWatermark = inputWatermark; } + @VisibleForTesting + protected TimerSet getTimerSet(TimeDomain domain) { + return (domain == TimeDomain.EVENT_TIME) ? eventTimeTimeTimers : processingTimeTimers; + } + @Override public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) { TimerData timerData = TimerData.of(timerId, namespace, target, timeDomain); - registerActiveTimer(currentKey, timerData); + setTimer(timerData); } @Override public void setTimer(TimerData timerData) { - registerActiveTimer(currentKey, timerData); + getTimerSet(timerData.getDomain()).addTimer(getKeyBytes(this.currentKey), timerData); } @Override public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { - throw new UnsupportedOperationException(); + getTimerSet(timeDomain).deleteTimer(getKeyBytes(this.currentKey), namespace, timerId); } @Override public void deleteTimer(StateNamespace namespace, String timerId) { - throw new UnsupportedOperationException(); + this.eventTimeTimeTimers.deleteTimer(getKeyBytes(this.currentKey), namespace, timerId); + this.processingTimeTimers.deleteTimer(getKeyBytes(this.currentKey), namespace, timerId); } @Override public void deleteTimer(TimerData timerKey) { - unregisterActiveTimer(currentKey, timerKey); + getTimerSet(timerKey.getDomain()).deleteTimer(getKeyBytes(this.currentKey), timerKey); } @Override @@ -102,7 +113,7 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable { @Override public Instant currentInputWatermarkTime() { - return new Instant(currentInputWatermark); + return currentInputWatermark; } @Override @@ -110,14 +121,17 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable { return null; } + public interface TimerProcessor<K> { + void fireTimer(K key, Collection<TimerData> timerData); + } + /** - 
* Returns the list of timers that are ready to fire. These are the timers - * that are registered to be triggered at a time before the current watermark. - * We keep these timers in a Set, so that they are deduplicated, as the same - * timer can be registered multiple times. + * Fire the timers that are ready. These are the timers + * that are registered to be triggered at a time before the current time. */ - public Multimap<Slice, TimerInternals.TimerData> getTimersReadyToProcess( - long currentWatermark) { + public void fireReadyTimers(long currentTime, + TimerProcessor<K> timerProcessor, TimeDomain timeDomain) { + TimerSet timers = getTimerSet(timeDomain); // we keep the timers to return in a different list and launch them later // because we cannot prevent a trigger from registering another timer, @@ -125,16 +139,16 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable { Multimap<Slice, TimerInternals.TimerData> toFire = HashMultimap.create(); Iterator<Map.Entry<Slice, Set<Slice>>> it = - activeTimers.entrySet().iterator(); + timers.activeTimers.entrySet().iterator(); while (it.hasNext()) { Map.Entry<Slice, Set<Slice>> keyWithTimers = it.next(); Iterator<Slice> timerIt = keyWithTimers.getValue().iterator(); while (timerIt.hasNext()) { try { - TimerData timerData = CoderUtils.decodeFromByteArray(timerDataCoder, + TimerData timerData = CoderUtils.decodeFromByteArray(timers.timerDataCoder, timerIt.next().buffer); - if (timerData.getTimestamp().isBefore(currentWatermark)) { + if (timerData.getTimestamp().isBefore(currentTime)) { toFire.put(keyWithTimers.getKey(), timerData); timerIt.remove(); } @@ -147,55 +161,106 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable { it.remove(); } } - return toFire; - } - private void registerActiveTimer(K key, TimerData timer) { - final Slice keyBytes; - try { - keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key)); - } catch (CoderException e) { - throw new RuntimeException(e); - } 
- Set<Slice> timersForKey = activeTimers.get(keyBytes); - if (timersForKey == null) { - timersForKey = new HashSet<>(); + // fire ready timers + if (!toFire.isEmpty()) { + for (Slice keyBytes : toFire.keySet()) { + try { + K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.buffer); + timerProcessor.fireTimer(key, toFire.get(keyBytes)); + } catch (CoderException e) { + throw new RuntimeException(e); + } + } } + } + private Slice getKeyBytes(K key) { try { - Slice timerBytes = new Slice(CoderUtils.encodeToByteArray(timerDataCoder, timer)); - timersForKey.add(timerBytes); + return new Slice(CoderUtils.encodeToByteArray(keyCoder, key)); } catch (CoderException e) { throw new RuntimeException(e); } - - activeTimers.put(keyBytes, timersForKey); } - private void unregisterActiveTimer(K key, TimerData timer) { - final Slice keyBytes; - try { - keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key)); - } catch (CoderException e) { - throw new RuntimeException(e); + protected static class TimerSet implements Serializable { + private final Map<Slice, Set<Slice>> activeTimers = new HashMap<>(); + private final TimerDataCoder timerDataCoder; + + protected TimerSet(TimerDataCoder timerDataCoder) { + this.timerDataCoder = timerDataCoder; } - Set<Slice> timersForKey = activeTimers.get(keyBytes); - if (timersForKey != null) { + public void addTimer(Slice keyBytes, TimerData timer) { + Set<Slice> timersForKey = activeTimers.get(keyBytes); + if (timersForKey == null) { + timersForKey = new HashSet<>(); + } + try { Slice timerBytes = new Slice(CoderUtils.encodeToByteArray(timerDataCoder, timer)); timersForKey.add(timerBytes); - timersForKey.remove(timerBytes); } catch (CoderException e) { throw new RuntimeException(e); } + activeTimers.put(keyBytes, timersForKey); + } + + public void deleteTimer(Slice keyBytes, StateNamespace namespace, String timerId) { + Set<Slice> timersForKey = activeTimers.get(keyBytes); + if (timersForKey == null) { + return; + } + + 
Iterator<Slice> timerIt = timersForKey.iterator(); + while (timerIt.hasNext()) { + try { + TimerData timerData = CoderUtils.decodeFromByteArray(timerDataCoder, + timerIt.next().buffer); + ComparisonChain chain = + ComparisonChain.start().compare(timerData.getTimerId(), timerId); + if (chain.result() == 0 && !timerData.getNamespace().equals(namespace)) { + // Obtaining the stringKey may be expensive; only do so if required + chain = chain.compare(timerData.getNamespace().stringKey(), namespace.stringKey()); + } + if (chain.result() == 0) { + timerIt.remove(); + } + } catch (CoderException e) { + throw new RuntimeException(e); + } + } + if (timersForKey.isEmpty()) { activeTimers.remove(keyBytes); - } else { - activeTimers.put(keyBytes, timersForKey); } } + + public void deleteTimer(Slice keyBytes, TimerData timerKey) { + Set<Slice> timersForKey = activeTimers.get(keyBytes); + if (timersForKey != null) { + try { + Slice timerBytes = new Slice(CoderUtils.encodeToByteArray(timerDataCoder, timerKey)); + timersForKey.add(timerBytes); + timersForKey.remove(timerBytes); + } catch (CoderException e) { + throw new RuntimeException(e); + } + + if (timersForKey.isEmpty()) { + activeTimers.remove(keyBytes); + } else { + activeTimers.put(keyBytes, timersForKey); + } + } + } + + @VisibleForTesting + protected Map<Slice, Set<Slice>> getMap() { + return activeTimers; + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/eb860388/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java deleted file mode 100644 index 091fe3b..0000000 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java +++ /dev/null @@ -1,368 +0,0 @@ -/* 
- * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.apex.translation; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; - -import com.datatorrent.lib.util.KryoCloneUtils; -import java.util.Arrays; -import org.apache.beam.runners.apex.translation.utils.ApexStateInternals; -import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend; -import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateInternalsFactory; -import org.apache.beam.runners.core.StateMerging; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.core.StateNamespaceForTest; -import org.apache.beam.runners.core.StateTag; -import org.apache.beam.runners.core.StateTags; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import 
org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.CombiningState; -import org.apache.beam.sdk.util.state.GroupingState; -import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.ValueState; -import org.apache.beam.sdk.util.state.WatermarkHoldState; -import org.hamcrest.Matchers; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; - -/** - * Tests for {@link ApexStateInternals}. This is based on the tests for - * {@code InMemoryStateInternals}. - */ -public class ApexStateInternalsTest { - private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10)); - private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); - private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); - private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); - - private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = - StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<Object, CombiningState<Integer, int[], Integer>> - SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( - "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); - private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = - StateTags.bag("stringBag", StringUtf8Coder.of()); - private static final StateTag<Object, WatermarkHoldState> - WATERMARK_EARLIEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); - - private ApexStateInternals<String> underTest; - - @Before - public void initStateInternals() { - 
underTest = new ApexStateInternals.ApexStateBackend() - .newStateInternalsFactory(StringUtf8Coder.of()) - .stateInternalsForKey((String) null); - } - - @Test - public void testBag() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - - assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR))); - - assertThat(value.read(), Matchers.emptyIterable()); - value.add("hello"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello")); - - value.add("world"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world")); - - value.clear(); - assertThat(value.read(), Matchers.emptyIterable()); - assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value); - - } - - @Test - public void testBagIsEmpty() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add("hello"); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeBagIntoSource() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); - - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); - - StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1); - - // Reading the merged bag gets both the contents - assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); - assertThat(bag2.read(), Matchers.emptyIterable()); - } - - @Test - public void testMergeBagIntoNewNamespace() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); - BagState<String> bag3 = 
underTest.state(NAMESPACE_3, STRING_BAG_ADDR); - - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); - - StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3); - - // Reading the merged bag gets both the contents - assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); - assertThat(bag1.read(), Matchers.emptyIterable()); - assertThat(bag2.read(), Matchers.emptyIterable()); - } - - @Test - public void testCombiningValue() throws Exception { - GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR))); - - assertThat(value.read(), Matchers.equalTo(0)); - value.add(2); - assertThat(value.read(), Matchers.equalTo(2)); - - value.add(3); - assertThat(value.read(), Matchers.equalTo(5)); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(0)); - assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value); - } - - @Test - public void testCombiningIsEmpty() throws Exception { - GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add(5); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeCombiningValueIntoSource() throws Exception { - CombiningState<Integer, int[], Integer> value1 = - underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value2 = - underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - - value1.add(5); - value2.add(10); - value1.add(6); - - assertThat(value1.read(), Matchers.equalTo(11)); - assertThat(value2.read(), Matchers.equalTo(10)); - - // Merging clears the old values 
and updates the result value. - StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1); - - assertThat(value1.read(), Matchers.equalTo(21)); - assertThat(value2.read(), Matchers.equalTo(0)); - } - - @Test - public void testMergeCombiningValueIntoNewNamespace() throws Exception { - CombiningState<Integer, int[], Integer> value1 = - underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value2 = - underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - CombiningState<Integer, int[], Integer> value3 = - underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); - - value1.add(5); - value2.add(10); - value1.add(6); - - StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3); - - // Merging clears the old values and updates the result value. - assertThat(value1.read(), Matchers.equalTo(0)); - assertThat(value2.read(), Matchers.equalTo(0)); - assertThat(value3.read(), Matchers.equalTo(21)); - } - - @Test - public void testWatermarkEarliestState() throws Exception { - WatermarkHoldState value = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - - // State instances are cached, but depend on the namespace. 
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); - - value.add(new Instant(3000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); - - value.add(new Instant(1000)); - assertThat(value.read(), Matchers.equalTo(new Instant(1000))); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(null)); - assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value); - } - - @Test - public void testWatermarkLatestState() throws Exception { - WatermarkHoldState value = - underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); - - value.add(new Instant(3000)); - assertThat(value.read(), Matchers.equalTo(new Instant(3000))); - - value.add(new Instant(1000)); - assertThat(value.read(), Matchers.equalTo(new Instant(3000))); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(null)); - assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value); - } - - @Test - public void testWatermarkEndOfWindowState() throws Exception { - WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); - - // State instances are cached, but depend on the namespace. 
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(null)); - assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value); - } - - @Test - public void testWatermarkStateIsEmpty() throws Exception { - WatermarkHoldState value = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add(new Instant(1000)); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeEarliestWatermarkIntoSource() throws Exception { - WatermarkHoldState value1 = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - WatermarkHoldState value2 = - underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); - - value1.add(new Instant(3000)); - value2.add(new Instant(5000)); - value1.add(new Instant(4000)); - value2.add(new Instant(2000)); - - // Merging clears the old values and updates the merged value. 
- StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1); - - assertThat(value1.read(), Matchers.equalTo(new Instant(2000))); - assertThat(value2.read(), Matchers.equalTo(null)); - } - - @Test - public void testMergeLatestWatermarkIntoSource() throws Exception { - WatermarkHoldState value1 = - underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - WatermarkHoldState value2 = - underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); - WatermarkHoldState value3 = - underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); - - value1.add(new Instant(3000)); - value2.add(new Instant(5000)); - value1.add(new Instant(4000)); - value2.add(new Instant(2000)); - - // Merging clears the old values and updates the result value. - StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1); - - // Merging clears the old values and updates the result value. - assertThat(value3.read(), Matchers.equalTo(new Instant(5000))); - assertThat(value1.read(), Matchers.equalTo(null)); - assertThat(value2.read(), Matchers.equalTo(null)); - } - - @Test - public void testSerialization() throws Exception { - ApexStateInternalsFactory<String> sif = new ApexStateBackend(). 
- newStateInternalsFactory(StringUtf8Coder.of()); - ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy"); - - ValueState<String> value = keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR); - assertEquals(keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value); - value.write("hello"); - - ApexStateInternalsFactory<String> cloned; - assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif)); - ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy"); - - ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR); - assertThat(clonedValue.read(), Matchers.equalTo("hello")); - assertEquals(clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/eb860388/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java index 1d7e586..ee142e2 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java @@ -22,8 +22,12 @@ import static org.junit.Assert.assertNotNull; import com.datatorrent.lib.util.KryoCloneUtils; import com.datatorrent.netlet.util.Slice; -import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.beam.runners.apex.translation.operators.ApexTimerInternals.TimerProcessor; import org.apache.beam.runners.core.StateNamespaces; import 
org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.core.TimerInternals.TimerDataCoder; @@ -40,6 +44,15 @@ public class ApexTimerInternalsTest { @Test public void testEventTimeTimers() { + + final Map<String, Collection<TimerData>> firedTimers = new HashMap<>(); + TimerProcessor<String> timerProcessor = new TimerProcessor<String>() { + @Override + public void fireTimer(String key, Collection<TimerData> timerData) { + firedTimers.put(key, timerData); + } + }; + TimerDataCoder timerDataCoder = TimerDataCoder.of(GlobalWindow.Coder.INSTANCE); String key1 = "key1"; Instant instant0 = new Instant(0); @@ -57,17 +70,60 @@ public class ApexTimerInternalsTest { instant1, TimeDomain.EVENT_TIME); timerInternals.setTimer(timerData1); - Multimap<Slice, TimerData> timers = timerInternals.getTimersReadyToProcess( - instant0.getMillis()); - assertEquals(0, timers.size()); + timerInternals.fireReadyTimers(instant0.getMillis(), timerProcessor, TimeDomain.EVENT_TIME); + assertEquals(0, firedTimers.size()); + firedTimers.clear(); - timers = timerInternals.getTimersReadyToProcess(instant1.getMillis()); - assertEquals(1, timers.size()); - assertEquals(Sets.newHashSet(timerData0), Sets.newHashSet(timers.values())); + timerInternals.fireReadyTimers(instant1.getMillis(), timerProcessor, + TimeDomain.PROCESSING_TIME); + assertEquals(0, firedTimers.size()); + timerInternals.fireReadyTimers(instant1.getMillis(), timerProcessor, TimeDomain.EVENT_TIME); + assertEquals(1, firedTimers.size()); + assertEquals(Sets.newHashSet(timerData0), + Sets.newHashSet(firedTimers.values().iterator().next())); + firedTimers.clear(); - timers = timerInternals.getTimersReadyToProcess(instant2.getMillis()); - assertEquals(1, timers.size()); - assertEquals(Sets.newHashSet(timerData1), Sets.newHashSet(timers.values())); + timerInternals.fireReadyTimers(instant2.getMillis(), timerProcessor, TimeDomain.EVENT_TIME); + assertEquals(1, firedTimers.size()); + 
assertEquals(Sets.newHashSet(timerData1), + Sets.newHashSet(firedTimers.values().iterator().next())); + firedTimers.clear(); + } + + @Test + public void testDeleteTimer() { + TimerDataCoder timerDataCoder = TimerDataCoder.of(GlobalWindow.Coder.INSTANCE); + String key1 = "key1"; + Instant instant0 = new Instant(0); + Instant instant1 = new Instant(1); + + ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder); + timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now()); + + TimerData timerData0 = TimerData.of("timerData0", StateNamespaces.global(), + instant0, TimeDomain.EVENT_TIME); + timerInternals.setTimer(timerData0); + + TimerData timerData1 = TimerData.of("timerData1", StateNamespaces.global(), + instant1, TimeDomain.EVENT_TIME); + timerInternals.setTimer(timerData1); + + Map<?, Set<Slice>> timerMap = timerInternals.getTimerSet(TimeDomain.EVENT_TIME).getMap(); + assertEquals(1, timerMap.size()); + assertEquals(2, timerMap.values().iterator().next().size()); + + timerInternals.deleteTimer(timerData0.getNamespace(), timerData0.getTimerId()); + assertEquals(1, timerMap.size()); + assertEquals(1, timerMap.values().iterator().next().size()); + + timerInternals.deleteTimer(timerData1.getNamespace(), timerData1.getTimerId(), + TimeDomain.PROCESSING_TIME); + assertEquals(1, timerMap.size()); + assertEquals(1, timerMap.values().iterator().next().size()); + + timerInternals.deleteTimer(timerData1.getNamespace(), timerData1.getTimerId(), + TimeDomain.EVENT_TIME); + assertEquals(0, timerMap.size()); } @Test @@ -82,7 +138,7 @@ public class ApexTimerInternalsTest { ApexTimerInternals<String> cloned; assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(timerInternals)); cloned.setContext(key, StringUtf8Coder.of(), Instant.now()); - Multimap<Slice, TimerData> timers = cloned.getTimersReadyToProcess(new Instant(1).getMillis()); + Map<?, Set<Slice>> timers = cloned.getTimerSet(TimeDomain.EVENT_TIME).getMap(); 
assertEquals(1, timers.size()); } http://git-wip-us.apache.org/repos/asf/beam/blob/eb860388/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java new file mode 100644 index 0000000..225b654 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.apex.translation.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; + +import com.datatorrent.lib.util.KryoCloneUtils; +import java.util.Arrays; +import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend; +import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateInternalsFactory; +import org.apache.beam.runners.core.StateMerging; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateNamespaceForTest; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.GroupingState; +import org.apache.beam.sdk.util.state.ReadableState; +import org.apache.beam.sdk.util.state.ValueState; +import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for {@link ApexStateInternals}. This is based on the tests for + * {@code InMemoryStateInternals}. 
+ */ +public class ApexStateInternalsTest { + private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10)); + private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); + private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); + private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); + + private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = + StateTags.value("stringValue", StringUtf8Coder.of()); + private static final StateTag<Object, CombiningState<Integer, int[], Integer>> + SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( + "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); + private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = + StateTags.bag("stringBag", StringUtf8Coder.of()); + private static final StateTag<Object, WatermarkHoldState> + WATERMARK_EARLIEST_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); + private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); + + private ApexStateInternals<String> underTest; + + @Before + public void initStateInternals() { + underTest = new ApexStateInternals.ApexStateBackend() + .newStateInternalsFactory(StringUtf8Coder.of()) + .stateInternalsForKey((String) null); + } + + @Test + public void testBag() throws Exception { + BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + + assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR))); + + assertThat(value.read(), Matchers.emptyIterable()); + value.add("hello"); + assertThat(value.read(), 
Matchers.containsInAnyOrder("hello")); + + value.add("world"); + assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world")); + + value.clear(); + assertThat(value.read(), Matchers.emptyIterable()); + assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value); + + } + + @Test + public void testBagIsEmpty() throws Exception { + BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add("hello"); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeBagIntoSource() throws Exception { + BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); + + bag1.add("Hello"); + bag2.add("World"); + bag1.add("!"); + + StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1); + + // Reading the merged bag returns the contents of both bags + assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); + assertThat(bag2.read(), Matchers.emptyIterable()); + } + + @Test + public void testMergeBagIntoNewNamespace() throws Exception { + BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); + BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR); + + bag1.add("Hello"); + bag2.add("World"); + bag1.add("!"); + + StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3); + + // Reading the merged bag returns the combined contents of the source bags + assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); + assertThat(bag1.read(), Matchers.emptyIterable()); + assertThat(bag2.read(), Matchers.emptyIterable()); + } + + @Test + public void testCombiningValue() throws Exception { + GroupingState<Integer, Integer> value = 
underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR))); + + assertThat(value.read(), Matchers.equalTo(0)); + value.add(2); + assertThat(value.read(), Matchers.equalTo(2)); + + value.add(3); + assertThat(value.read(), Matchers.equalTo(5)); + + value.clear(); + assertThat(value.read(), Matchers.equalTo(0)); + assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value); + } + + @Test + public void testCombiningIsEmpty() throws Exception { + GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add(5); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeCombiningValueIntoSource() throws Exception { + CombiningState<Integer, int[], Integer> value1 = + underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + CombiningState<Integer, int[], Integer> value2 = + underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); + + value1.add(5); + value2.add(10); + value1.add(6); + + assertThat(value1.read(), Matchers.equalTo(11)); + assertThat(value2.read(), Matchers.equalTo(10)); + + // Merging clears the old values and updates the result value. 
+ StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1); + + assertThat(value1.read(), Matchers.equalTo(21)); + assertThat(value2.read(), Matchers.equalTo(0)); + } + + @Test + public void testMergeCombiningValueIntoNewNamespace() throws Exception { + CombiningState<Integer, int[], Integer> value1 = + underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + CombiningState<Integer, int[], Integer> value2 = + underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); + CombiningState<Integer, int[], Integer> value3 = + underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); + + value1.add(5); + value2.add(10); + value1.add(6); + + StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3); + + // Merging clears the old values and updates the result value. + assertThat(value1.read(), Matchers.equalTo(0)); + assertThat(value2.read(), Matchers.equalTo(0)); + assertThat(value3.read(), Matchers.equalTo(21)); + } + + @Test + public void testWatermarkEarliestState() throws Exception { + WatermarkHoldState value = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + + // State instances are cached, but depend on the namespace. 
+ assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + + value.add(new Instant(3000)); + assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + + value.add(new Instant(1000)); + assertThat(value.read(), Matchers.equalTo(new Instant(1000))); + + value.clear(); + assertThat(value.read(), Matchers.equalTo(null)); + assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value); + } + + @Test + public void testWatermarkLatestState() throws Exception { + WatermarkHoldState value = + underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + + value.add(new Instant(3000)); + assertThat(value.read(), Matchers.equalTo(new Instant(3000))); + + value.add(new Instant(1000)); + assertThat(value.read(), Matchers.equalTo(new Instant(3000))); + + value.clear(); + assertThat(value.read(), Matchers.equalTo(null)); + assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value); + } + + @Test + public void testWatermarkEndOfWindowState() throws Exception { + WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); + + // State instances are cached, but depend on the namespace. 
+ assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + + value.clear(); + assertThat(value.read(), Matchers.equalTo(null)); + assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value); + } + + @Test + public void testWatermarkStateIsEmpty() throws Exception { + WatermarkHoldState value = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add(new Instant(1000)); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeEarliestWatermarkIntoSource() throws Exception { + WatermarkHoldState value1 = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + WatermarkHoldState value2 = + underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); + + value1.add(new Instant(3000)); + value2.add(new Instant(5000)); + value1.add(new Instant(4000)); + value2.add(new Instant(2000)); + + // Merging clears the old values and updates the merged value. 
+ StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1); + + assertThat(value1.read(), Matchers.equalTo(new Instant(2000))); + assertThat(value2.read(), Matchers.equalTo(null)); + } + + @Test + public void testMergeLatestWatermarkIntoSource() throws Exception { + WatermarkHoldState value1 = + underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); + WatermarkHoldState value2 = + underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); + WatermarkHoldState value3 = + underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); + + value1.add(new Instant(3000)); + value2.add(new Instant(5000)); + value1.add(new Instant(4000)); + value2.add(new Instant(2000)); + + // Merging clears the old values and updates the result value. + StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1); + + // The target now reads the latest timestamp added; both merged holds read back as null. + assertThat(value3.read(), Matchers.equalTo(new Instant(5000))); + assertThat(value1.read(), Matchers.equalTo(null)); + assertThat(value2.read(), Matchers.equalTo(null)); + } + + @Test + public void testSerialization() throws Exception { + ApexStateInternalsFactory<String> sif = new ApexStateBackend(). + newStateInternalsFactory(StringUtf8Coder.of()); + ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy"); + + ValueState<String> value = keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR); + assertEquals(keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value); + value.write("hello"); + + ApexStateInternalsFactory<String> cloned; + assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif)); + ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy"); + + ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR); + assertThat(clonedValue.read(), Matchers.equalTo("hello")); + assertEquals(clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value); + } + +}