Repository: flink Updated Branches: refs/heads/master 47faa90d6 -> e7586c3b2
[FLINK-3669] Timer coalescing across keys and cleanup of unused trigger tasks Per timestamp only one TriggerTask is registered at the runtime context. When the first timer is registered a new TriggerTask is scheduled. When no timer is registered anymore for a specific timestamp the corresponding trigger task is canceled and hence removed. The ScheduledFutures to cancel trigger tasks are not checkpointed. So cleanup of trigger tasks will not work after a failure. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7586c3b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7586c3b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7586c3b Branch: refs/heads/master Commit: e7586c3b2d995be164100919d7c04db003a71a90 Parents: 47faa90 Author: Konstantin Knauf <konstantin.kn...@tngtech.com> Authored: Tue Apr 5 19:59:19 2016 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Tue May 3 11:36:26 2016 +0200 ---------------------------------------------------------------------- .../kafka/testutils/MockRuntimeContext.java | 5 +- .../api/operators/AbstractStreamOperator.java | 5 +- .../api/operators/StreamingRuntimeContext.java | 5 +- .../operators/windowing/WindowOperator.java | 87 ++++++++++++++++---- .../streaming/runtime/tasks/StreamTask.java | 46 +++++------ .../operators/windowing/WindowOperatorTest.java | 75 +++++++++++++++++ 6 files changed, 177 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e7586c3b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java index 3e46503..1ac2ef5 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java @@ -45,6 +45,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @SuppressWarnings("deprecation") @@ -187,14 +188,14 @@ public class MockRuntimeContext extends StreamingRuntimeContext { } @Override - public void registerTimer(final long time, final Triggerable target) { + public ScheduledFuture<?> registerTimer(final long time, final Triggerable target) { if (timer == null) { timer = Executors.newSingleThreadScheduledExecutor(); } final long delay = Math.max(time - System.currentTimeMillis(), 0); - timer.schedule(new Runnable() { + return timer.schedule(new Runnable() { @Override public void run() { synchronized (checkpointLock) { http://git-wip-us.apache.org/repos/asf/flink/blob/e7586c3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index a5d0ace..9673f87 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -37,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.concurrent.ScheduledFuture; /** * Base class for all stream operators. Operators that contain a user function should extend the class @@ -246,8 +247,8 @@ public abstract class AbstractStreamOperator<OUT> * @param time The absolute time in milliseconds. * @param target The target to be triggered. */ - protected void registerTimer(long time, Triggerable target) { - container.registerTimer(time, target); + protected ScheduledFuture<?> registerTimer(long time, Triggerable target) { + return container.registerTimer(time, target); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/e7586c3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index bd391cd..4500ee7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -39,6 +39,7 @@ import org.apache.flink.streaming.runtime.operators.Triggerable; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledFuture; import static java.util.Objects.requireNonNull; @@ -88,8 +89,8 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { * @param time The absolute time in milliseconds. * @param target The target to be triggered. 
*/ - public void registerTimer(long time, Triggerable target) { - operator.registerTimer(time, target); + public ScheduledFuture<?> registerTimer(long time, Triggerable target) { + return operator.registerTimer(time, target); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/e7586c3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 919cee7..7b49e0b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -19,6 +19,8 @@ package org.apache.flink.streaming.runtime.operators.windowing; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.AppendingState; @@ -39,6 +41,7 @@ import org.apache.flink.api.java.typeutils.InputTypeConfigurable; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; @@ -65,6 +68,7 @@ import java.util.HashSet; import java.util.Map; import 
java.util.PriorityQueue; import java.util.Set; +import java.util.concurrent.ScheduledFuture; import static java.util.Objects.requireNonNull; @@ -134,6 +138,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> */ protected transient TimestampedCollector<OUT> timestampedCollector; + protected transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures; + /** * To keep track of the current watermark so that we can immediately fire if a trigger * registers an event time callback for a timestamp that lies in the past. @@ -149,8 +155,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> /** * Processing time timers that are currently in-flight. */ - protected transient Set<Timer<K, W>> processingTimeTimers; protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue; + protected transient Set<Timer<K, W>> processingTimeTimers; + protected transient Multiset<Long> processingTimeTimerTimestamps; /** * Current waiting watermark callbacks. @@ -213,9 +220,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } if (processingTimeTimers == null) { processingTimeTimers = new HashSet<>(); + processingTimeTimerTimestamps = HashMultiset.create(); processingTimeTimersQueue = new PriorityQueue<>(100); } + //ScheduledFutures are not checkpointed + processingTimeTimerFutures = new HashMap<>(); + context = new Context(null, null); if (windowAssigner instanceof MergingWindowAssigner) { @@ -424,6 +435,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> public final void trigger(long time) throws Exception { boolean fire; + //Remove information about the triggering task + processingTimeTimerFutures.remove(time); + processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time)); + do { Timer<K, W> timer = processingTimeTimersQueue.peek(); if (timer != null && timer.timestamp <= time) { @@ -525,9 +540,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> @Override public void 
registerProcessingTimeTimer(long time) { Timer<K, W> timer = new Timer<>(time, key, window); + // make sure we only put one timer per key into the queue if (processingTimeTimers.add(timer)) { processingTimeTimersQueue.add(timer); - getRuntimeContext().registerTimer(time, WindowOperator.this); + //If this is the first timer added for this timestamp register a TriggerTask + if (processingTimeTimerTimestamps.add(time, 1) == 0) { + ScheduledFuture<?> scheduledFuture= getRuntimeContext().registerTimer(time, WindowOperator.this); + processingTimeTimerFutures.put(time, scheduledFuture); + } } } @@ -542,15 +562,25 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> // immediately schedule a trigger, so that we don't wait for the next // watermark update to fire the watermark trigger getRuntimeContext().registerTimer(System.currentTimeMillis(), WindowOperator.this); + //No need to put it in processingTimeTimerFutures as this timer is never removed } } @Override public void deleteProcessingTimeTimer(long time) { Timer<K, W> timer = new Timer<>(time, key, window); + if (processingTimeTimers.remove(timer)) { processingTimeTimersQueue.remove(timer); } + + //If there are no timers left for this timestamp, remove it from queue and cancel TriggerTask + if (processingTimeTimerTimestamps.remove(time,1) == 1) { + ScheduledFuture<?> triggerTaskFuture = processingTimeTimerFutures.remove(timer.timestamp); + if (triggerTaskFuture != null && !triggerTaskFuture.isDone()) { + triggerTaskFuture.cancel(false); + } + } } @Override @@ -592,6 +622,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } } + /** * Internal class for keeping track of in-flight timers. 
*/ @@ -670,19 +701,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); - out.writeInt(watermarkTimersQueue.size()); - for (Timer<K, W> timer : watermarkTimersQueue) { - keySerializer.serialize(timer.key, out); - windowSerializer.serialize(timer.window, out); - out.writeLong(timer.timestamp); - } - - out.writeInt(processingTimeTimers.size()); - for (Timer<K, W> timer : processingTimeTimersQueue) { - keySerializer.serialize(timer.key, out); - windowSerializer.serialize(timer.window, out); - out.writeLong(timer.timestamp); - } + snapshotTimers(out); taskState.setOperatorState(out.closeAndGetHandle()); @@ -699,6 +718,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState(); DataInputView in = inputState.getState(userClassloader); + restoreTimers(in); + } + + private void restoreTimers(DataInputView in ) throws IOException { int numWatermarkTimers = in.readInt(); watermarkTimers = new HashSet<>(numWatermarkTimers); watermarkTimersQueue = new PriorityQueue<>(Math.max(numWatermarkTimers, 1)); @@ -712,15 +735,45 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } int numProcessingTimeTimers = in.readInt(); - processingTimeTimers = new HashSet<>(numProcessingTimeTimers); processingTimeTimersQueue = new PriorityQueue<>(Math.max(numProcessingTimeTimers, 1)); + processingTimeTimers = new HashSet<>(); for (int i = 0; i < numProcessingTimeTimers; i++) { K key = keySerializer.deserialize(in); W window = windowSerializer.deserialize(in); long timestamp = in.readLong(); Timer<K, W> timer = new Timer<>(timestamp, key, window); - processingTimeTimers.add(timer); processingTimeTimersQueue.add(timer); + processingTimeTimers.add(timer); + } + + int numProcessingTimeTimerTimestamp = in.readInt(); + 
processingTimeTimerTimestamps = HashMultiset.create(); + for (int i = 0; i< numProcessingTimeTimerTimestamp; i++) { + long timestamp = in.readLong(); + int count = in.readInt(); + processingTimeTimerTimestamps.add(timestamp, count); + } + } + + private void snapshotTimers(DataOutputView out) throws IOException { + out.writeInt(watermarkTimersQueue.size()); + for (Timer<K, W> timer : watermarkTimersQueue) { + keySerializer.serialize(timer.key, out); + windowSerializer.serialize(timer.window, out); + out.writeLong(timer.timestamp); + } + + out.writeInt(processingTimeTimers.size()); + for (Timer<K,W> timer : processingTimeTimers) { + keySerializer.serialize(timer.key, out); + windowSerializer.serialize(timer.window, out); + out.writeLong(timer.timestamp); + } + + out.writeInt(processingTimeTimerTimestamps.entrySet().size()); + for (Multiset.Entry<Long> timerTimestampCounts: processingTimeTimerTimestamps.entrySet()) { + out.writeLong(timerTimestampCounts.getElement()); + out.writeInt(timerTimestampCounts.getCount()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e7586c3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 7b154cd..51904b3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -17,16 +17,6 @@ package org.apache.flink.streaming.runtime.tasks; -import java.io.Serializable; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Executors; -import 
java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -41,23 +31,32 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot; import org.apache.flink.runtime.state.AsynchronousStateHandle; import org.apache.flink.runtime.state.KvStateSnapshot; +import org.apache.flink.runtime.state.StateBackendFactory; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.runtime.state.StateBackendFactory; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + /** * Base class for 
all streaming tasks. A task is the unit of local processing that is deployed * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form @@ -129,7 +128,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> private ClassLoader userClassLoader; /** The executor service that schedules and calls the triggers of this task*/ - private ScheduledExecutorService timerService; + private ScheduledThreadPoolExecutor timerService; /** The map of user-defined accumulators of this task */ private Map<String, Accumulator<?, ?>> accumulatorMap; @@ -191,8 +190,9 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> headOperator.setup(this, configuration, operatorChain.getChainEntryPoint()); } - timerService = Executors.newSingleThreadScheduledExecutor( - new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName())); + timerService =new ScheduledThreadPoolExecutor(1, new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName())); + // allow trigger tasks to be removed if all timers for that timestamp are removed by user + timerService.setRemoveOnCancelPolicy(true); // task specific initialization init(); @@ -663,13 +663,13 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> /** * Registers a timer. 
*/ - public void registerTimer(final long timestamp, final Triggerable target) { + public ScheduledFuture<?> registerTimer(final long timestamp, final Triggerable target) { long delay = Math.max(timestamp - System.currentTimeMillis(), 0); - timerService.schedule( - new TriggerTask(this, lock, target, timestamp), - delay, - TimeUnit.MILLISECONDS); + return timerService.schedule( + new TriggerTask(this, lock, target, timestamp), + delay, + TimeUnit.MILLISECONDS); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/e7586c3b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 233131d..94b0e3c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -28,6 +28,8 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -58,6 +60,8 @@ import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import 
java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -842,6 +846,77 @@ public class WindowOperatorTest { testHarness.close(); } + @Test + public void testRestoreAndSnapshotAreInSync() throws Exception { + + final int WINDOW_SIZE = 3; + final int WINDOW_SLIDE = 1; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), + EventTimeTrigger.create()); + + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + operator.setInputType(inputType, new ExecutionConfig()); + testHarness.open(); + + WindowOperator.Timer<String, TimeWindow> timer1 = new WindowOperator.Timer<>(1L, "key1", new TimeWindow(1L, 2L)); + WindowOperator.Timer<String, TimeWindow> timer2 = new WindowOperator.Timer<>(3L, "key1", new TimeWindow(1L, 2L)); + WindowOperator.Timer<String, TimeWindow> timer3 = new WindowOperator.Timer<>(2L, "key1", new TimeWindow(1L, 2L)); + operator.processingTimeTimers.add(timer1); + operator.processingTimeTimers.add(timer2); + operator.processingTimeTimers.add(timer3); + 
operator.processingTimeTimersQueue.add(timer1); + operator.processingTimeTimersQueue.add(timer2); + operator.processingTimeTimersQueue.add(timer3); + + operator.processingTimeTimerTimestamps.add(1L, 10); + operator.processingTimeTimerTimestamps.add(2L, 5); + operator.processingTimeTimerTimestamps.add(3L, 1); + + + StreamTaskState snapshot = testHarness.snapshot(0, 0); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> otherOperator = new WindowOperator<>( + SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), + EventTimeTrigger.create()); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> otherTestHarness = + new OneInputStreamOperatorTestHarness<>(otherOperator); + + otherTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + otherOperator.setInputType(inputType, new ExecutionConfig()); + + otherTestHarness.setup(); + otherTestHarness.restore(snapshot, 0); + otherTestHarness.open(); + + Assert.assertEquals(operator.processingTimeTimers, otherOperator.processingTimeTimers); + Assert.assertArrayEquals(operator.processingTimeTimersQueue.toArray(), otherOperator.processingTimeTimersQueue.toArray()); + Assert.assertEquals(operator.processingTimeTimerTimestamps, otherOperator.processingTimeTimerTimestamps); + } + // ------------------------------------------------------------------------ // UDFs // ------------------------------------------------------------------------