Repository: incubator-beam Updated Branches: refs/heads/master 7d1976b26 -> ffe3ab3d6
Revert "Move InMemoryTimerInternals to runners-core" This reverts commit ec0bf7b4023ff75f4ec6723d2e77ed507eb57c51. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/45ed5c70 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/45ed5c70 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/45ed5c70 Branch: refs/heads/master Commit: 45ed5c70c18a806d0fc2e7385886285206fd18e4 Parents: 954e57d Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Dec 16 16:33:51 2016 -0800 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Fri Dec 16 16:39:20 2016 -0800 ---------------------------------------------------------------------- .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 1 + .../runners/core/InMemoryTimerInternals.java | 276 ------------------- .../core/InMemoryTimerInternalsTest.java | 155 ----------- .../beam/runners/core/ReduceFnTester.java | 1 + .../beam/runners/core/SplittableParDoTest.java | 16 +- .../triggers/TriggerStateMachineTester.java | 2 +- .../translation/SparkGroupAlsoByWindowFn.java | 2 +- .../apache/beam/sdk/transforms/DoFnTester.java | 36 +++ .../sdk/util/state/InMemoryTimerInternals.java | 275 ++++++++++++++++++ .../util/state/InMemoryTimerInternalsTest.java | 153 ++++++++++ 10 files changed, 471 insertions(+), 446 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java index efcd771..9189191 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java deleted file mode 100644 index b22fcb3..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.base.MoreObjects; -import java.util.HashSet; -import java.util.PriorityQueue; -import java.util.Set; -import javax.annotation.Nullable; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.WindowTracing; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.joda.time.Instant; - -/** - * Simulates the firing of timers and progression of input and output watermarks for a single - * computation and key in a Windmill-like streaming environment. - */ -public class InMemoryTimerInternals implements TimerInternals { - - /** At most one timer per timestamp is kept. */ - private Set<TimerData> existingTimers = new HashSet<>(); - - /** Pending input watermark timers, in timestamp order. */ - private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11); - - /** Pending processing time timers, in timestamp order. */ - private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11); - - /** Pending synchronized processing time timers, in timestamp order. */ - private PriorityQueue<TimerData> synchronizedProcessingTimers = new PriorityQueue<>(11); - - /** Current input watermark. */ - private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - - /** Current output watermark. */ - @Nullable private Instant outputWatermarkTime = null; - - /** Current processing time. */ - private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - - /** Current synchronized processing time. */ - private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - - @Override - @Nullable - public Instant currentOutputWatermarkTime() { - return outputWatermarkTime; - } - - /** - * Returns when the next timer in the given time domain will fire, or {@code null} - * if there are no timers scheduled in that time domain. - */ - @Nullable - public Instant getNextTimer(TimeDomain domain) { - final TimerData data; - switch (domain) { - case EVENT_TIME: - data = watermarkTimers.peek(); - break; - case PROCESSING_TIME: - data = processingTimers.peek(); - break; - case SYNCHRONIZED_PROCESSING_TIME: - data = synchronizedProcessingTimers.peek(); - break; - default: - throw new IllegalArgumentException("Unexpected time domain: " + domain); - } - return (data == null) ? null : data.getTimestamp(); - } - - private PriorityQueue<TimerData> queue(TimeDomain domain) { - switch (domain) { - case EVENT_TIME: - return watermarkTimers; - case PROCESSING_TIME: - return processingTimers; - case SYNCHRONIZED_PROCESSING_TIME: - return synchronizedProcessingTimers; - default: - throw new IllegalArgumentException("Unexpected time domain: " + domain); - } - } - - @Override - public void setTimer(StateNamespace namespace, String timerId, Instant target, - TimeDomain timeDomain) { - throw new UnsupportedOperationException("Setting a timer by ID is not yet supported."); - } - - @Override - public void setTimer(TimerData timerData) { - WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData); - if (existingTimers.add(timerData)) { - queue(timerData.getDomain()).add(timerData); - } - } - - @Override - public void deleteTimer(StateNamespace namespace, String timerId) { - throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported."); - } - - @Override - public void deleteTimer(TimerData timer) { - WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer); - existingTimers.remove(timer); - queue(timer.getDomain()).remove(timer); - } - - @Override - public Instant currentProcessingTime() { - return processingTime; - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return synchronizedProcessingTime; - } - - @Override - public Instant currentInputWatermarkTime() { - return inputWatermarkTime; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("watermarkTimers", watermarkTimers) - .add("processingTimers", processingTimers) - .add("synchronizedProcessingTimers", synchronizedProcessingTimers) - .add("inputWatermarkTime", inputWatermarkTime) - .add("outputWatermarkTime", outputWatermarkTime) - .add("processingTime", processingTime) - .toString(); - } - - /** Advances input watermark to the given value. */ - public void advanceInputWatermark(Instant newInputWatermark) throws Exception { - checkNotNull(newInputWatermark); - checkState( - !newInputWatermark.isBefore(inputWatermarkTime), - "Cannot move input watermark time backwards from %s to %s", - inputWatermarkTime, - newInputWatermark); - WindowTracing.trace( - "{}.advanceInputWatermark: from {} to {}", - getClass().getSimpleName(), inputWatermarkTime, newInputWatermark); - inputWatermarkTime = newInputWatermark; - } - - /** Advances output watermark to the given value. */ - public void advanceOutputWatermark(Instant newOutputWatermark) { - checkNotNull(newOutputWatermark); - final Instant adjustedOutputWatermark; - if (newOutputWatermark.isAfter(inputWatermarkTime)) { - WindowTracing.trace( - "{}.advanceOutputWatermark: clipping output watermark from {} to {}", - getClass().getSimpleName(), newOutputWatermark, inputWatermarkTime); - adjustedOutputWatermark = inputWatermarkTime; - } else { - adjustedOutputWatermark = newOutputWatermark; - } - - checkState( - outputWatermarkTime == null || !adjustedOutputWatermark.isBefore(outputWatermarkTime), - "Cannot move output watermark time backwards from %s to %s", - outputWatermarkTime, - adjustedOutputWatermark); - WindowTracing.trace( - "{}.advanceOutputWatermark: from {} to {}", - getClass().getSimpleName(), outputWatermarkTime, adjustedOutputWatermark); - outputWatermarkTime = adjustedOutputWatermark; - } - - /** Advances processing time to the given value. */ - public void advanceProcessingTime(Instant newProcessingTime) throws Exception { - checkNotNull(newProcessingTime); - checkState( - !newProcessingTime.isBefore(processingTime), - "Cannot move processing time backwards from %s to %s", - processingTime, - newProcessingTime); - WindowTracing.trace( - "{}.advanceProcessingTime: from {} to {}", - getClass().getSimpleName(), processingTime, newProcessingTime); - processingTime = newProcessingTime; - } - - /** Advances synchronized processing time to the given value. */ - public void advanceSynchronizedProcessingTime(Instant newSynchronizedProcessingTime) - throws Exception { - checkNotNull(newSynchronizedProcessingTime); - checkState( - !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime), - "Cannot move processing time backwards from %s to %s", - synchronizedProcessingTime, - newSynchronizedProcessingTime); - WindowTracing.trace( - "{}.advanceProcessingTime: from {} to {}", - getClass().getSimpleName(), synchronizedProcessingTime, newSynchronizedProcessingTime); - synchronizedProcessingTime = newSynchronizedProcessingTime; - } - - /** Returns the next eligible event time timer, if none returns null. */ - @Nullable - public TimerData removeNextEventTimer() { - TimerData timer = removeNextTimer(inputWatermarkTime, TimeDomain.EVENT_TIME); - if (timer != null) { - WindowTracing.trace( - "{}.removeNextEventTimer: firing {} at {}", - getClass().getSimpleName(), timer, inputWatermarkTime); - } - return timer; - } - - /** Returns the next eligible processing time timer, if none returns null. */ - @Nullable - public TimerData removeNextProcessingTimer() { - TimerData timer = removeNextTimer(processingTime, TimeDomain.PROCESSING_TIME); - if (timer != null) { - WindowTracing.trace( - "{}.removeNextProcessingTimer: firing {} at {}", - getClass().getSimpleName(), timer, processingTime); - } - return timer; - } - - /** Returns the next eligible synchronized processing time timer, if none returns null. */ - @Nullable - public TimerData removeNextSynchronizedProcessingTimer() { - TimerData timer = removeNextTimer( - synchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - if (timer != null) { - WindowTracing.trace( - "{}.removeNextSynchronizedProcessingTimer: firing {} at {}", - getClass().getSimpleName(), timer, synchronizedProcessingTime); - } - return timer; - } - - @Nullable - private TimerData removeNextTimer(Instant currentTime, TimeDomain domain) { - PriorityQueue<TimerData> queue = queue(domain); - if (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) { - TimerData timer = queue.remove(); - existingTimers.remove(timer); - return timer; - } else { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java deleted file mode 100644 index 2caa874..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaceForTest; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link InMemoryTimerInternals}. - */ -@RunWith(JUnit4.class) -public class InMemoryTimerInternalsTest { - - private static final StateNamespace NS1 = new StateNamespaceForTest("NS1"); - - @Test - public void testFiringTimers() throws Exception { - InMemoryTimerInternals underTest = new InMemoryTimerInternals(); - TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); - TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME); - - underTest.setTimer(processingTime1); - underTest.setTimer(processingTime2); - - underTest.advanceProcessingTime(new Instant(20)); - assertEquals(processingTime1, underTest.removeNextProcessingTimer()); - assertNull(underTest.removeNextProcessingTimer()); - - // Advancing just a little shouldn't refire - underTest.advanceProcessingTime(new Instant(21)); - assertNull(underTest.removeNextProcessingTimer()); - - // Adding the timer and advancing a little should refire - underTest.setTimer(processingTime1); - assertEquals(processingTime1, underTest.removeNextProcessingTimer()); - assertNull(underTest.removeNextProcessingTimer()); - - // And advancing the rest of the way should still have the other timer - underTest.advanceProcessingTime(new Instant(30)); - assertEquals(processingTime2, underTest.removeNextProcessingTimer()); - assertNull(underTest.removeNextProcessingTimer()); - } - - @Test - public void testFiringTimersWithCallback() throws Exception { - InMemoryTimerInternals underTest = new InMemoryTimerInternals(); - TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); - TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME); - - underTest.setTimer(processingTime1); - underTest.setTimer(processingTime2); - - underTest.advanceProcessingTime(new Instant(20)); - assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1)); - assertThat(underTest.removeNextProcessingTimer(), nullValue()); - - // Advancing just a little shouldn't refire - underTest.advanceProcessingTime(new Instant(21)); - assertThat(underTest.removeNextProcessingTimer(), nullValue()); - - // Adding the timer and advancing a little should fire again - underTest.setTimer(processingTime1); - underTest.advanceProcessingTime(new Instant(21)); - assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1)); - assertThat(underTest.removeNextProcessingTimer(), nullValue()); - - // And advancing the rest of the way should still have the other timer - underTest.advanceProcessingTime(new Instant(30)); - assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime2)); - assertThat(underTest.removeNextProcessingTimer(), nullValue()); - } - - @Test - public void testTimerOrdering() throws Exception { - InMemoryTimerInternals underTest = new InMemoryTimerInternals(); - TimerData eventTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME); - TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); - TimerData synchronizedProcessingTime1 = TimerData.of( - NS1, new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - TimerData eventTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME); - TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME); - TimerData synchronizedProcessingTime2 = TimerData.of( - NS1, new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - - underTest.setTimer(processingTime1); - underTest.setTimer(eventTime1); - underTest.setTimer(synchronizedProcessingTime1); - underTest.setTimer(processingTime2); - underTest.setTimer(eventTime2); - underTest.setTimer(synchronizedProcessingTime2); - - assertNull(underTest.removeNextEventTimer()); - underTest.advanceInputWatermark(new Instant(30)); - assertEquals(eventTime1, underTest.removeNextEventTimer()); - assertEquals(eventTime2, underTest.removeNextEventTimer()); - assertNull(underTest.removeNextEventTimer()); - - assertNull(underTest.removeNextProcessingTimer()); - underTest.advanceProcessingTime(new Instant(30)); - assertEquals(processingTime1, underTest.removeNextProcessingTimer()); - assertEquals(processingTime2, underTest.removeNextProcessingTimer()); - assertNull(underTest.removeNextProcessingTimer()); - - assertNull(underTest.removeNextSynchronizedProcessingTimer()); - underTest.advanceSynchronizedProcessingTime(new Instant(30)); - assertEquals(synchronizedProcessingTime1, underTest.removeNextSynchronizedProcessingTimer()); - assertEquals(synchronizedProcessingTime2, underTest.removeNextSynchronizedProcessingTimer()); - assertNull(underTest.removeNextProcessingTimer()); - } - - @Test - public void testDeduplicate() throws Exception { - InMemoryTimerInternals underTest = new InMemoryTimerInternals(); - TimerData eventTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME); - TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); - underTest.setTimer(eventTime); - underTest.setTimer(eventTime); - underTest.setTimer(processingTime); - underTest.setTimer(processingTime); - underTest.advanceProcessingTime(new Instant(20)); - underTest.advanceInputWatermark(new Instant(20)); - - assertEquals(processingTime, underTest.removeNextProcessingTimer()); - assertNull(underTest.removeNextProcessingTimer()); - assertEquals(eventTime, underTest.removeNextEventTimer()); - assertNull(underTest.removeNextEventTimer()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 890195a..db0cf91 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -73,6 +73,7 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index 41d419b..cf96b66 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -28,7 +28,6 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import java.io.Serializable; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -191,8 +190,6 @@ public class SplittableParDoTest { tester; private Instant currentProcessingTime; - private InMemoryTimerInternals timerInternals; - ProcessFnTester( Instant currentProcessingTime, DoFn<InputT, OutputT> fn, @@ -203,7 +200,6 @@ public class SplittableParDoTest { new SplittableParDo.ProcessFn<>( fn, inputCoder, restrictionCoder, IntervalWindow.getCoder()); this.tester = DoFnTester.of(processFn); - this.timerInternals = new InMemoryTimerInternals(); processFn.setStateInternalsFactory( new StateInternalsFactory<String>() { @Override @@ -215,7 +211,7 @@ public class SplittableParDoTest { new TimerInternalsFactory<String>() { @Override public TimerInternals timerInternalsForKey(String key) { - return timerInternals; + return tester.getTimerInternals(); } }); processFn.setOutputWindowedValue( @@ -251,7 +247,7 @@ public class SplittableParDoTest { // through the state/timer/output callbacks. this.tester.setCloningBehavior(DoFnTester.CloningBehavior.DO_NOT_CLONE); this.tester.startBundle(); - timerInternals.advanceProcessingTime(currentProcessingTime); + this.tester.advanceProcessingTime(currentProcessingTime); this.currentProcessingTime = currentProcessingTime; } @@ -289,13 +285,7 @@ public class SplittableParDoTest { */ boolean advanceProcessingTimeBy(Duration duration) throws Exception { currentProcessingTime = currentProcessingTime.plus(duration); - timerInternals.advanceProcessingTime(currentProcessingTime); - - List<TimerInternals.TimerData> timers = new ArrayList<>(); - TimerInternals.TimerData nextTimer; - while ((nextTimer = timerInternals.removeNextProcessingTimer()) != null) { - timers.add(nextTimer); - } + List<TimerInternals.TimerData> timers = tester.advanceProcessingTime(currentProcessingTime); if (timers.isEmpty()) { return false; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java index 2a626d4..be63c06 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java @@ -34,7 +34,6 @@ import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.runners.core.ActiveWindowSet; import org.apache.beam.runners.core.ActiveWindowSet.MergeCallback; -import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.MergingActiveWindowSet; import org.apache.beam.runners.core.NonMergingActiveWindowSet; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -47,6 +46,7 @@ import org.apache.beam.sdk.util.Timers; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java index 5432d58..87d3f50 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; -import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.SystemReduceFn; @@ -38,6 +37,7 @@ import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 2d8684a..93b3f59 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -46,10 +46,12 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.InMemoryStateInternals; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; @@ -141,6 +143,10 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { return (StateInternals<K>) stateInternals; } + public TimerInternals getTimerInternals() { + return timerInternals; + } + /** * When a {@link DoFnTester} should clone the {@link DoFn} under test and how it should manage * the lifecycle of the {@link DoFn}. @@ -227,6 +233,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { context.setupDelegateAggregators(); // State and timer internals are per-bundle. stateInternals = InMemoryStateInternals.forKey(new Object()); + timerInternals = new InMemoryTimerInternals(); try { fnInvoker.invokeStartBundle(context); } catch (UserCodeException e) { @@ -535,6 +542,34 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { return extractAggregatorValue(agg.getName(), agg.getCombineFn()); } + public List<TimerInternals.TimerData> advanceInputWatermark(Instant newWatermark) { + try { + timerInternals.advanceInputWatermark(newWatermark); + final List<TimerInternals.TimerData> firedTimers = new ArrayList<>(); + TimerInternals.TimerData timer; + while ((timer = timerInternals.removeNextEventTimer()) != null) { + firedTimers.add(timer); + } + return firedTimers; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public List<TimerInternals.TimerData> advanceProcessingTime(Instant newProcessingTime) { + try { + timerInternals.advanceProcessingTime(newProcessingTime); + final List<TimerInternals.TimerData> firedTimers = new ArrayList<>(); + TimerInternals.TimerData timer; + while ((timer = timerInternals.removeNextProcessingTimer()) != null) { + firedTimers.add(timer); + } + return firedTimers; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + private <AccumT, AggregateT> AggregateT extractAggregatorValue( String name, CombineFn<?, AccumT, AggregateT> combiner) { @SuppressWarnings("unchecked") @@ -779,6 +814,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> outputs; private InMemoryStateInternals<?> stateInternals; + private InMemoryTimerInternals timerInternals; /** The state of processing of the {@link DoFn} under test. */ private State state = State.UNINITIALIZED; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java new file mode 100644 index 0000000..44b44f0 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util.state; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.MoreObjects; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowTracing; +import org.joda.time.Instant; + +/** + * Simulates the firing of timers and progression of input and output watermarks for a single + * computation and key in a Windmill-like streaming environment. + */ +public class InMemoryTimerInternals implements TimerInternals { + + /** At most one timer per timestamp is kept. */ + private Set<TimerData> existingTimers = new HashSet<>(); + + /** Pending input watermark timers, in timestamp order. */ + private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11); + + /** Pending processing time timers, in timestamp order. */ + private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11); + + /** Pending synchronized processing time timers, in timestamp order. */ + private PriorityQueue<TimerData> synchronizedProcessingTimers = new PriorityQueue<>(11); + + /** Current input watermark. */ + private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; + + /** Current output watermark. */ + @Nullable private Instant outputWatermarkTime = null; + + /** Current processing time. */ + private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; + + /** Current synchronized processing time. */ + private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; + + @Override + @Nullable + public Instant currentOutputWatermarkTime() { + return outputWatermarkTime; + } + + /** + * Returns when the next timer in the given time domain will fire, or {@code null} + * if there are no timers scheduled in that time domain. + */ + @Nullable + public Instant getNextTimer(TimeDomain domain) { + final TimerData data; + switch (domain) { + case EVENT_TIME: + data = watermarkTimers.peek(); + break; + case PROCESSING_TIME: + data = processingTimers.peek(); + break; + case SYNCHRONIZED_PROCESSING_TIME: + data = synchronizedProcessingTimers.peek(); + break; + default: + throw new IllegalArgumentException("Unexpected time domain: " + domain); + } + return (data == null) ? null : data.getTimestamp(); + } + + private PriorityQueue<TimerData> queue(TimeDomain domain) { + switch (domain) { + case EVENT_TIME: + return watermarkTimers; + case PROCESSING_TIME: + return processingTimers; + case SYNCHRONIZED_PROCESSING_TIME: + return synchronizedProcessingTimers; + default: + throw new IllegalArgumentException("Unexpected time domain: " + domain); + } + } + + @Override + public void setTimer(StateNamespace namespace, String timerId, Instant target, + TimeDomain timeDomain) { + throw new UnsupportedOperationException("Setting a timer by ID is not yet supported."); + } + + @Override + public void setTimer(TimerData timerData) { + WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData); + if (existingTimers.add(timerData)) { + queue(timerData.getDomain()).add(timerData); + } + } + + @Override + public void deleteTimer(StateNamespace namespace, String timerId) { + throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported."); + } + + @Override + public void deleteTimer(TimerData timer) { + WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer); + existingTimers.remove(timer); + queue(timer.getDomain()).remove(timer); + } + + @Override + public Instant currentProcessingTime() { + return processingTime; + } + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return synchronizedProcessingTime; + } + + @Override + public Instant currentInputWatermarkTime() { + return inputWatermarkTime; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("watermarkTimers", watermarkTimers) + .add("processingTimers", processingTimers) + .add("synchronizedProcessingTimers", synchronizedProcessingTimers) + .add("inputWatermarkTime", inputWatermarkTime) + .add("outputWatermarkTime", outputWatermarkTime) + .add("processingTime", processingTime) + .toString(); + } + + /** Advances input watermark to the given value. */ + public void advanceInputWatermark(Instant newInputWatermark) throws Exception { + checkNotNull(newInputWatermark); + checkState( + !newInputWatermark.isBefore(inputWatermarkTime), + "Cannot move input watermark time backwards from %s to %s", + inputWatermarkTime, + newInputWatermark); + WindowTracing.trace( + "{}.advanceInputWatermark: from {} to {}", + getClass().getSimpleName(), inputWatermarkTime, newInputWatermark); + inputWatermarkTime = newInputWatermark; + } + + /** Advances output watermark to the given value. */ + public void advanceOutputWatermark(Instant newOutputWatermark) { + checkNotNull(newOutputWatermark); + final Instant adjustedOutputWatermark; + if (newOutputWatermark.isAfter(inputWatermarkTime)) { + WindowTracing.trace( + "{}.advanceOutputWatermark: clipping output watermark from {} to {}", + getClass().getSimpleName(), newOutputWatermark, inputWatermarkTime); + adjustedOutputWatermark = inputWatermarkTime; + } else { + adjustedOutputWatermark = newOutputWatermark; + } + + checkState( + outputWatermarkTime == null || !adjustedOutputWatermark.isBefore(outputWatermarkTime), + "Cannot move output watermark time backwards from %s to %s", + outputWatermarkTime, + adjustedOutputWatermark); + WindowTracing.trace( + "{}.advanceOutputWatermark: from {} to {}", + getClass().getSimpleName(), outputWatermarkTime, adjustedOutputWatermark); + outputWatermarkTime = adjustedOutputWatermark; + } + + /** Advances processing time to the given value. */ + public void advanceProcessingTime(Instant newProcessingTime) throws Exception { + checkNotNull(newProcessingTime); + checkState( + !newProcessingTime.isBefore(processingTime), + "Cannot move processing time backwards from %s to %s", + processingTime, + newProcessingTime); + WindowTracing.trace( + "{}.advanceProcessingTime: from {} to {}", + getClass().getSimpleName(), processingTime, newProcessingTime); + processingTime = newProcessingTime; + } + + /** Advances synchronized processing time to the given value. */ + public void advanceSynchronizedProcessingTime(Instant newSynchronizedProcessingTime) + throws Exception { + checkNotNull(newSynchronizedProcessingTime); + checkState( + !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime), + "Cannot move processing time backwards from %s to %s", + synchronizedProcessingTime, + newSynchronizedProcessingTime); + WindowTracing.trace( + "{}.advanceProcessingTime: from {} to {}", + getClass().getSimpleName(), synchronizedProcessingTime, newSynchronizedProcessingTime); + synchronizedProcessingTime = newSynchronizedProcessingTime; + } + + /** Returns the next eligible event time timer, if none returns null. */ + @Nullable + public TimerData removeNextEventTimer() { + TimerData timer = removeNextTimer(inputWatermarkTime, TimeDomain.EVENT_TIME); + if (timer != null) { + WindowTracing.trace( + "{}.removeNextEventTimer: firing {} at {}", + getClass().getSimpleName(), timer, inputWatermarkTime); + } + return timer; + } + + /** Returns the next eligible processing time timer, if none returns null. */ + @Nullable + public TimerData removeNextProcessingTimer() { + TimerData timer = removeNextTimer(processingTime, TimeDomain.PROCESSING_TIME); + if (timer != null) { + WindowTracing.trace( + "{}.removeNextProcessingTimer: firing {} at {}", + getClass().getSimpleName(), timer, processingTime); + } + return timer; + } + + /** Returns the next eligible synchronized processing time timer, if none returns null. */ + @Nullable + public TimerData removeNextSynchronizedProcessingTimer() { + TimerData timer = removeNextTimer( + synchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + if (timer != null) { + WindowTracing.trace( + "{}.removeNextSynchronizedProcessingTimer: firing {} at {}", + getClass().getSimpleName(), timer, synchronizedProcessingTime); + } + return timer; + } + + @Nullable + private TimerData removeNextTimer(Instant currentTime, TimeDomain domain) { + PriorityQueue<TimerData> queue = queue(domain); + if (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) { + TimerData timer = queue.remove(); + existingTimers.remove(timer); + return timer; + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java new file mode 100644 index 0000000..4a2763c --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util.state; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link InMemoryTimerInternals}. + */ +@RunWith(JUnit4.class) +public class InMemoryTimerInternalsTest { + + private static final StateNamespace NS1 = new StateNamespaceForTest("NS1"); + + @Test + public void testFiringTimers() throws Exception { + InMemoryTimerInternals underTest = new InMemoryTimerInternals(); + TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); + TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME); + + underTest.setTimer(processingTime1); + underTest.setTimer(processingTime2); + + underTest.advanceProcessingTime(new Instant(20)); + assertEquals(processingTime1, underTest.removeNextProcessingTimer()); + assertNull(underTest.removeNextProcessingTimer()); + + // Advancing just a little shouldn't refire + underTest.advanceProcessingTime(new Instant(21)); + assertNull(underTest.removeNextProcessingTimer()); + + // Adding the timer and advancing a little should refire + underTest.setTimer(processingTime1); + assertEquals(processingTime1, underTest.removeNextProcessingTimer()); + assertNull(underTest.removeNextProcessingTimer()); + + // And advancing the rest of the way should still have the other timer + underTest.advanceProcessingTime(new Instant(30)); + assertEquals(processingTime2, underTest.removeNextProcessingTimer()); + assertNull(underTest.removeNextProcessingTimer()); + } + + @Test + public void testFiringTimersWithCallback() throws Exception { + InMemoryTimerInternals underTest = new InMemoryTimerInternals(); + TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); + TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME); + + underTest.setTimer(processingTime1); + underTest.setTimer(processingTime2); + + underTest.advanceProcessingTime(new Instant(20)); + assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1)); + assertThat(underTest.removeNextProcessingTimer(), nullValue()); + + // Advancing just a little shouldn't refire + underTest.advanceProcessingTime(new Instant(21)); + assertThat(underTest.removeNextProcessingTimer(), nullValue()); + + // Adding the timer and advancing a little should fire again + underTest.setTimer(processingTime1); + underTest.advanceProcessingTime(new Instant(21)); + assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1)); + assertThat(underTest.removeNextProcessingTimer(), nullValue()); + + // And advancing the rest of the way should still have the other timer + underTest.advanceProcessingTime(new Instant(30)); + assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime2)); + assertThat(underTest.removeNextProcessingTimer(), nullValue()); + } + + @Test + public void testTimerOrdering() throws Exception { + InMemoryTimerInternals underTest = new InMemoryTimerInternals(); + TimerData eventTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME); + TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); + TimerData synchronizedProcessingTime1 = TimerData.of( + NS1, new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + TimerData eventTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME); + TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME); + TimerData synchronizedProcessingTime2 = TimerData.of( + NS1, new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + + underTest.setTimer(processingTime1); + underTest.setTimer(eventTime1); + underTest.setTimer(synchronizedProcessingTime1); + underTest.setTimer(processingTime2); + underTest.setTimer(eventTime2); + underTest.setTimer(synchronizedProcessingTime2); + + assertNull(underTest.removeNextEventTimer()); + underTest.advanceInputWatermark(new Instant(30)); + assertEquals(eventTime1, underTest.removeNextEventTimer()); + assertEquals(eventTime2, underTest.removeNextEventTimer()); + assertNull(underTest.removeNextEventTimer()); + + assertNull(underTest.removeNextProcessingTimer()); + underTest.advanceProcessingTime(new Instant(30)); + assertEquals(processingTime1, underTest.removeNextProcessingTimer()); + assertEquals(processingTime2, underTest.removeNextProcessingTimer()); + assertNull(underTest.removeNextProcessingTimer()); + + assertNull(underTest.removeNextSynchronizedProcessingTimer()); + underTest.advanceSynchronizedProcessingTime(new Instant(30)); + assertEquals(synchronizedProcessingTime1, underTest.removeNextSynchronizedProcessingTimer()); + assertEquals(synchronizedProcessingTime2, underTest.removeNextSynchronizedProcessingTimer()); + assertNull(underTest.removeNextProcessingTimer()); + } + + @Test + public void testDeduplicate() throws Exception { + InMemoryTimerInternals underTest = new InMemoryTimerInternals(); + TimerData eventTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME); + TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); + underTest.setTimer(eventTime); + underTest.setTimer(eventTime); + underTest.setTimer(processingTime); + underTest.setTimer(processingTime); + underTest.advanceProcessingTime(new Instant(20)); + underTest.advanceInputWatermark(new Instant(20)); + + assertEquals(processingTime, underTest.removeNextProcessingTimer()); + assertNull(underTest.removeNextProcessingTimer()); + assertEquals(eventTime, underTest.removeNextEventTimer()); + assertNull(underTest.removeNextEventTimer()); + } +}