[FLINK-2675] [streaming] Add utilities for scheduled triggers.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ad21031 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ad21031 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ad21031 Branch: refs/heads/master Commit: 7ad210316fe22441761351e9943cdc31570d5407 Parents: 64e1dc6 Author: Stephan Ewen <se...@apache.org> Authored: Tue Sep 15 14:55:41 2015 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Sep 22 13:25:20 2015 +0200 ---------------------------------------------------------------------- .../taskmanager/DispatcherThreadFactory.java | 50 +++++++ .../taskmanager/DispatherThreadFactory.java | 50 ------- .../apache/flink/runtime/taskmanager/Task.java | 2 +- .../runtime/operators/TriggerTimer.java | 119 ++++++++++++++++ .../runtime/operators/Triggerable.java | 38 +++++ .../runtime/operators/package-info.java | 22 +++ .../runtime/operators/TriggerTimerTest.java | 137 +++++++++++++++++++ 7 files changed, 367 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7ad21031/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatcherThreadFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatcherThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatcherThreadFactory.java new file mode 100644 index 0000000..97060a8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatcherThreadFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +import java.util.concurrent.ThreadFactory; + +/** + * Thread factory that creates threads with a given name, associates them with a given + * thread group, and sets them to daemon mode. + */ +public class DispatcherThreadFactory implements ThreadFactory { + + private final ThreadGroup group; + + private final String threadName; + + /** + * Creates a new thread factory. + * + * @param group The group that the threads will be associated with. + * @param threadName The name given to every thread created by this factory. 
+ */ + public DispatcherThreadFactory(ThreadGroup group, String threadName) { + this.group = group; + this.threadName = threadName; + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, threadName); + t.setDaemon(true); + return t; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ad21031/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatherThreadFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatherThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatherThreadFactory.java deleted file mode 100644 index f5f1565..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatherThreadFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import java.util.concurrent.ThreadFactory; - -/** - * Thread factory that creates threads with a given name, associates them with a given - * thread group, and set them to daemon mode. 
- */ -public class DispatherThreadFactory implements ThreadFactory { - - private final ThreadGroup group; - - private final String threadName; - - /** - * Creates a new thread factory. - * - * @param group The group that the threads will be associated with. - * @param threadName The name for the threads. - */ - public DispatherThreadFactory(ThreadGroup group, String threadName) { - this.group = group; - this.threadName = threadName; - } - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(group, r, threadName); - t.setDaemon(true); - return t; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ad21031/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 7eff45a..434c5d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -998,7 +998,7 @@ public class Task implements Runnable { if (executor == null) { // first time use, initialize executor = Executors.newSingleThreadExecutor( - new DispatherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + taskNameWithSubtask)); + new DispatcherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + taskNameWithSubtask)); this.asyncCallDispatcher = executor; // double-check for execution state, and make sure we clean up after ourselves http://git-wip-us.apache.org/repos/asf/flink/blob/7ad21031/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java new file mode 100644 index 0000000..7528eb3 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A timer that triggers targets at a specific point in the future. This timer executes single-threaded, + * which means that at most one trigger is executed at any given time. + * <p> + * This timer generally maintains order of trigger events. This means that for two triggers scheduled at + * different times, the one scheduled for the later time will be executed after the one scheduled for the + * earlier time. 
+ */ +public class TriggerTimer { + + /** The thread group that holds all trigger timer threads */ + public static final ThreadGroup TRIGGER_THREADS_GROUP = new ThreadGroup("Triggers"); + + /** The single-threaded executor service that schedules and runs the trigger tasks */ + private final ScheduledExecutorService scheduler; + + + /** + * Creates a new trigger timer, where the timer thread has the default name "TriggerTimer Thread". + */ + public TriggerTimer() { + this("TriggerTimer Thread"); + } + + /** + * Creates a new trigger timer, where the timer thread has the given name. + * + * @param triggerName The name for the trigger thread. + */ + public TriggerTimer(String triggerName) { + this.scheduler = Executors.newSingleThreadScheduledExecutor( + new DispatcherThreadFactory(TRIGGER_THREADS_GROUP, triggerName)); + } + + /** + * Schedules a new trigger event. The trigger event will occur roughly at the given timestamp. + * If the timestamp is in the past (or now), the trigger will be queued for immediate execution. Note that other + * triggers that are to be executed now will be executed before this trigger. + * + * @param target The target to be triggered. + * @param timestamp The timestamp (milliseconds, compared against {@code System.currentTimeMillis()}) when the trigger should occur, and the timestamp given + * to the trigger-able target. + */ + public void scheduleTriggerAt(Triggerable target, long timestamp) { + long delay = Math.max(timestamp - System.currentTimeMillis(), 0); + + scheduler.schedule( + new TriggerTask(target, timestamp), + delay, + TimeUnit.MILLISECONDS); + } + + /** + * Shuts down the trigger timer, canceling all pending triggers and stopping the trigger thread. + */ + public void shutdown() { + scheduler.shutdownNow(); + } + + /** + * The finalize method shuts down the timer. This is a fail-safe shutdown, in case the original + * shutdown method was never called. + * <p> + * This should not be relied upon! It will cause shutdown to happen much later than if manual + * shutdown is attempted, and cause threads to linger for longer than needed. 
+ */ + @Override + @SuppressWarnings("FinalizeDoesntCallSuperFinalize") + protected void finalize() { + shutdown(); + } + + // ------------------------------------------------------------------------ + + /** + * Internal task that is invoked by the scheduled executor and triggers the target. + */ + private static final class TriggerTask implements Runnable { + + private final Triggerable target; + private final long timestamp; + + TriggerTask(Triggerable target, long timestamp) { + this.target = target; + this.timestamp = timestamp; + } + + @Override + public void run() { + target.trigger(timestamp); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ad21031/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java new file mode 100644 index 0000000..626f087 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +/** + * This interface must be implemented by objects that are triggered by a + * {@link TriggerTimer}. + */ +public interface Triggerable { + + /** + * This method is invoked by the {@link TriggerTimer} + * and given the timestamp for which the trigger was scheduled. + * <p> + * If the triggering is delayed for whatever reason (e.g., the trigger timer was blocked or the JVM stalled + * due to a garbage collection), the timestamp supplied to this function will still be the original + * timestamp for which the trigger was scheduled. + * + * @param timestamp The timestamp for which the trigger event was scheduled. + */ + void trigger(long timestamp); +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ad21031/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/package-info.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/package-info.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/package-info.java new file mode 100644 index 0000000..5fe6873 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains the operators that perform the stream transformations. + * One or more operators are bundled into a "chain" and executed in a stream task. + */ +package org.apache.flink.streaming.runtime.operators; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/7ad21031/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/TriggerTimerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/TriggerTimerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/TriggerTimerTest.java new file mode 100644 index 0000000..1349cb3 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/TriggerTimerTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; + +public class TriggerTimerTest { + + @Test + public void testThreadGroupAndShutdown() { + try { + TriggerTimer timer = new TriggerTimer(); + + // scheduling the first trigger spawns the timer thread + timer.scheduleTriggerAt(new Triggerable() { + @Override + public void trigger(long timestamp) {} + }, System.currentTimeMillis()); + + assertEquals(1, TriggerTimer.TRIGGER_THREADS_GROUP.activeCount()); + + // after shutdown, the timer thread must terminate before the deadline below + timer.shutdown(); + + long deadline = System.currentTimeMillis() + 2000; + while (TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) { + Thread.sleep(10); + } + + assertEquals("Trigger timer thread did not properly shut down", + 0, TriggerTimer.TRIGGER_THREADS_GROUP.activeCount()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void checkScheduledTimestampe() { + try { + final TriggerTimer timer = new TriggerTimer(); + + final AtomicReference<Throwable> errorRef = new AtomicReference<>(); + + final long t1 = System.currentTimeMillis(); + final long t2 = System.currentTimeMillis() - 200; + final long t3 = System.currentTimeMillis() + 100; + final long t4 = System.currentTimeMillis() + 200; + + 
timer.scheduleTriggerAt(new ValidatingTriggerable(errorRef, t1, 0), t1); + timer.scheduleTriggerAt(new ValidatingTriggerable(errorRef, t2, 1), t2); + timer.scheduleTriggerAt(new ValidatingTriggerable(errorRef, t3, 2), t3); + timer.scheduleTriggerAt(new ValidatingTriggerable(errorRef, t4, 3), t4); + + long deadline = System.currentTimeMillis() + 5000; + while (errorRef.get() == null && + ValidatingTriggerable.numInSequence < 4 && + System.currentTimeMillis() < deadline) + { + Thread.sleep(100); + } + + // surface the first failure that was captured on the timer thread + if (errorRef.get() != null) { + errorRef.get().printStackTrace(); + fail(errorRef.get().getMessage()); + } + + assertEquals(4, ValidatingTriggerable.numInSequence); + + timer.shutdown(); + + // wait until the trigger thread is shut down. otherwise, the other tests may become unstable + deadline = System.currentTimeMillis() + 2000; + while (TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) { + Thread.sleep(10); + } + + assertEquals("Trigger timer thread did not properly shut down", + 0, TriggerTimer.TRIGGER_THREADS_GROUP.activeCount()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + + private static class ValidatingTriggerable implements Triggerable { + + static volatile int numInSequence; // volatile: incremented on the timer thread, polled from the test thread + + private final AtomicReference<Throwable> errorRef; + + private final long expectedTimestamp; + private final int expectedInSequence; + + private ValidatingTriggerable(AtomicReference<Throwable> errorRef, long expectedTimestamp, int expectedInSequence) { + this.errorRef = errorRef; + this.expectedTimestamp = expectedTimestamp; + this.expectedInSequence = expectedInSequence; + } + + @Override + public void trigger(long timestamp) { + try { + assertEquals(expectedTimestamp, timestamp); + assertEquals(expectedInSequence, numInSequence); + numInSequence++; + } + catch (Throwable t) { + errorRef.compareAndSet(null, t); + } + } + } +}