Repository: flink
Updated Branches:
  refs/heads/master 568845a3c -> 51a5048b2


[FLINK-4494] Expose the TimeServiceProvider from the Task to each Operator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ffff2997
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ffff2997
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ffff2997

Branch: refs/heads/master
Commit: ffff2997a5c82386a6428744f39b808a3fb49538
Parents: 4779c7e
Author: kl0u <kklou...@gmail.com>
Authored: Tue Sep 20 14:45:01 2016 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Sep 23 15:01:06 2016 +0200

----------------------------------------------------------------------
 .../api/operators/AbstractStreamOperator.java   |  19 +--
 .../streaming/api/operators/StreamSource.java   |  23 ++--
 .../api/operators/StreamingRuntimeContext.java  |  19 ---
 .../api/windowing/assigners/WindowAssigner.java |   4 +-
 .../api/windowing/triggers/Trigger.java         |   4 +-
 .../operators/ExtractTimestampsOperator.java    |   9 +-
 ...TimestampsAndPeriodicWatermarksOperator.java |   6 +-
 ...ractAlignedProcessingTimeWindowOperator.java |   8 +-
 .../operators/windowing/WindowOperator.java     |   7 +-
 .../streaming/runtime/tasks/StreamTask.java     |  19 +--
 .../operators/StreamSourceOperatorTest.java     |  28 +---
 .../runtime/operators/StreamTaskTimerTest.java  |  12 +-
 .../runtime/operators/TimeProviderTest.java     |  14 +-
 ...AlignedProcessingTimeWindowOperatorTest.java | 104 +++++++--------
 ...AlignedProcessingTimeWindowOperatorTest.java | 128 ++++++++-----------
 .../runtime/tasks/StreamTaskTestHarness.java    |   9 +-
 .../KeyedOneInputStreamOperatorTestHarness.java |   2 +-
 .../flink/streaming/util/MockContext.java       |  35 +----
 .../util/OneInputStreamOperatorTestHarness.java |  21 +--
 .../runtime/StreamTaskTimerITCase.java          |  10 +-
 20 files changed, 177 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 71296e3..a73f3b2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -35,14 +35,12 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ScheduledFuture;
-
 /**
  * Base class for all stream operators. Operators that contain a user function 
should extend the class 
  * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass 
of this class). 
@@ -230,18 +228,11 @@ public abstract class AbstractStreamOperator<OUT>
        }
 
        /**
-        * Register a timer callback. At the specified time the provided {@link 
Triggerable} will
-        * be invoked. This call is guaranteed to not happen concurrently with 
method calls on the operator.
-        *
-        * @param time The absolute time in milliseconds.
-        * @param target The target to be triggered.
+        * Returns the {@link TimeServiceProvider} responsible for getting  the 
current
+        * processing time and registering timers.
         */
