Repository: flink Updated Branches: refs/heads/master 568845a3c -> 51a5048b2
[FLINK-4494] Expose the TimeServiceProvider from the Task to each Operator. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ffff2997 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ffff2997 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ffff2997 Branch: refs/heads/master Commit: ffff2997a5c82386a6428744f39b808a3fb49538 Parents: 4779c7e Author: kl0u <kklou...@gmail.com> Authored: Tue Sep 20 14:45:01 2016 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri Sep 23 15:01:06 2016 +0200 ---------------------------------------------------------------------- .../api/operators/AbstractStreamOperator.java | 19 +-- .../streaming/api/operators/StreamSource.java | 23 ++-- .../api/operators/StreamingRuntimeContext.java | 19 --- .../api/windowing/assigners/WindowAssigner.java | 4 +- .../api/windowing/triggers/Trigger.java | 4 +- .../operators/ExtractTimestampsOperator.java | 9 +- ...TimestampsAndPeriodicWatermarksOperator.java | 6 +- ...ractAlignedProcessingTimeWindowOperator.java | 8 +- .../operators/windowing/WindowOperator.java | 7 +- .../streaming/runtime/tasks/StreamTask.java | 19 +-- .../operators/StreamSourceOperatorTest.java | 28 +--- .../runtime/operators/StreamTaskTimerTest.java | 12 +- .../runtime/operators/TimeProviderTest.java | 14 +- ...AlignedProcessingTimeWindowOperatorTest.java | 104 +++++++-------- ...AlignedProcessingTimeWindowOperatorTest.java | 128 ++++++++----------- .../runtime/tasks/StreamTaskTestHarness.java | 9 +- .../KeyedOneInputStreamOperatorTestHarness.java | 2 +- .../flink/streaming/util/MockContext.java | 35 +---- .../util/OneInputStreamOperatorTestHarness.java | 21 +-- .../runtime/StreamTaskTimerITCase.java | 10 +- 20 files changed, 177 insertions(+), 304 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 71296e3..a73f3b2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -35,14 +35,12 @@ import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ScheduledFuture; - /** * Base class for all stream operators. Operators that contain a user function should extend the class * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class). @@ -230,18 +228,11 @@ public abstract class AbstractStreamOperator<OUT> } /** - * Register a timer callback. At the specified time the provided {@link Triggerable} will - * be invoked. This call is guaranteed to not happen concurrently with method calls on the operator. - * - * @param time The absolute time in milliseconds. - * @param target The target to be triggered. + * Returns the {@link TimeServiceProvider} responsible for getting the current + * processing time and registering timers. */ - protected ScheduledFuture<?> registerTimer(long time, Triggerable target) { - return container.registerTimer(time, target); - } - - protected long getCurrentProcessingTime() { - return container.getCurrentProcessingTime(); + protected TimeServiceProvider getTimerService() { + return container.getTimerService(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 38c948b..22987ab 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import java.util.concurrent.ScheduledFuture; @@ -189,6 +190,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> { private final StreamSource<?, ?> owner; + private final TimeServiceProvider timeService; private final Object lockingObject; private final Output<StreamRecord<T>> output; private final StreamRecord<T> reuse; @@ -209,14 +211,15 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> } this.owner = owner; + this.timeService = owner.getTimerService(); this.lockingObject = lockingObjectParam; this.output = outputParam; this.watermarkInterval = watermarkInterval; this.reuse = new StreamRecord<T>(null); - long now = owner.getCurrentProcessingTime(); - this.watermarkTimer = owner.registerTimer(now + watermarkInterval, - new WatermarkEmittingTask(owner, lockingObjectParam, outputParam)); + long now = this.timeService.getCurrentProcessingTime(); + this.watermarkTimer = this.timeService.registerTimer(now + watermarkInterval, + new WatermarkEmittingTask(this.timeService, lockingObjectParam, outputParam)); } @Override @@ -224,7 +227,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> owner.checkAsyncException(); synchronized (lockingObject) { - final long currentTime = owner.getCurrentProcessingTime(); + final long currentTime = this.timeService.getCurrentProcessingTime(); output.collect(reuse.replace(element, currentTime)); // this is to avoid lock contention in the lockingObject by @@ -276,19 +279,19 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> private class WatermarkEmittingTask implements Triggerable { - private final StreamSource<?, ?> owner; + private final TimeServiceProvider timeService; private final Object lockingObject; private final Output<StreamRecord<T>> output; - private WatermarkEmittingTask(StreamSource<?, ?> src, Object lock, Output<StreamRecord<T>> output) { - this.owner = src; + private WatermarkEmittingTask(TimeServiceProvider timeService, Object lock, Output<StreamRecord<T>> output) { + this.timeService = timeService; this.lockingObject = lock; this.output = output; } @Override public void trigger(long timestamp) { - final long currentTime = owner.getCurrentProcessingTime(); + final long currentTime = this.timeService.getCurrentProcessingTime(); if (currentTime > nextWatermarkTime) { // align the watermarks across all machines. this will ensure that we @@ -304,8 +307,8 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> } } - owner.registerTimer(owner.getCurrentProcessingTime() + watermarkInterval, - new WatermarkEmittingTask(owner, lockingObject, output)); + this.timeService.registerTimer(this.timeService.getCurrentProcessingTime() + watermarkInterval, + new WatermarkEmittingTask(this.timeService, lockingObject, output)); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index 863cf17..961bd9d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -83,25 +83,6 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { return taskEnvironment.getInputSplitProvider(); } - /** - * Register a timer callback. At the specified time the {@link Triggerable } will be invoked. - * This call is guaranteed to not happen concurrently with method calls on the operator. - * - * @param time The absolute time in milliseconds. - * @param target The target to be triggered. - */ - public ScheduledFuture<?> registerTimer(long time, Triggerable target) { - return operator.registerTimer(time, target); - } - - /** - * Returns the current processing time as defined by the task's - * {@link org.apache.flink.streaming.runtime.tasks.TimeServiceProvider TimeServiceProvider} - */ - public long getCurrentProcessingTime() { - return operator.getCurrentProcessingTime(); - } - // ------------------------------------------------------------------------ // broadcast variables // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java index 9f487af..7a27cc8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java @@ -24,7 +24,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.streaming.runtime.tasks.StreamTask; import java.io.Serializable; @@ -85,8 +84,7 @@ public abstract class WindowAssigner<T, W extends Window> implements Serializabl public abstract static class WindowAssignerContext { /** - * Returns the current processing time, as returned by - * the {@link StreamTask#getCurrentProcessingTime()}. + * Returns the current processing time. */ public abstract long getCurrentProcessingTime(); http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java index 4d6c60f..ff80639 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java @@ -27,7 +27,6 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.streaming.runtime.tasks.StreamTask; import java.io.Serializable; @@ -128,8 +127,7 @@ public abstract class Trigger<T, W extends Window> implements Serializable { public interface TriggerContext { /** - * Returns the current processing time, as returned by - * the {@link StreamTask#getCurrentProcessingTime()}. + * Returns the current processing time. */ long getCurrentProcessingTime(); http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java index a4815dc..c92ff34 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java @@ -54,9 +54,9 @@ public class ExtractTimestampsOperator<T> super.open(); watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); if (watermarkInterval > 0) { - registerTimer(System.currentTimeMillis() + watermarkInterval, this); + long now = getTimerService().getCurrentProcessingTime(); + getTimerService().registerTimer(now + watermarkInterval, this); } - currentWatermark = Long.MIN_VALUE; } @@ -74,14 +74,15 @@ public class ExtractTimestampsOperator<T> @Override public void trigger(long timestamp) throws Exception { // register next timer - registerTimer(System.currentTimeMillis() + watermarkInterval, this); long newWatermark = userFunction.getCurrentWatermark(); - if (newWatermark > currentWatermark) { currentWatermark = newWatermark; // emit watermark output.emitWatermark(new Watermark(currentWatermark)); } + + long now = getTimerService().getCurrentProcessingTime(); + getTimerService().registerTimer(now + watermarkInterval, this); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java index 92faed2..f791723 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java @@ -54,7 +54,8 @@ public class TimestampsAndPeriodicWatermarksOperator<T> watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); if (watermarkInterval > 0) { - registerTimer(System.currentTimeMillis() + watermarkInterval, this); + long now = getTimerService().getCurrentProcessingTime(); + getTimerService().registerTimer(now + watermarkInterval, this); } } @@ -76,7 +77,8 @@ public class TimestampsAndPeriodicWatermarksOperator<T> output.emitWatermark(newWatermark); } - registerTimer(System.currentTimeMillis() + watermarkInterval, this); + long now = getTimerService().getCurrentProcessingTime(); + getTimerService().registerTimer(now + watermarkInterval, this); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java index e74dd87..b39b760 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java @@ -125,7 +125,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, // decide when to first compute the window and when to slide it // the values should align with the start of time (that is, the UNIX epoch, not the big bang) - final long now = getRuntimeContext().getCurrentProcessingTime(); + final long now = getTimerService().getCurrentProcessingTime(); nextEvaluationTime = now + windowSlide - (now % windowSlide); nextSlideTime = now + paneSize - (now % paneSize); @@ -164,9 +164,9 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, nextPastTriggerTime = Math.min(nextPastEvaluationTime, nextPastSlideTime); } } - + // make sure the first window happens - registerTimer(firstTriggerTime, this); + getTimerService().registerTimer(firstTriggerTime, this); } @Override @@ -230,7 +230,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, } long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime); - registerTimer(nextTriggerTime, this); + getTimerService().registerTimer(nextTriggerTime, this); } private void computeWindow(long timestamp) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index dffa2a1..e4939db 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -255,7 +255,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> windowAssignerContext = new WindowAssigner.WindowAssignerContext() { @Override public long getCurrentProcessingTime() { - return WindowOperator.this.getCurrentProcessingTime(); + return WindowOperator.this.getTimerService().getCurrentProcessingTime(); } }; @@ -721,7 +721,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> @Override public long getCurrentProcessingTime() { - return WindowOperator.this.getCurrentProcessingTime(); + return WindowOperator.this.getTimerService().getCurrentProcessingTime(); } @Override @@ -732,7 +732,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> processingTimeTimersQueue.add(timer); //If this is the first timer added for this timestamp register a TriggerTask if (processingTimeTimerTimestamps.add(time, 1) == 0) { - ScheduledFuture<?> scheduledFuture = WindowOperator.this.registerTimer(time, WindowOperator.this); + ScheduledFuture<?> scheduledFuture = WindowOperator.this.getTimerService() + .registerTimer(time, WindowOperator.this); processingTimeTimerFutures.put(time, scheduledFuture); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 80d51a6..faa9672 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -47,7 +47,6 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; -import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.slf4j.Logger; @@ -65,7 +64,6 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RunnableFuture; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; /** @@ -207,16 +205,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> timerService = timeProvider; } - /** - * Returns the current processing time. - */ - public long getCurrentProcessingTime() { - if (timerService == null) { - throw new IllegalStateException("The timer service has not been initialized."); - } - return timerService.getCurrentProcessingTime(); - } - @Override public final void invoke() throws Exception { @@ -825,13 +813,14 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> } /** - * Registers a timer. + * Returns the {@link TimeServiceProvider} responsible for telling the current + * processing time and registering timers. */ - public ScheduledFuture<?> registerTimer(final long timestamp, final Triggerable target) { + public TimeServiceProvider getTimerService() { if (timerService == null) { throw new IllegalStateException("The timer service has not been initialized."); } - return timerService.registerTimer(timestamp, target); + return timerService; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java index d61fec9..e8663f5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java @@ -45,12 +45,9 @@ import org.mockito.stubbing.Answer; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.ScheduledFuture; import static org.junit.Assert.*; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -241,30 +238,15 @@ public class StreamSourceOperatorTest { when(mockTask.getExecutionConfig()).thenReturn(executionConfig); when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap()); - doAnswer(new Answer<ScheduledFuture>() { + doAnswer(new Answer<TimeServiceProvider>() { @Override - public ScheduledFuture answer(InvocationOnMock invocation) throws Throwable { - final long execTime = (Long) invocation.getArguments()[0]; - final Triggerable target = (Triggerable) invocation.getArguments()[1]; - - if (timeProvider == null) { - throw new RuntimeException("The time provider is null"); - } - - timeProvider.registerTimer(execTime, target); - return null; - } - }).when(mockTask).registerTimer(anyLong(), any(Triggerable.class)); - - doAnswer(new Answer<Long>() { - @Override - public Long answer(InvocationOnMock invocation) throws Throwable { + public TimeServiceProvider answer(InvocationOnMock invocation) throws Throwable { if (timeProvider == null) { - throw new RuntimeException("The time provider is null"); + throw new RuntimeException("The time provider is null."); } - return timeProvider.getCurrentProcessingTime(); + return timeProvider; } - }).when(mockTask).getCurrentProcessingTime(); + }).when(mockTask).getTimerService(); operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class)); } http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java index b9435f5..98058e8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import org.junit.Test; import org.junit.runner.RunWith; @@ -63,7 +64,7 @@ public class StreamTaskTimerTest { testHarness.waitForTaskRunning(); // first one spawns thread - mapTask.registerTimer(System.currentTimeMillis(), new Triggerable() { + mapTask.getTimerService().registerTimer(System.currentTimeMillis(), new Triggerable() { @Override public void trigger(long timestamp) { } @@ -105,10 +106,11 @@ public class StreamTaskTimerTest { final long t3 = System.currentTimeMillis() + 100; final long t4 = System.currentTimeMillis() + 200; - mapTask.registerTimer(t1, new ValidatingTriggerable(errorRef, t1, 0)); - mapTask.registerTimer(t2, new ValidatingTriggerable(errorRef, t2, 1)); - mapTask.registerTimer(t3, new ValidatingTriggerable(errorRef, t3, 2)); - mapTask.registerTimer(t4, new ValidatingTriggerable(errorRef, t4, 3)); + TimeServiceProvider timeService = mapTask.getTimerService(); + timeService.registerTimer(t1, new ValidatingTriggerable(errorRef, t1, 0)); + timeService.registerTimer(t2, new ValidatingTriggerable(errorRef, t2, 1)); + timeService.registerTimer(t3, new ValidatingTriggerable(errorRef, t3, 2)); + timeService.registerTimer(t4, new ValidatingTriggerable(errorRef, t4, 3)); long deadline = System.currentTimeMillis() + 20000; while (errorRef.get() == null && http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java index 4d4f07b..140e9e2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.hadoop.shaded.org.apache.http.impl.conn.SystemDefaultDnsResolver; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamMap; @@ -40,7 +39,6 @@ import java.util.List; import java.util.concurrent.Executors; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; @RunWith(PowerMockRunner.class) @PrepareForTest(ResultPartitionWriter.class) @@ -150,7 +148,7 @@ public class TimeProviderTest { } }); - Assert.assertTrue(provider.getNoOfRegisteredTimers() == 4); + Assert.assertEquals(provider.getNoOfRegisteredTimers(), 4); provider.setCurrentTime(100); long seen = 0; @@ -177,24 +175,24 @@ public class TimeProviderTest { testHarness.invoke(); - assertTrue(testHarness.getCurrentProcessingTime() == 0); + assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 0); tp.setCurrentTime(11); - assertTrue(testHarness.getCurrentProcessingTime() == 11); + assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 11); tp.setCurrentTime(15); tp.setCurrentTime(16); - assertTrue(testHarness.getCurrentProcessingTime() == 16); + assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 16); // register 2 tasks - mapTask.registerTimer(30, new Triggerable() { + mapTask.getTimerService().registerTimer(30, new Triggerable() { @Override public void trigger(long timestamp) { } }); - mapTask.registerTimer(40, new Triggerable() { + mapTask.getTimerService().registerTimer(40, new Triggerable() { @Override public void trigger(long timestamp) { http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index 9849bd7..f33da89 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -39,10 +39,11 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; @@ -50,28 +51,19 @@ import org.apache.flink.util.Collector; import org.junit.After; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyLong; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -189,11 +181,15 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } @Test - public void testWindowTriggerTimeAlignment() { + public void testWindowTriggerTimeAlignment() throws Exception { + final Object lock = new Object(); + final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( + Executors.newSingleThreadScheduledExecutor(), lock); + try { @SuppressWarnings("unchecked") final Output<StreamRecord<String>> mockOut = mock(Output.class); - final StreamTask<?, ?> mockTask = createMockTask(); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); AccumulatingProcessingTimeWindowOperator<String, String, String> op; @@ -233,16 +229,21 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { e.printStackTrace(); fail(e.getMessage()); } + finally { + timerService.shutdownService(); + } } @Test - public void testTumblingWindow() { - final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); + public void testTumblingWindow() throws Exception { + final Object lock = new Object(); + final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( + Executors.newSingleThreadScheduledExecutor(), lock); + try { final int windowSize = 50; final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize); - final Object lock = new Object(); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); // tumbling window that triggers every 20 milliseconds AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = @@ -284,17 +285,19 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { fail(e.getMessage()); } finally { - timerService.shutdown(); + timerService.shutdownService(); } } @Test public void testSlidingWindow() throws Exception { - final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); + final Object lock = new Object(); + final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( + Executors.newSingleThreadScheduledExecutor(), lock); + try { final CollectingOutput<Integer> out = new CollectingOutput<>(50); - final Object lock = new Object(); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); // tumbling window that triggers every 20 milliseconds AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = @@ -344,18 +347,19 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } } } finally { - timerService.shutdown(); + timerService.shutdownService(); } } @Test - public void testTumblingWindowSingleElements() { - final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); + public void testTumblingWindowSingleElements() throws Exception { + final Object lock = new Object(); + final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( + Executors.newSingleThreadScheduledExecutor(), lock); try { final CollectingOutput<Integer> out = new CollectingOutput<>(50); - final Object lock = new Object(); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); // tumbling window that triggers every 20 milliseconds AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = @@ -400,18 +404,19 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { fail(e.getMessage()); } finally { - timerService.shutdown(); + timerService.shutdownService(); } } @Test - public void testSlidingWindowSingleElements() { - final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); + public void testSlidingWindowSingleElements() throws Exception { + final Object lock = new Object(); + final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( + Executors.newSingleThreadScheduledExecutor(), lock); try { final CollectingOutput<Integer> out = new CollectingOutput<>(50); - final Object lock = new Object(); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); // tumbling window that triggers every 20 milliseconds AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = @@ -447,7 +452,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { fail(e.getMessage()); } finally { - timerService.shutdown(); + timerService.shutdownService(); } } @@ -494,8 +499,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { testHarness.processElement(new StreamRecord<>(i + numElementsFirst)); } + testHarness.close(); op.dispose(); - + // re-create the operator and restore the state op = new AccumulatingProcessingTimeWindowOperator<>( validatingIdentityFunction, identitySelector, @@ -527,6 +533,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { for (int i = 0; i < numElements; i++) { assertEquals(i, finalResult.get(i).intValue()); } + testHarness.close(); + op.dispose(); } catch (Exception e) { e.printStackTrace(); @@ -580,6 +588,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { testHarness.processElement(new StreamRecord<>(i)); } + testHarness.close(); op.dispose(); // re-create the operator and restore the state @@ -609,9 +618,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { timerService.setCurrentTime(300); timerService.setCurrentTime(350); - testHarness.close(); - op.dispose(); - // get and verify the result List<Integer> finalResult = new ArrayList<>(resultAtSnapshot); List<Integer> finalPartialResult = extractFromStreamRecords(testHarness.getOutput()); @@ -622,6 +628,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { for (int i = 0; i < factor * numElements; i++) { assertEquals(i / factor, finalResult.get(i).intValue()); } + + testHarness.close(); + op.dispose(); } catch (Exception e) { e.printStackTrace(); @@ -756,31 +765,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } private static StreamTask<?, ?> createMockTaskWithTimer( - final ScheduledExecutorService timerService, final Object lock) + final TimeServiceProvider timerService) { StreamTask<?, ?> mockTask = createMockTask(); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - final Long timestamp = (Long) invocationOnMock.getArguments()[0]; - final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1]; - timerService.schedule( - new Callable<Object>() { - @Override - public Object call() throws Exception { - synchronized (lock) { - target.trigger(timestamp); - } - return null; - } - }, - timestamp - System.currentTimeMillis(), - TimeUnit.MILLISECONDS); - return null; - } - }).when(mockTask).registerTimer(anyLong(), any(Triggerable.class)); - + when(mockTask.getTimerService()).thenReturn(timerService); return mockTask; } http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java index 3dfa395..826b230 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -40,19 +40,17 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.junit.After; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -61,19 +59,13 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -197,11 +189,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { } @Test - public void testWindowTriggerTimeAlignment() { + public void testWindowTriggerTimeAlignment() throws Exception { + final Object lock = new Object(); + final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( + Executors.newSingleThreadScheduledExecutor(), lock); + try { @SuppressWarnings("unchecked") final Output<StreamRecord<String>> mockOut = mock(Output.class); - final StreamTask<?, ?> mockTask = createMockTask(); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); AggregatingProcessingTimeWindowOperator<String, String> op; @@ -240,12 +236,17 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); + } finally { + timerService.shutdownService(); } } @Test - public void testTumblingWindowUniqueElements() { - final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); + public void testTumblingWindowUniqueElements() throws Exception { + final Object lock = new Object(); + final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( + Executors.newSingleThreadScheduledExecutor(), lock); + try { final int windowSize = 50; final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize); @@ -255,9 +256,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { sumFunction, fieldOneSelector, IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSize); - - final Object lock = new Object(); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out); op.open(); @@ -296,20 +296,20 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { fail(e.getMessage()); } finally { - timerService.shutdownNow(); + timerService.shutdownService(); } } @Test - public void testTumblingWindowDuplicateElements() { - final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); - + public void testTumblingWindowDuplicateElements() throws Exception { + final Object lock = new Object(); + final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( + Executors.newSingleThreadScheduledExecutor(), lock); try { final int windowSize = 50; final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize); - final Object lock = new Object(); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>( @@ -364,18 +364,20 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { fail(e.getMessage()); } finally { - timerService.shutdown(); + timerService.shutdownService(); } } @Test - public void testSlidingWindow() { - final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); + public void testSlidingWindow() throws Exception { + final Object lock = new Object(); + final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( + Executors.newSingleThreadScheduledExecutor(), lock); + try { final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50); - final Object lock = new Object(); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); // tumbling window that triggers every 20 milliseconds AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = @@ -434,18 +436,19 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { fail(e.getMessage()); } finally { - timerService.shutdownNow(); + timerService.shutdownService(); } } @Test - public void testSlidingWindowSingleElements() { - final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); + public void testSlidingWindowSingleElements() throws Exception { + final Object lock = new Object(); + final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( + Executors.newSingleThreadScheduledExecutor(), lock); try { final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50); - final Object lock = new Object(); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); // tumbling window that triggers every 20 milliseconds AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = @@ -493,17 +496,19 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { fail(e.getMessage()); } finally { - timerService.shutdown(); + timerService.shutdownService(); } } @Test - public void testPropagateExceptionsFromProcessElement() { - final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); + public void testPropagateExceptionsFromProcessElement() throws Exception { + final Object lock = new Object(); + final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( + Executors.newSingleThreadScheduledExecutor(), lock); + try { final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(); - final Object lock = new Object(); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); ReduceFunction<Tuple2<Integer, Integer>> failingFunction = new FailingFunction(100); @@ -543,7 +548,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { fail(e.getMessage()); } finally { - timerService.shutdown(); + timerService.shutdownService(); } } @@ -593,6 +598,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { testHarness.processElement(next); } + testHarness.close(); op.dispose(); // re-create the operator and restore the state @@ -622,14 +628,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { finalResult.addAll(partialFinalResult); assertEquals(numElements, finalResult.size()); - testHarness.close(); - op.dispose(); - Collections.sort(finalResult, tupleComparator); for (int i = 0; i < numElements; i++) { assertEquals(i, finalResult.get(i).f0.intValue()); assertEquals(i, finalResult.get(i).f1.intValue()); } + + testHarness.close(); + op.dispose(); } catch (Exception e) { e.printStackTrace(); @@ -685,6 +691,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { testHarness.processElement(next); } + testHarness.close(); op.dispose(); // re-create the operator and restore the state @@ -715,9 +722,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { timerService.setCurrentTime(350); timerService.setCurrentTime(400); - testHarness.close(); - op.dispose(); - // get and verify the result List<Tuple2<Integer, Integer>> finalResult = new ArrayList<>(resultAtSnapshot); List<Tuple2<Integer, Integer>> partialFinalResult = extractFromStreamRecords(testHarness.getOutput()); @@ -729,6 +733,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { assertEquals(i / factor, finalResult.get(i).f0.intValue()); assertEquals(i / factor, finalResult.get(i).f1.intValue()); } + + testHarness.close(); + op.dispose(); } catch (Exception e) { e.printStackTrace(); @@ -837,14 +844,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { timerService.setCurrentTime(150); timerService.setCurrentTime(200); - testHarness.close(); - int count1 = StatefulFunction.globalCounts.get(1); int count2 = StatefulFunction.globalCounts.get(2); assertTrue(count1 >= 2 && count1 <= 2 * numElements); assertEquals(count1, count2); - + + testHarness.close(); op.dispose(); } catch (Exception e) { @@ -941,32 +947,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { return task; } - private static StreamTask<?, ?> createMockTaskWithTimer( - final ScheduledExecutorService timerService, final Object lock) + private static StreamTask<?, ?> createMockTaskWithTimer(final TimeServiceProvider timerService) { StreamTask<?, ?> mockTask = createMockTask(); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - final Long timestamp = (Long) invocationOnMock.getArguments()[0]; - final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1]; - timerService.schedule( - new Callable<Object>() { - @Override - public Object call() throws Exception { - synchronized (lock) { - target.trigger(timestamp); - } - return null; - } - }, - timestamp - System.currentTimeMillis(), - TimeUnit.MILLISECONDS); - return null; - } - }).when(mockTask).registerTimer(anyLong(), any(Triggerable.class)); - + when(mockTask.getTimerService()).thenReturn(timerService); return mockTask; } http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index ce634f0..cbb5a9d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -113,11 +113,11 @@ public class StreamTaskTestHarness<OUT> { outputStreamRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outputSerializer); } - public long getCurrentProcessingTime() { + public TimeServiceProvider getTimerService() { if (!(task instanceof StreamTask)) { - throw new UnsupportedOperationException("getCurrentProcessingTime() only supported on StreamTasks."); + throw new UnsupportedOperationException("getTimerService() only supported on StreamTasks."); } - return ((StreamTask) task).getCurrentProcessingTime(); + return ((StreamTask) task).getTimerService(); } /** @@ -235,9 +235,6 @@ public class StreamTaskTestHarness<OUT> { } else { if (taskThread.task instanceof StreamTask) { - long base = System.currentTimeMillis(); - long now = 0; - StreamTask<?, ?> streamTask = (StreamTask<?, ?>) taskThread.task; while (!streamTask.isRunning()) { Thread.sleep(100); http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index 7e86da0..430c6de 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java index 65ed43d..2dd2163 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java @@ -21,10 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; @@ -34,15 +31,10 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; +import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -105,7 +97,7 @@ public class MockContext<IN, OUT> { } private static StreamTask<?, ?> createMockTaskWithTimer( - final ScheduledExecutorService timerService, final Object lock) + final TimeServiceProvider timerService, final Object lock) { StreamTask<?, ?> task = mock(StreamTask.class); when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>()); @@ -113,28 +105,7 @@ public class MockContext<IN, OUT> { when(task.getExecutionConfig()).thenReturn(new ExecutionConfig()); when(task.getEnvironment()).thenReturn(new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024)); when(task.getCheckpointLock()).thenReturn(lock); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - final Long timestamp = (Long) invocationOnMock.getArguments()[0]; - final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1]; - timerService.schedule( - new Callable<Object>() { - @Override - public Object call() throws Exception { - synchronized (lock) { - target.trigger(timestamp); - } - return null; - } - }, - timestamp - System.currentTimeMillis(), - TimeUnit.MILLISECONDS); - return null; - } - }).when(task).registerTimer(anyLong(), any(Triggerable.class)); - + when(task.getTimerService()).thenReturn(timerService); return task; } } http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 6c637bf..9cdc783 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.AsynchronousException; import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; @@ -50,7 +49,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -155,23 +153,12 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { timeServiceProvider = testTimeProvider != null ? testTimeProvider : DefaultTimeServiceProvider.create(mockTask, Executors.newSingleThreadScheduledExecutor(), this.checkpointLock); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - final long execTime = (Long) invocation.getArguments()[0]; - final Triggerable target = (Triggerable) invocation.getArguments()[1]; - - timeServiceProvider.registerTimer(execTime, target); - return null; - } - }).when(mockTask).registerTimer(anyLong(), any(Triggerable.class)); - - doAnswer(new Answer<Long>() { + doAnswer(new Answer<TimeServiceProvider>() { @Override - public Long answer(InvocationOnMock invocation) throws Throwable { - return timeServiceProvider.getCurrentProcessingTime(); + public TimeServiceProvider answer(InvocationOnMock invocation) throws Throwable { + return timeServiceProvider; } - }).when(mockTask).getCurrentProcessingTime(); + }).when(mockTask).getTimerService(); } public void setStateBackend(AbstractStateBackend stateBackend) { http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java index 33c8024..707ce0f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java @@ -192,7 +192,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase { } if (first) { - registerTimer(System.currentTimeMillis() + 100, this); + getTimerService().registerTimer(System.currentTimeMillis() + 100, this); first = false; } numElements++; @@ -209,7 +209,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase { try { numTimers++; throwIfDone(); - registerTimer(System.currentTimeMillis() + 1, this); + getTimerService().registerTimer(System.currentTimeMillis() + 1, this); } finally { semaphore.release(); } @@ -251,7 +251,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase { } if (first) { - registerTimer(System.currentTimeMillis() + 100, this); + getTimerService().registerTimer(System.currentTimeMillis() + 100, this); first = false; } numElements++; @@ -266,7 +266,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase { } if (first) { - registerTimer(System.currentTimeMillis() + 100, this); + getTimerService().registerTimer(System.currentTimeMillis() + 100, this); first = false; } numElements++; @@ -284,7 +284,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase { try { numTimers++; throwIfDone(); - registerTimer(System.currentTimeMillis() + 1, this); + getTimerService().registerTimer(System.currentTimeMillis() + 1, this); } finally { semaphore.release(); }