Repository: flink
Updated Branches:
  refs/heads/master 47faa90d6 -> e7586c3b2


[FLINK-3669] Timer coalescing across keys and cleanup of unused trigger tasks

Per timestamp only one TriggerTask is registered at
the runtime context. When the first timer is registered a new TriggerTask
is scheduled. When no timer is registered anymore for a specific timestamp
the corresponding trigger task is canceled and hence removed.

The ScheduledFutures to cancel trigger tasks are not checkpointed. So
cleanup of trigger tasks will not work after a failure.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7586c3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7586c3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7586c3b

Branch: refs/heads/master
Commit: e7586c3b2d995be164100919d7c04db003a71a90
Parents: 47faa90
Author: Konstantin Knauf <konstantin.kn...@tngtech.com>
Authored: Tue Apr 5 19:59:19 2016 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Tue May 3 11:36:26 2016 +0200

----------------------------------------------------------------------
 .../kafka/testutils/MockRuntimeContext.java     |  5 +-
 .../api/operators/AbstractStreamOperator.java   |  5 +-
 .../api/operators/StreamingRuntimeContext.java  |  5 +-
 .../operators/windowing/WindowOperator.java     | 87 ++++++++++++++++----
 .../streaming/runtime/tasks/StreamTask.java     | 46 +++++------
 .../operators/windowing/WindowOperatorTest.java | 75 +++++++++++++++++
 6 files changed, 177 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e7586c3b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 3e46503..1ac2ef5 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -45,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 @SuppressWarnings("deprecation")
@@ -187,14 +188,14 @@ public class MockRuntimeContext extends 
StreamingRuntimeContext {
        }
        
        @Override