-       protected ScheduledFuture<?> registerTimer(long time, Triggerable 
target) {
-               return container.registerTimer(time, target);
-       }
-
-       protected long getCurrentProcessingTime() {
-               return container.getCurrentProcessingTime();
+       protected TimeServiceProvider getTimerService() {
+               return container.getTimerService();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 38c948b..22987ab 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 
 import java.util.concurrent.ScheduledFuture;
 
@@ -189,6 +190,7 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
        public static class AutomaticWatermarkContext<T> implements 
SourceFunction.SourceContext<T> {
 
                private final StreamSource<?, ?> owner;
+               private final TimeServiceProvider timeService;
                private final Object lockingObject;
                private final Output<StreamRecord<T>> output;
                private final StreamRecord<T> reuse;
@@ -209,14 +211,15 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
                        }
 
                        this.owner = owner;
+                       this.timeService = owner.getTimerService();
                        this.lockingObject = lockingObjectParam;
                        this.output = outputParam;
                        this.watermarkInterval = watermarkInterval;
                        this.reuse = new StreamRecord<T>(null);
 
-                       long now = owner.getCurrentProcessingTime();
-                       this.watermarkTimer = owner.registerTimer(now + 
watermarkInterval,
-                               new WatermarkEmittingTask(owner, 
lockingObjectParam, outputParam));
+                       long now = this.timeService.getCurrentProcessingTime();
+                       this.watermarkTimer = 
this.timeService.registerTimer(now + watermarkInterval,
+                               new WatermarkEmittingTask(this.timeService, 
lockingObjectParam, outputParam));
                }
 
                @Override
@@ -224,7 +227,7 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
                        owner.checkAsyncException();
                        
                        synchronized (lockingObject) {
-                               final long currentTime = 
owner.getCurrentProcessingTime();
+                               final long currentTime = 
this.timeService.getCurrentProcessingTime();
                                output.collect(reuse.replace(element, 
currentTime));
 
                                // this is to avoid lock contention in the 
lockingObject by
@@ -276,19 +279,19 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
 
                private class WatermarkEmittingTask implements Triggerable {
 
-                       private final StreamSource<?, ?> owner;
+                       private final TimeServiceProvider timeService;
                        private final Object lockingObject;
                        private final Output<StreamRecord<T>> output;
 
-                       private WatermarkEmittingTask(StreamSource<?, ?> src, 
Object lock, Output<StreamRecord<T>> output) {
-                               this.owner = src;
+                       private WatermarkEmittingTask(TimeServiceProvider 
timeService, Object lock, Output<StreamRecord<T>> output) {
+                               this.timeService = timeService;
                                this.lockingObject = lock;
                                this.output = output;
                        }
 
                        @Override
                        public void trigger(long timestamp) {
-                               final long currentTime = 
owner.getCurrentProcessingTime();
+                               final long currentTime = 
this.timeService.getCurrentProcessingTime();
 
                                if (currentTime > nextWatermarkTime) {
                                        // align the watermarks across all 
machines. this will ensure that we
@@ -304,8 +307,8 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
                                        }
                                }
 
-                               
owner.registerTimer(owner.getCurrentProcessingTime() + watermarkInterval,
-                                       new WatermarkEmittingTask(owner, 
lockingObject, output));
+                               
this.timeService.registerTimer(this.timeService.getCurrentProcessingTime() + 
watermarkInterval,
+                                       new 
WatermarkEmittingTask(this.timeService, lockingObject, output));
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 863cf17..961bd9d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -83,25 +83,6 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
                return taskEnvironment.getInputSplitProvider();
        }
 
-       /**
-        * Register a timer callback. At the specified time the {@link 
Triggerable } will be invoked.
-        * This call is guaranteed to not happen concurrently with method calls 
on the operator.
-        *
-        * @param time The absolute time in milliseconds.
-        * @param target The target to be triggered.
-        */
-       public ScheduledFuture<?> registerTimer(long time, Triggerable target) {
-               return operator.registerTimer(time, target);
-       }
-
-       /**
-        * Returns the current processing time as defined by the task's
-        * {@link org.apache.flink.streaming.runtime.tasks.TimeServiceProvider 
TimeServiceProvider}
-        */
-       public long getCurrentProcessingTime() {
-               return operator.getCurrentProcessingTime();
-       }
-
        // 
------------------------------------------------------------------------
        //  broadcast variables
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
index 9f487af..7a27cc8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import java.io.Serializable;
 
@@ -85,8 +84,7 @@ public abstract class WindowAssigner<T, W extends Window> 
implements Serializabl
        public abstract static class WindowAssignerContext {
 
                /**
-                * Returns the current processing time, as returned by
-                * the {@link StreamTask#getCurrentProcessingTime()}.
+                * Returns the current processing time.
                 */
                public abstract long getCurrentProcessingTime();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index 4d6c60f..ff80639 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import java.io.Serializable;
 
@@ -128,8 +127,7 @@ public abstract class Trigger<T, W extends Window> 
implements Serializable {
        public interface TriggerContext {
 
                /**
-                * Returns the current processing time, as returned by
-                * the {@link StreamTask#getCurrentProcessingTime()}.
+                * Returns the current processing time.
                 */
                long getCurrentProcessingTime();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index a4815dc..c92ff34 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -54,9 +54,9 @@ public class ExtractTimestampsOperator<T>
                super.open();
                watermarkInterval = 
getExecutionConfig().getAutoWatermarkInterval();
                if (watermarkInterval > 0) {
-                       registerTimer(System.currentTimeMillis() + 
watermarkInterval, this);
+                       long now = getTimerService().getCurrentProcessingTime();
+                       getTimerService().registerTimer(now + 
watermarkInterval, this);
                }
-
                currentWatermark = Long.MIN_VALUE;
        }
 
@@ -74,14 +74,15 @@ public class ExtractTimestampsOperator<T>
        @Override
        public void trigger(long timestamp) throws Exception {
                // register next timer
-               registerTimer(System.currentTimeMillis() + watermarkInterval, 
this);
                long newWatermark = userFunction.getCurrentWatermark();
-
                if (newWatermark > currentWatermark) {
                        currentWatermark = newWatermark;
                        // emit watermark
                        output.emitWatermark(new Watermark(currentWatermark));
                }
+
+               long now = getTimerService().getCurrentProcessingTime();
+               getTimerService().registerTimer(now + watermarkInterval, this);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
index 92faed2..f791723 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
@@ -54,7 +54,8 @@ public class TimestampsAndPeriodicWatermarksOperator<T>
                watermarkInterval = 
getExecutionConfig().getAutoWatermarkInterval();
                
                if (watermarkInterval > 0) {
-                       registerTimer(System.currentTimeMillis() + 
watermarkInterval, this);
+                       long now = getTimerService().getCurrentProcessingTime();
+                       getTimerService().registerTimer(now + 
watermarkInterval, this);
                }
        }
 
@@ -76,7 +77,8 @@ public class TimestampsAndPeriodicWatermarksOperator<T>
                        output.emitWatermark(newWatermark);
                }
 
-               registerTimer(System.currentTimeMillis() + watermarkInterval, 
this);
+               long now = getTimerService().getCurrentProcessingTime();
+               getTimerService().registerTimer(now + watermarkInterval, this);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index e74dd87..b39b760 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -125,7 +125,7 @@ public abstract class 
AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
                
                // decide when to first compute the window and when to slide it
                // the values should align with the start of time (that is, the 
UNIX epoch, not the big bang)
-               final long now = getRuntimeContext().getCurrentProcessingTime();
+               final long now = getTimerService().getCurrentProcessingTime();
                nextEvaluationTime = now + windowSlide - (now % windowSlide);
                nextSlideTime = now + paneSize - (now % paneSize);
 
@@ -164,9 +164,9 @@ public abstract class 
AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
                                nextPastTriggerTime = 
Math.min(nextPastEvaluationTime, nextPastSlideTime);
                        }
                }
-               
+
                // make sure the first window happens
-               registerTimer(firstTriggerTime, this);
+               getTimerService().registerTimer(firstTriggerTime, this);
        }
 
        @Override
@@ -230,7 +230,7 @@ public abstract class 
AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
                }
 
                long nextTriggerTime = Math.min(nextEvaluationTime, 
nextSlideTime);
-               registerTimer(nextTriggerTime, this);
+               getTimerService().registerTimer(nextTriggerTime, this);
        }
        
        private void computeWindow(long timestamp) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index dffa2a1..e4939db 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -255,7 +255,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                windowAssignerContext = new 
WindowAssigner.WindowAssignerContext() {
                        @Override
                        public long getCurrentProcessingTime() {
-                               return 
WindowOperator.this.getCurrentProcessingTime();
+                               return 
WindowOperator.this.getTimerService().getCurrentProcessingTime();
                        }
                };
 
@@ -721,7 +721,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
 
                @Override
                public long getCurrentProcessingTime() {
-                       return WindowOperator.this.getCurrentProcessingTime();
+                       return 
WindowOperator.this.getTimerService().getCurrentProcessingTime();
                }
 
                @Override
@@ -732,7 +732,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                processingTimeTimersQueue.add(timer);
                                //If this is the first timer added for this 
timestamp register a TriggerTask
                                if (processingTimeTimerTimestamps.add(time, 1) 
== 0) {
-                                       ScheduledFuture<?> scheduledFuture = 
WindowOperator.this.registerTimer(time, WindowOperator.this);
+                                       ScheduledFuture<?> scheduledFuture = 
WindowOperator.this.getTimerService()
+                                               .registerTimer(time, 
WindowOperator.this);
                                        processingTimeTimerFutures.put(time, 
scheduledFuture);
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 80d51a6..faa9672 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -47,7 +47,6 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import org.slf4j.Logger;
@@ -65,7 +64,6 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 /**
@@ -207,16 +205,6 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                timerService = timeProvider;
        }
 
-       /**
-        * Returns the current processing time.
-        */
-       public long getCurrentProcessingTime() {
-               if (timerService == null) {
-                       throw new IllegalStateException("The timer service has 
not been initialized.");
-               }
-               return timerService.getCurrentProcessingTime();
-       }
-
        @Override
        public final void invoke() throws Exception {
 
@@ -825,13 +813,14 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
        }
 
        /**
-        * Registers a timer.
+        * Returns the {@link TimeServiceProvider} responsible for telling the 
current
+        * processing time and registering timers.
         */
-       public ScheduledFuture<?> registerTimer(final long timestamp, final 
Triggerable target) {
+       public TimeServiceProvider getTimerService() {
                if (timerService == null) {
                        throw new IllegalStateException("The timer service has 
not been initialized.");
                }
-               return timerService.registerTimer(timestamp, target);
+               return timerService;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index d61fec9..e8663f5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -45,12 +45,9 @@ import org.mockito.stubbing.Answer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ScheduledFuture;
 
 import static org.junit.Assert.*;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -241,30 +238,15 @@ public class StreamSourceOperatorTest {
                when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
                
when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, 
Accumulator<?, ?>>emptyMap());
 
-               doAnswer(new Answer<ScheduledFuture>() {
+               doAnswer(new Answer<TimeServiceProvider>() {
                        @Override
-                       public ScheduledFuture answer(InvocationOnMock 
invocation) throws Throwable {
-                               final long execTime = (Long) 
invocation.getArguments()[0];
-                               final Triggerable target = (Triggerable) 
invocation.getArguments()[1];
-
-                               if (timeProvider == null) {
-                                       throw new RuntimeException("The time 
provider is null");
-                               }
-
-                               timeProvider.registerTimer(execTime, target);
-                               return null;
-                       }
-               }).when(mockTask).registerTimer(anyLong(), 
any(Triggerable.class));
-
-               doAnswer(new Answer<Long>() {
-                       @Override
-                       public Long answer(InvocationOnMock invocation) throws 
Throwable {
+                       public TimeServiceProvider answer(InvocationOnMock 
invocation) throws Throwable {
                                if (timeProvider == null) {
-                                       throw new RuntimeException("The time 
provider is null");
+                                       throw new RuntimeException("The time 
provider is null.");
                                }
-                               return timeProvider.getCurrentProcessingTime();
+                               return timeProvider;
                        }
-               }).when(mockTask).getCurrentProcessingTime();
+               }).when(mockTask).getTimerService();
 
                operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) 
mock(Output.class));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index b9435f5..98058e8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -63,7 +64,7 @@ public class StreamTaskTimerTest {
                testHarness.waitForTaskRunning();
 
                // first one spawns thread
-               mapTask.registerTimer(System.currentTimeMillis(), new 
Triggerable() {
+               
mapTask.getTimerService().registerTimer(System.currentTimeMillis(), new 
Triggerable() {
                        @Override
                        public void trigger(long timestamp) {
                        }
@@ -105,10 +106,11 @@ public class StreamTaskTimerTest {
                        final long t3 = System.currentTimeMillis() + 100;
                        final long t4 = System.currentTimeMillis() + 200;
 
-                       mapTask.registerTimer(t1, new 
ValidatingTriggerable(errorRef, t1, 0));
-                       mapTask.registerTimer(t2, new 
ValidatingTriggerable(errorRef, t2, 1));
-                       mapTask.registerTimer(t3, new 
ValidatingTriggerable(errorRef, t3, 2));
-                       mapTask.registerTimer(t4, new 
ValidatingTriggerable(errorRef, t4, 3));
+                       TimeServiceProvider timeService = 
mapTask.getTimerService();
+                       timeService.registerTimer(t1, new 
ValidatingTriggerable(errorRef, t1, 0));
+                       timeService.registerTimer(t2, new 
ValidatingTriggerable(errorRef, t2, 1));
+                       timeService.registerTimer(t3, new 
ValidatingTriggerable(errorRef, t3, 2));
+                       timeService.registerTimer(t4, new 
ValidatingTriggerable(errorRef, t4, 3));
 
                        long deadline = System.currentTimeMillis() + 20000;
                        while (errorRef.get() == null &&

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
index 4d4f07b..140e9e2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.core.testutils.OneShotLatch;
-import 
org.apache.flink.hadoop.shaded.org.apache.http.impl.conn.SystemDefaultDnsResolver;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamMap;
@@ -40,7 +39,6 @@ import java.util.List;
 import java.util.concurrent.Executors;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(ResultPartitionWriter.class)
@@ -150,7 +148,7 @@ public class TimeProviderTest {
                        }
                });
 
-               Assert.assertTrue(provider.getNoOfRegisteredTimers() == 4);
+               Assert.assertEquals(provider.getNoOfRegisteredTimers(), 4);
 
                provider.setCurrentTime(100);
                long seen = 0;
@@ -177,24 +175,24 @@ public class TimeProviderTest {
 
                testHarness.invoke();
 
-               assertTrue(testHarness.getCurrentProcessingTime() == 0);
+               
assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 0);
 
                tp.setCurrentTime(11);
-               assertTrue(testHarness.getCurrentProcessingTime() == 11);
+               
assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 11);
 
                tp.setCurrentTime(15);
                tp.setCurrentTime(16);
-               assertTrue(testHarness.getCurrentProcessingTime() == 16);
+               
assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 16);
 
                // register 2 tasks
-               mapTask.registerTimer(30, new Triggerable() {
+               mapTask.getTimerService().registerTimer(30, new Triggerable() {
                        @Override
                        public void trigger(long timestamp) {
 
                        }
                });
 
-               mapTask.registerTimer(40, new Triggerable() {
+               mapTask.getTimerService().registerTimer(40, new Triggerable() {
                        @Override
                        public void trigger(long timestamp) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 9849bd7..f33da89 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -39,10 +39,11 @@ import 
org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
@@ -50,28 +51,19 @@ import org.apache.flink.util.Collector;
 import org.junit.After;
 import org.junit.Test;
 
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -189,11 +181,15 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
        }
 
        @Test
-       public void testWindowTriggerTimeAlignment() {
+       public void testWindowTriggerTimeAlignment() throws Exception {
+               final Object lock = new Object();
+               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
+                       Executors.newSingleThreadScheduledExecutor(), lock);
+
                try {
                        @SuppressWarnings("unchecked")
                        final Output<StreamRecord<String>> mockOut = 
mock(Output.class);
-                       final StreamTask<?, ?> mockTask = createMockTask();
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        AccumulatingProcessingTimeWindowOperator<String, 
String, String> op;
 
@@ -233,16 +229,21 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
+               finally {
+                       timerService.shutdownService();
+               }
        }
 
        @Test
-       public void testTumblingWindow() {
-               final ScheduledExecutorService timerService = 
Executors.newSingleThreadScheduledExecutor();
+       public void testTumblingWindow() throws Exception {
+               final Object lock = new Object();
+               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
+                       Executors.newSingleThreadScheduledExecutor(), lock);
+
                try {
                        final int windowSize = 50;
                        final CollectingOutput<Integer> out = new 
CollectingOutput<>(windowSize);
-                       final Object lock = new Object();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        // tumbling window that triggers every 20 milliseconds
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
@@ -284,17 +285,19 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        fail(e.getMessage());
                }
                finally {
-                       timerService.shutdown();
+                       timerService.shutdownService();
                }
        }
 
        @Test
        public void testSlidingWindow() throws Exception {
-               final ScheduledExecutorService timerService = 
Executors.newSingleThreadScheduledExecutor();
+               final Object lock = new Object();
+               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
+                       Executors.newSingleThreadScheduledExecutor(), lock);
+
                try {
                        final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
-                       final Object lock = new Object();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
                        
                        // tumbling window that triggers every 20 milliseconds
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
@@ -344,18 +347,19 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                }
                        }
                } finally {
-                       timerService.shutdown();
+                       timerService.shutdownService();
                }
        }
 
        @Test
-       public void testTumblingWindowSingleElements() {
-               final ScheduledExecutorService timerService = 
Executors.newSingleThreadScheduledExecutor();
+       public void testTumblingWindowSingleElements() throws Exception {
+               final Object lock = new Object();
+               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
+                       Executors.newSingleThreadScheduledExecutor(), lock);
 
                try {
                        final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
-                       final Object lock = new Object();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        // tumbling window that triggers every 20 milliseconds
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
@@ -400,18 +404,19 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        fail(e.getMessage());
                }
                finally {
-                       timerService.shutdown();
+                       timerService.shutdownService();
                }
        }
        
        @Test
-       public void testSlidingWindowSingleElements() {
-               final ScheduledExecutorService timerService = 
Executors.newSingleThreadScheduledExecutor();
+       public void testSlidingWindowSingleElements() throws Exception {
+               final Object lock = new Object();
+               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
+                       Executors.newSingleThreadScheduledExecutor(), lock);
 
                try {
                        final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
-                       final Object lock = new Object();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        // tumbling window that triggers every 20 milliseconds
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
@@ -447,7 +452,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        fail(e.getMessage());
                }
                finally {
-                       timerService.shutdown();
+                       timerService.shutdownService();
                }
        }
 
@@ -494,8 +499,9 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                testHarness.processElement(new StreamRecord<>(i 
+ numElementsFirst));
                        }
 
+                       testHarness.close();
                        op.dispose();
-                       
+
                        // re-create the operator and restore the state
                        op = new AccumulatingProcessingTimeWindowOperator<>(
                                                        
validatingIdentityFunction, identitySelector,
@@ -527,6 +533,8 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        for (int i = 0; i < numElements; i++) {
                                assertEquals(i, finalResult.get(i).intValue());
                        }
+                       testHarness.close();
+                       op.dispose();
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -580,6 +588,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                testHarness.processElement(new 
StreamRecord<>(i));
                        }
 
+                       testHarness.close();
                        op.dispose();
 
                        // re-create the operator and restore the state
@@ -609,9 +618,6 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        timerService.setCurrentTime(300);
                        timerService.setCurrentTime(350);
 
-                       testHarness.close();
-                       op.dispose();
-
                        // get and verify the result
                        List<Integer> finalResult = new 
ArrayList<>(resultAtSnapshot);
                        List<Integer> finalPartialResult = 
extractFromStreamRecords(testHarness.getOutput());
@@ -622,6 +628,9 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        for (int i = 0; i < factor * numElements; i++) {
                                assertEquals(i / factor, 
finalResult.get(i).intValue());
                        }
+
+                       testHarness.close();
+                       op.dispose();
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -756,31 +765,10 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
        }
 
        private static StreamTask<?, ?> createMockTaskWithTimer(
-                       final ScheduledExecutorService timerService, final 
Object lock)
+               final TimeServiceProvider timerService)
        {
                StreamTask<?, ?> mockTask = createMockTask();
-
-               doAnswer(new Answer<Void>() {
-                       @Override
-                       public Void answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               final Long timestamp = (Long) 
invocationOnMock.getArguments()[0];
-                               final Triggerable target = (Triggerable) 
invocationOnMock.getArguments()[1];
-                               timerService.schedule(
-                                               new Callable<Object>() {
-                                                       @Override
-                                                       public Object call() 
throws Exception {
-                                                               synchronized 
(lock) {
-                                                                       
target.trigger(timestamp);
-                                                               }
-                                                               return null;
-                                                       }
-                                               },
-                                               timestamp - 
System.currentTimeMillis(),
-                                               TimeUnit.MILLISECONDS);
-                               return null;
-                       }
-               }).when(mockTask).registerTimer(anyLong(), 
any(Triggerable.class));
-
+               when(mockTask.getTimerService()).thenReturn(timerService);
                return mockTask;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 3dfa395..826b230 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -40,19 +40,17 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.junit.After;
 import org.junit.Test;
 
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -61,19 +59,13 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -197,11 +189,15 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
        }
 
        @Test
-       public void testWindowTriggerTimeAlignment() {
+       public void testWindowTriggerTimeAlignment() throws Exception {
+               final Object lock = new Object();
+               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
+                       Executors.newSingleThreadScheduledExecutor(), lock);
+
                try {
                        @SuppressWarnings("unchecked")
                        final Output<StreamRecord<String>> mockOut = 
mock(Output.class);
-                       final StreamTask<?, ?> mockTask = createMockTask();
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
                        
                        AggregatingProcessingTimeWindowOperator<String, String> 
op;
 
@@ -240,12 +236,17 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
+               } finally {
+                       timerService.shutdownService();
                }
        }
 
        @Test
-       public void testTumblingWindowUniqueElements() {
-               final ScheduledExecutorService timerService = 
Executors.newSingleThreadScheduledExecutor();
+       public void testTumblingWindowUniqueElements() throws Exception {
+               final Object lock = new Object();
+               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
+                       Executors.newSingleThreadScheduledExecutor(), lock);
+
                try {
                        final int windowSize = 50;
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>(windowSize);
@@ -255,9 +256,8 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                                        sumFunction, 
fieldOneSelector,
                                                        IntSerializer.INSTANCE, 
tupleSerializer,
                                                        windowSize, windowSize);
-                       
-                       final Object lock = new Object();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        op.setup(mockTask, createTaskConfig(fieldOneSelector, 
IntSerializer.INSTANCE, 10), out);
                        op.open();
@@ -296,20 +296,20 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        fail(e.getMessage());
                }
                finally {
-                       timerService.shutdownNow();
+                       timerService.shutdownService();
                }
        }
 
        @Test
-       public void  testTumblingWindowDuplicateElements() {
-               final ScheduledExecutorService timerService = 
Executors.newSingleThreadScheduledExecutor();
-
+       public void testTumblingWindowDuplicateElements() throws Exception {
+               final Object lock = new Object();
+               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
+                       Executors.newSingleThreadScheduledExecutor(), lock);
                try {
                        final int windowSize = 50;
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>(windowSize);
 
-                       final Object lock = new Object();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
                                        new 
AggregatingProcessingTimeWindowOperator<>(
@@ -364,18 +364,20 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        fail(e.getMessage());
                }
                finally {
-                       timerService.shutdown();
+                       timerService.shutdownService();
                }
        }
 
        @Test
-       public void testSlidingWindow() {
-               final ScheduledExecutorService timerService = 
Executors.newSingleThreadScheduledExecutor();
+       public void testSlidingWindow() throws Exception {
+               final Object lock = new Object();
+               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
+                       Executors.newSingleThreadScheduledExecutor(), lock);
+
                try {
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>(50);
 
-                       final Object lock = new Object();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        // tumbling window that triggers every 20 milliseconds
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
@@ -434,18 +436,19 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        fail(e.getMessage());
                }
                finally {
-                       timerService.shutdownNow();
+                       timerService.shutdownService();
                }
        }
 
        @Test
-       public void testSlidingWindowSingleElements() {
-               final ScheduledExecutorService timerService = 
Executors.newSingleThreadScheduledExecutor();
+       public void testSlidingWindowSingleElements() throws Exception {
+               final Object lock = new Object();
+               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
+                       Executors.newSingleThreadScheduledExecutor(), lock);
 
                try {
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>(50);
-                       final Object lock = new Object();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        // tumbling window that triggers every 20 milliseconds
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
@@ -493,17 +496,19 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        fail(e.getMessage());
                }
                finally {
-                       timerService.shutdown();
+                       timerService.shutdownService();
                }
        }
 
        @Test
-       public void testPropagateExceptionsFromProcessElement() {
-               final ScheduledExecutorService timerService = 
Executors.newSingleThreadScheduledExecutor();
+       public void testPropagateExceptionsFromProcessElement() throws 
Exception {
+               final Object lock = new Object();
+               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
+                       Executors.newSingleThreadScheduledExecutor(), lock);
+
                try {
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>();
-                       final Object lock = new Object();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        ReduceFunction<Tuple2<Integer, Integer>> 
failingFunction = new FailingFunction(100);
 
@@ -543,7 +548,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        fail(e.getMessage());
                }
                finally {
-                       timerService.shutdown();
+                       timerService.shutdownService();
                }
        }
 
@@ -593,6 +598,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                testHarness.processElement(next);
                        }
 
+                       testHarness.close();
                        op.dispose();
 
                        // re-create the operator and restore the state
@@ -622,14 +628,14 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        finalResult.addAll(partialFinalResult);
                        assertEquals(numElements, finalResult.size());
 
-                       testHarness.close();
-                       op.dispose();
-
                        Collections.sort(finalResult, tupleComparator);
                        for (int i = 0; i < numElements; i++) {
                                assertEquals(i, 
finalResult.get(i).f0.intValue());
                                assertEquals(i, 
finalResult.get(i).f1.intValue());
                        }
+
+                       testHarness.close();
+                       op.dispose();
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -685,6 +691,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                testHarness.processElement(next);
                        }
 
+                       testHarness.close();
                        op.dispose();
 
                        // re-create the operator and restore the state
@@ -715,9 +722,6 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        timerService.setCurrentTime(350);
                        timerService.setCurrentTime(400);
 
-                       testHarness.close();
-                       op.dispose();
-
                        // get and verify the result
                        List<Tuple2<Integer, Integer>> finalResult = new 
ArrayList<>(resultAtSnapshot);
                        List<Tuple2<Integer, Integer>> partialFinalResult = 
extractFromStreamRecords(testHarness.getOutput());
@@ -729,6 +733,9 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                assertEquals(i / factor, 
finalResult.get(i).f0.intValue());
                                assertEquals(i / factor, 
finalResult.get(i).f1.intValue());
                        }
+
+                       testHarness.close();
+                       op.dispose();
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -837,14 +844,13 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        timerService.setCurrentTime(150);
                        timerService.setCurrentTime(200);
 
-                       testHarness.close();
-
                        int count1 = StatefulFunction.globalCounts.get(1);
                        int count2 = StatefulFunction.globalCounts.get(2);
                        
                        assertTrue(count1 >= 2 && count1 <= 2 * numElements);
                        assertEquals(count1, count2);
-                       
+
+                       testHarness.close();
                        op.dispose();
                }
                catch (Exception e) {
@@ -941,32 +947,10 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                return task;
        }
 
-       private static StreamTask<?, ?> createMockTaskWithTimer(
-                       final ScheduledExecutorService timerService, final 
Object lock)
+       private static StreamTask<?, ?> createMockTaskWithTimer(final 
TimeServiceProvider timerService)
        {
                StreamTask<?, ?> mockTask = createMockTask();
-
-               doAnswer(new Answer<Void>() {
-                       @Override
-                       public Void answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               final Long timestamp = (Long) 
invocationOnMock.getArguments()[0];
-                               final Triggerable target = (Triggerable) 
invocationOnMock.getArguments()[1];
-                               timerService.schedule(
-                                               new Callable<Object>() {
-                                                       @Override
-                                                       public Object call() 
throws Exception {
-                                                               synchronized 
(lock) {
-                                                                       
target.trigger(timestamp);
-                                                               }
-                                                               return null;
-                                                       }
-                                               },
-                                               timestamp - 
System.currentTimeMillis(),
-                                               TimeUnit.MILLISECONDS);
-                               return null;
-                       }
-               }).when(mockTask).registerTimer(anyLong(), 
any(Triggerable.class));
-               
+               when(mockTask.getTimerService()).thenReturn(timerService);
                return mockTask;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index ce634f0..cbb5a9d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -113,11 +113,11 @@ public class StreamTaskTestHarness<OUT> {
                outputStreamRecordSerializer = new 
MultiplexingStreamRecordSerializer<OUT>(outputSerializer);
        }
 
-       public long getCurrentProcessingTime() {
+       public TimeServiceProvider getTimerService() {
                if (!(task instanceof StreamTask)) {
-                       throw new 
UnsupportedOperationException("getCurrentProcessingTime() only supported on 
StreamTasks.");
+                       throw new 
UnsupportedOperationException("getTimerService() only supported on 
StreamTasks.");
                }
-               return ((StreamTask) task).getCurrentProcessingTime();
+               return ((StreamTask) task).getTimerService();
        }
 
        /**
@@ -235,9 +235,6 @@ public class StreamTaskTestHarness<OUT> {
                }
                else {
                        if (taskThread.task instanceof StreamTask) {
-                               long base = System.currentTimeMillis();
-                               long now = 0;
-
                                StreamTask<?, ?> streamTask = (StreamTask<?, 
?>) taskThread.task;
                                while (!streamTask.isRunning()) {
                                        Thread.sleep(100);

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 7e86da0..430c6de 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 65ed43d..2dd2163 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -21,10 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -34,15 +31,10 @@ import 
org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -105,7 +97,7 @@ public class MockContext<IN, OUT> {
        }
 
        private static StreamTask<?, ?> createMockTaskWithTimer(
-                       final ScheduledExecutorService timerService, final 
Object lock)
+               final TimeServiceProvider timerService, final Object lock)
        {
                StreamTask<?, ?> task = mock(StreamTask.class);
                when(task.getAccumulatorMap()).thenReturn(new HashMap<String, 
Accumulator<?, ?>>());
@@ -113,28 +105,7 @@ public class MockContext<IN, OUT> {
                when(task.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
                when(task.getEnvironment()).thenReturn(new 
MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 
1024));
                when(task.getCheckpointLock()).thenReturn(lock);
-
-               doAnswer(new Answer<Void>() {
-                       @Override
-                       public Void answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               final Long timestamp = (Long) 
invocationOnMock.getArguments()[0];
-                               final Triggerable target = (Triggerable) 
invocationOnMock.getArguments()[1];
-                               timerService.schedule(
-                                               new Callable<Object>() {
-                                                       @Override
-                                                       public Object call() 
throws Exception {
-                                                               synchronized 
(lock) {
-                                                                       
target.trigger(timestamp);
-                                                               }
-                                                               return null;
-                                                       }
-                                               },
-                                               timestamp - 
System.currentTimeMillis(),
-                                               TimeUnit.MILLISECONDS);
-                               return null;
-                       }
-               }).when(task).registerTimer(anyLong(), any(Triggerable.class));
-
+               when(task.getTimerService()).thenReturn(timerService);
                return task;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 6c637bf..9cdc783 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -35,7 +35,6 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
@@ -50,7 +49,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -155,23 +153,12 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                timeServiceProvider = testTimeProvider != null ? 
testTimeProvider :
                        DefaultTimeServiceProvider.create(mockTask, 
Executors.newSingleThreadScheduledExecutor(), this.checkpointLock);
 
-               doAnswer(new Answer<Void>() {
-                       @Override
-                       public Void answer(InvocationOnMock invocation) throws 
Throwable {
-                               final long execTime = (Long) 
invocation.getArguments()[0];
-                               final Triggerable target = (Triggerable) 
invocation.getArguments()[1];
-
-                               timeServiceProvider.registerTimer(execTime, 
target);
-                               return null;
-                       }
-               }).when(mockTask).registerTimer(anyLong(), 
any(Triggerable.class));
-
-               doAnswer(new Answer<Long>() {
+               doAnswer(new Answer<TimeServiceProvider>() {
                        @Override
-                       public Long answer(InvocationOnMock invocation) throws 
Throwable {
-                               return 
timeServiceProvider.getCurrentProcessingTime();
+                       public TimeServiceProvider answer(InvocationOnMock 
invocation) throws Throwable {
+                               return timeServiceProvider;
                        }
-               }).when(mockTask).getCurrentProcessingTime();
+               }).when(mockTask).getTimerService();
        }
 
        public void setStateBackend(AbstractStateBackend stateBackend) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
index 33c8024..707ce0f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
@@ -192,7 +192,7 @@ public class StreamTaskTimerITCase extends 
StreamingMultipleProgramsTestBase {
                        }
 
                        if (first) {
-                               registerTimer(System.currentTimeMillis() + 100, 
this);
+                               
getTimerService().registerTimer(System.currentTimeMillis() + 100, this);
                                first = false;
                        }
                        numElements++;
@@ -209,7 +209,7 @@ public class StreamTaskTimerITCase extends 
StreamingMultipleProgramsTestBase {
                        try {
                                numTimers++;
                                throwIfDone();
-                               registerTimer(System.currentTimeMillis() + 1, 
this);
+                               
getTimerService().registerTimer(System.currentTimeMillis() + 1, this);
                        } finally {
                                semaphore.release();
                        }
@@ -251,7 +251,7 @@ public class StreamTaskTimerITCase extends 
StreamingMultipleProgramsTestBase {
                        }
 
                        if (first) {
-                               registerTimer(System.currentTimeMillis() + 100, 
this);
+                               
getTimerService().registerTimer(System.currentTimeMillis() + 100, this);
                                first = false;
                        }
                        numElements++;
@@ -266,7 +266,7 @@ public class StreamTaskTimerITCase extends 
StreamingMultipleProgramsTestBase {
                        }
 
                        if (first) {
-                               registerTimer(System.currentTimeMillis() + 100, 
this);
+                               
getTimerService().registerTimer(System.currentTimeMillis() + 100, this);
                                first = false;
                        }
                        numElements++;
@@ -284,7 +284,7 @@ public class StreamTaskTimerITCase extends 
StreamingMultipleProgramsTestBase {
                        try {
                                numTimers++;
                                throwIfDone();
-                               registerTimer(System.currentTimeMillis() + 1, 
this);
+                               
getTimerService().registerTimer(System.currentTimeMillis() + 1, this);
                        } finally {
                                semaphore.release();
                        }

Reply via email to