-       public void registerTimer(final long time, final Triggerable target) {
+       public ScheduledFuture<?> registerTimer(final long time, final 
Triggerable target) {
                if (timer == null) {
                        timer = Executors.newSingleThreadScheduledExecutor();
                }
                
                final long delay = Math.max(time - System.currentTimeMillis(), 
0);
 
-               timer.schedule(new Runnable() {
+               return timer.schedule(new Runnable() {
                        @Override
                        public void run() {
                                synchronized (checkpointLock) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e7586c3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a5d0ace..9673f87 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.concurrent.ScheduledFuture;
 
 /**
  * Base class for all stream operators. Operators that contain a user function 
should extend the class 
@@ -246,8 +247,8 @@ public abstract class AbstractStreamOperator<OUT>
         * @param time The absolute time in milliseconds.
         * @param target The target to be triggered.
         */
-       protected void registerTimer(long time, Triggerable target) {
-               container.registerTimer(time, target);
+       protected ScheduledFuture<?> registerTimer(long time, Triggerable 
target) {
+               return container.registerTimer(time, target);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e7586c3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index bd391cd..4500ee7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.streaming.runtime.operators.Triggerable;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
 
 import static java.util.Objects.requireNonNull;
 
@@ -88,8 +89,8 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
         * @param time The absolute time in milliseconds.
         * @param target The target to be triggered.
         */
-       public void registerTimer(long time, Triggerable target) {
-               operator.registerTimer(time, target);
+       public ScheduledFuture<?> registerTimer(long time, Triggerable target) {
+               return operator.registerTimer(time, target);
        }
        
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e7586c3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 919cee7..7b49e0b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -19,6 +19,8 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.AppendingState;
@@ -39,6 +41,7 @@ import 
org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
@@ -65,6 +68,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
 
 import static java.util.Objects.requireNonNull;
 
@@ -134,6 +138,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
         */
        protected transient TimestampedCollector<OUT> timestampedCollector;
 
+       protected transient Map<Long, ScheduledFuture<?>> 
processingTimeTimerFutures;
+
        /**
         * To keep track of the current watermark so that we can immediately 
fire if a trigger
         * registers an event time callback for a timestamp that lies in the 
past.
@@ -149,8 +155,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        /**
         * Processing time timers that are currently in-flight.
         */
-       protected transient Set<Timer<K, W>> processingTimeTimers;
        protected transient PriorityQueue<Timer<K, W>> 
processingTimeTimersQueue;
+       protected transient Set<Timer<K, W>> processingTimeTimers;
+       protected transient Multiset<Long> processingTimeTimerTimestamps;
 
        /**
         * Current waiting watermark callbacks.
@@ -213,9 +220,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                }
                if (processingTimeTimers == null) {
                        processingTimeTimers = new HashSet<>();
+                       processingTimeTimerTimestamps = HashMultiset.create();
                        processingTimeTimersQueue = new PriorityQueue<>(100);
                }
 
+               //ScheduledFutures are not checkpointed
+               processingTimeTimerFutures = new HashMap<>();
+
                context = new Context(null, null);
 
                if (windowAssigner instanceof MergingWindowAssigner) {
@@ -424,6 +435,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        public final void trigger(long time) throws Exception {
                boolean fire;
 
+               //Remove information about the triggering task
+               processingTimeTimerFutures.remove(time);
+               processingTimeTimerTimestamps.remove(time, 
processingTimeTimerTimestamps.count(time));
+
                do {
                        Timer<K, W> timer = processingTimeTimersQueue.peek();
                        if (timer != null && timer.timestamp <= time) {
@@ -525,9 +540,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                @Override
                public void registerProcessingTimeTimer(long time) {
                        Timer<K, W> timer = new Timer<>(time, key, window);
+                       // make sure we only put one timer per key into the 
queue
                        if (processingTimeTimers.add(timer)) {
                                processingTimeTimersQueue.add(timer);
-                               getRuntimeContext().registerTimer(time, 
WindowOperator.this);
+                               //If this is the first timer added for this 
timestamp register a TriggerTask
+                               if (processingTimeTimerTimestamps.add(time, 1) 
== 0) {
+                                       ScheduledFuture<?> scheduledFuture= 
getRuntimeContext().registerTimer(time, WindowOperator.this);
+                                       processingTimeTimerFutures.put(time, 
scheduledFuture);
+                               }
                        }
                }
 
@@ -542,15 +562,25 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                // immediately schedule a trigger, so that we 
don't wait for the next
                                // watermark update to fire the watermark 
trigger
                                
getRuntimeContext().registerTimer(System.currentTimeMillis(), 
WindowOperator.this);
+                               //No need to put it in 
processingTimeTimerFutures as this timer is never removed
                        }
                }
 
                @Override
                public void deleteProcessingTimeTimer(long time) {
                        Timer<K, W> timer = new Timer<>(time, key, window);
+
                        if (processingTimeTimers.remove(timer)) {
                                processingTimeTimersQueue.remove(timer);
                        }
+
+                       //If there are no timers left for this timestamp, 
remove it from queue and cancel TriggerTask
+                       if (processingTimeTimerTimestamps.remove(time,1) == 1) {
+                               ScheduledFuture<?> triggerTaskFuture = 
processingTimeTimerFutures.remove(timer.timestamp);
+                               if (triggerTaskFuture != null && 
!triggerTaskFuture.isDone()) {
+                                       triggerTaskFuture.cancel(false);
+                               }
+                       }
                }
 
                @Override
@@ -592,6 +622,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                }
        }
 
+
        /**
         * Internal class for keeping track of in-flight timers.
         */
@@ -670,19 +701,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                AbstractStateBackend.CheckpointStateOutputView out =
                        
getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
 
-               out.writeInt(watermarkTimersQueue.size());
-               for (Timer<K, W> timer : watermarkTimersQueue) {
-                       keySerializer.serialize(timer.key, out);
-                       windowSerializer.serialize(timer.window, out);
-                       out.writeLong(timer.timestamp);
-               }
-
-               out.writeInt(processingTimeTimers.size());
-               for (Timer<K, W> timer : processingTimeTimersQueue) {
-                       keySerializer.serialize(timer.key, out);
-                       windowSerializer.serialize(timer.window, out);
-                       out.writeLong(timer.timestamp);
-               }
+               snapshotTimers(out);
 
                taskState.setOperatorState(out.closeAndGetHandle());
 
@@ -699,6 +718,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                StateHandle<DataInputView> inputState = 
(StateHandle<DataInputView>) taskState.getOperatorState();
                DataInputView in = inputState.getState(userClassloader);
 
+               restoreTimers(in);
+       }
+
+       private void restoreTimers(DataInputView in ) throws IOException {
                int numWatermarkTimers = in.readInt();
                watermarkTimers = new HashSet<>(numWatermarkTimers);
                watermarkTimersQueue = new 
PriorityQueue<>(Math.max(numWatermarkTimers, 1));
@@ -712,15 +735,45 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                }
 
                int numProcessingTimeTimers = in.readInt();
-               processingTimeTimers = new HashSet<>(numProcessingTimeTimers);
                processingTimeTimersQueue = new 
PriorityQueue<>(Math.max(numProcessingTimeTimers, 1));
+               processingTimeTimers = new HashSet<>();
                for (int i = 0; i < numProcessingTimeTimers; i++) {
                        K key = keySerializer.deserialize(in);
                        W window = windowSerializer.deserialize(in);
                        long timestamp = in.readLong();
                        Timer<K, W> timer = new Timer<>(timestamp, key, window);
-                       processingTimeTimers.add(timer);
                        processingTimeTimersQueue.add(timer);
+                       processingTimeTimers.add(timer);
+               }
+
+               int numProcessingTimeTimerTimestamp = in.readInt();
+               processingTimeTimerTimestamps = HashMultiset.create();
+               for (int i = 0; i< numProcessingTimeTimerTimestamp; i++) {
+                       long timestamp = in.readLong();
+                       int count = in.readInt();
+                       processingTimeTimerTimestamps.add(timestamp, count);
+               }
+       }
+
+       private void snapshotTimers(DataOutputView out) throws IOException {
+               out.writeInt(watermarkTimersQueue.size());
+               for (Timer<K, W> timer : watermarkTimersQueue) {
+                       keySerializer.serialize(timer.key, out);
+                       windowSerializer.serialize(timer.window, out);
+                       out.writeLong(timer.timestamp);
+               }
+
+               out.writeInt(processingTimeTimers.size());
+               for (Timer<K,W> timer : processingTimeTimers) {
+                       keySerializer.serialize(timer.key, out);
+                       windowSerializer.serialize(timer.window, out);
+                       out.writeLong(timer.timestamp);
+               }
+
+               out.writeInt(processingTimeTimerTimestamps.entrySet().size());
+               for (Multiset.Entry<Long> timerTimestampCounts: 
processingTimeTimerTimestamps.entrySet()) {
+                       out.writeLong(timerTimestampCounts.getElement());
+                       out.writeInt(timerTimestampCounts.getCount());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e7586c3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 7b154cd..51904b3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -17,16 +17,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -41,23 +31,32 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
 import org.apache.flink.runtime.state.AsynchronousStateHandle;
 import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.StateBackendFactory;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.runtime.state.StateBackendFactory;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Base class for all streaming tasks. A task is the unit of local processing 
that is deployed
  * and executed by the TaskManagers. Each task runs one or more {@link 
StreamOperator}s which form
@@ -129,7 +128,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
        private ClassLoader userClassLoader;
        
        /** The executor service that schedules and calls the triggers of this 
task*/
-       private ScheduledExecutorService timerService;
+       private ScheduledThreadPoolExecutor timerService;
        
        /** The map of user-defined accumulators of this task */
        private Map<String, Accumulator<?, ?>> accumulatorMap;
@@ -191,8 +190,9 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                headOperator.setup(this, configuration, 
operatorChain.getChainEntryPoint());
                        }
 
-                       timerService = 
Executors.newSingleThreadScheduledExecutor(
-                                       new 
DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
+                       timerService =new ScheduledThreadPoolExecutor(1, new 
DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
+                       // allow trigger tasks to be removed if all timers for 
that timestamp are removed by user
+                       timerService.setRemoveOnCancelPolicy(true);
 
                        // task specific initialization
                        init();
@@ -663,13 +663,13 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
        /**
         * Registers a timer.
         */
-       public void registerTimer(final long timestamp, final Triggerable 
target) {
+       public ScheduledFuture<?> registerTimer(final long timestamp, final 
Triggerable target) {
                long delay = Math.max(timestamp - System.currentTimeMillis(), 
0);
 
-               timerService.schedule(
-                               new TriggerTask(this, lock, target, timestamp),
-                               delay,
-                               TimeUnit.MILLISECONDS);
+               return timerService.schedule(
+                       new TriggerTask(this, lock, target, timestamp),
+                       delay,
+                       TimeUnit.MILLISECONDS);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e7586c3b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 233131d..94b0e3c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -58,6 +60,8 @@ import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -842,6 +846,77 @@ public class WindowOperatorTest {
                testHarness.close();
        }
 
+       @Test
+       public void testRestoreAndSnapshotAreInSync() throws Exception {
+
+               final int WINDOW_SIZE = 3;
+               final int WINDOW_SLIDE = 1;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = 
new ReducingStateDescriptor<>("window-contents",
+                               new SumReducer(),
+                               inputType.createSerializer(new 
ExecutionConfig()));
+
+               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
+                               SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+                               new TimeWindow.Serializer(),
+                               new TupleKeySelector(),
+                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                               stateDesc,
+                               new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+                               EventTimeTrigger.create());
+
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
+                               new 
OneInputStreamOperatorTestHarness<>(operator);
+
+               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               operator.setInputType(inputType, new ExecutionConfig());
+               testHarness.open();
+
+               WindowOperator.Timer<String, TimeWindow> timer1 = new 
WindowOperator.Timer<>(1L, "key1", new TimeWindow(1L, 2L));
+               WindowOperator.Timer<String, TimeWindow> timer2 = new 
WindowOperator.Timer<>(3L, "key1", new TimeWindow(1L, 2L));
+               WindowOperator.Timer<String, TimeWindow> timer3 = new 
WindowOperator.Timer<>(2L, "key1", new TimeWindow(1L, 2L));
+               operator.processingTimeTimers.add(timer1);
+               operator.processingTimeTimers.add(timer2);
+               operator.processingTimeTimers.add(timer3);
+               operator.processingTimeTimersQueue.add(timer1);
+               operator.processingTimeTimersQueue.add(timer2);
+               operator.processingTimeTimersQueue.add(timer3);
+
+               operator.processingTimeTimerTimestamps.add(1L, 10);
+               operator.processingTimeTimerTimestamps.add(2L, 5);
+               operator.processingTimeTimerTimestamps.add(3L, 1);
+
+
+               StreamTaskState snapshot = testHarness.snapshot(0, 0);
+
+               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> otherOperator = new 
WindowOperator<>(
+                               SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+                               new TimeWindow.Serializer(),
+                               new TupleKeySelector(),
+                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                               stateDesc,
+                               new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+                               EventTimeTrigger.create());
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> otherTestHarness =
+                               new 
OneInputStreamOperatorTestHarness<>(otherOperator);
+
+               otherTestHarness.configureForKeyedStream(new 
TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+               otherOperator.setInputType(inputType, new ExecutionConfig());
+
+               otherTestHarness.setup();
+               otherTestHarness.restore(snapshot, 0);
+               otherTestHarness.open();
+
+               Assert.assertEquals(operator.processingTimeTimers, 
otherOperator.processingTimeTimers);
+               
Assert.assertArrayEquals(operator.processingTimeTimersQueue.toArray(), 
otherOperator.processingTimeTimersQueue.toArray());
+               Assert.assertEquals(operator.processingTimeTimerTimestamps, 
otherOperator.processingTimeTimerTimestamps);
+       }
+
        // 
------------------------------------------------------------------------
        //  UDFs
        // 
------------------------------------------------------------------------

Reply via email to