[FLINK-2675] [streaming] Add utilities for scheduled triggers.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ad21031
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ad21031
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ad21031

Branch: refs/heads/master
Commit: 7ad210316fe22441761351e9943cdc31570d5407
Parents: 64e1dc6
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 15 14:55:41 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 22 13:25:20 2015 +0200

----------------------------------------------------------------------
 .../taskmanager/DispatcherThreadFactory.java    |  50 +++++++
 .../taskmanager/DispatherThreadFactory.java     |  50 -------
 .../apache/flink/runtime/taskmanager/Task.java  |   2 +-
 .../runtime/operators/TriggerTimer.java         | 119 ++++++++++++++++
 .../runtime/operators/Triggerable.java          |  38 +++++
 .../runtime/operators/package-info.java         |  22 +++
 .../runtime/operators/TriggerTimerTest.java     | 137 +++++++++++++++++++
 7 files changed, 367 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ad21031/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatcherThreadFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatcherThreadFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatcherThreadFactory.java
new file mode 100644
index 0000000..97060a8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatcherThreadFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Thread factory that creates threads with a given name, associates them with 
a given
+ * thread group, and set them to daemon mode.
+ */
+public class DispatcherThreadFactory implements ThreadFactory {
+       
+       private final ThreadGroup group;
+       
+       private final String threadName;
+       
+       /**
+        * Creates a new thread factory.
+        * 
+        * @param group The group that the threads will be associated with.
+        * @param threadName The name for the threads.
+        */
+       public DispatcherThreadFactory(ThreadGroup group, String threadName) {
+               this.group = group;
+               this.threadName = threadName;
+       }
+
+       @Override
+       public Thread newThread(Runnable r) {
+               Thread t = new Thread(group, r, threadName);
+               t.setDaemon(true);
+               return t;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad21031/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatherThreadFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatherThreadFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatherThreadFactory.java
deleted file mode 100644
index f5f1565..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatherThreadFactory.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import java.util.concurrent.ThreadFactory;
-
-/**
- * Thread factory that creates threads with a given name, associates them with 
a given
- * thread group, and set them to daemon mode.
- */
-public class DispatherThreadFactory implements ThreadFactory {
-       
-       private final ThreadGroup group;
-       
-       private final String threadName;
-       
-       /**
-        * Creates a new thread factory.
-        * 
-        * @param group The group that the threads will be associated with.
-        * @param threadName The name for the threads.
-        */
-       public DispatherThreadFactory(ThreadGroup group, String threadName) {
-               this.group = group;
-               this.threadName = threadName;
-       }
-
-       @Override
-       public Thread newThread(Runnable r) {
-               Thread t = new Thread(group, r, threadName);
-               t.setDaemon(true);
-               return t;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad21031/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 7eff45a..434c5d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -998,7 +998,7 @@ public class Task implements Runnable {
                        if (executor == null) {
                                // first time use, initialize
                                executor = Executors.newSingleThreadExecutor(
-                                               new 
DispatherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + 
taskNameWithSubtask));
+                                               new 
DispatcherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + 
taskNameWithSubtask));
                                this.asyncCallDispatcher = executor;
                                
                                // double-check for execution state, and make 
sure we clean up after ourselves

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad21031/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java
new file mode 100644
index 0000000..7528eb3
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A timer that triggers targets at a specific point in the future. This timer 
executes single-threaded,
+ * which means that never more than one trigger will be executed at the same 
time.
+ * <p>
+ * This timer generally maintains order of trigger events. This means that for 
two triggers scheduled at
+ * different times, the one scheduled for the later time will be executed 
after the one scheduled for the
+ * earlier time.
+ */
+public class TriggerTimer {
+       
+       /** The thread group that holds all trigger timer threads */
+       public static final ThreadGroup TRIGGER_THREADS_GROUP = new 
ThreadGroup("Triggers");
+       
+       /** The executor service that */
+       private final ScheduledExecutorService scheduler;
+
+
+       /**
+        * Creates a new trigger timer, where the timer thread has the default 
name "TriggerTimer Thread".
+        */
+       public TriggerTimer() {
+               this("TriggerTimer Thread");
+       }
+
+       /**
+        * Creates a new trigger timer, where the timer thread has the given 
name.
+        * 
+        * @param triggerName The name for the trigger thread.
+        */
+       public TriggerTimer(String triggerName) {
+               this.scheduler = Executors.newSingleThreadScheduledExecutor(
+                               new 
DispatcherThreadFactory(TRIGGER_THREADS_GROUP, triggerName));
+       }
+
+       /**
+        * Schedules a new trigger event. The trigger event will occur roughly 
at the given timestamp.
+        * If the timestamp is in the past (or now), the trigger will be queued 
for immediate execution. Note that other
+        * triggers that are to be executed now will be executed before this 
trigger.
+        * 
+        * @param target The target to be triggered.
+        * @param timestamp The timestamp when the trigger should occur, and 
the timestamp given
+        *                  to the trigger-able target.
+        */
+       public void scheduleTriggerAt(Triggerable target, long timestamp) {
+               long delay = Math.max(timestamp - System.currentTimeMillis(), 
0);
+               
+               scheduler.schedule(
+                               new TriggerTask(target, timestamp),
+                               delay,
+                               TimeUnit.MILLISECONDS);
+       }
+
+       /**
+        * Shuts down the trigger timer, canceling all pending triggers and 
stopping the trigger thread.
+        */
+       public void shutdown() {
+               scheduler.shutdownNow();
+       }
+
+       /**
+        * The finalize method shuts down the timer. This is a fail-safe 
shutdown, in case the original
+        * shutdown method was never called.
+        * <p>
+        * This should not be relied upon! It will cause shutdown to happen 
much later than if manual
+        * shutdown is attempted, and cause threads to linger for longer than 
needed.
+        */
+       @Override
+       @SuppressWarnings("FinalizeDoesntCallSuperFinalize")
+       protected void finalize() {
+               shutdown();
+       }
+       
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Internal task that is invoked by the scheduled executor and triggers 
the target.
+        */
+       private static final class TriggerTask implements Runnable {
+               
+               private final Triggerable target;
+               private final long timestamp;
+
+               TriggerTask(Triggerable target, long timestamp) {
+                       this.target = target;
+                       this.timestamp = timestamp;
+               }
+
+               @Override
+               public void run() {
+                       target.trigger(timestamp);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad21031/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
new file mode 100644
index 0000000..626f087
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+/**
+ * This interface must be implemented by objects that are triggered by a
+ * {@link TriggerTimer}.
+ */
+public interface Triggerable {
+
+       /**
+        * This method is invoked by the {@link TriggerTimer}
+        * and given the timestamp for which the trigger was scheduled.
+        * <p>
+        * If the triggering is delayed for whatever reason (trigger timer was 
blocked, JVM stalled due
+        * to a garbage collection), the timestamp supplied to this function 
will still be the original
+        * timestamp for which the trigger was scheduled.
+        * 
+        * @param timestamp The timestamp for which the trigger event was 
scheduled.
+        */
+       void trigger(long timestamp);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad21031/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/package-info.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/package-info.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/package-info.java
new file mode 100644
index 0000000..5fe6873
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the operators that perform the stream transformations.
+ * One or more operators are bundled into a "chain" and executed in a stream 
task. 
+ */
+package org.apache.flink.streaming.runtime.operators;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad21031/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/TriggerTimerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/TriggerTimerTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/TriggerTimerTest.java
new file mode 100644
index 0000000..1349cb3
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/TriggerTimerTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+
+public class TriggerTimerTest {
+       
+       @Test
+       public void testThreadGroupAndShutdown() {
+               try {
+                       TriggerTimer timer = new TriggerTimer();
+                       
+                       // first one spawns thread
+                       timer.scheduleTriggerAt(new Triggerable() {
+                               @Override
+                               public void trigger(long timestamp) {}
+                       }, System.currentTimeMillis());
+                       
+                       assertEquals(1, 
TriggerTimer.TRIGGER_THREADS_GROUP.activeCount());
+                       
+                       // thread needs to die in time
+                       timer.shutdown();
+                       
+                       long deadline = System.currentTimeMillis() + 2000;
+                       while (TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() 
> 0 && System.currentTimeMillis() < deadline) {
+                               Thread.sleep(10);
+                       }
+
+                       assertEquals("Trigger timer thread did not properly 
shut down",
+                                       0, 
TriggerTimer.TRIGGER_THREADS_GROUP.activeCount());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void checkScheduledTimestampe() {
+               try {
+                       final TriggerTimer timer = new TriggerTimer();
+
+                       final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
+                       
+                       final long t1 = System.currentTimeMillis();
+                       final long t2 = System.currentTimeMillis() - 200;
+                       final long t3 = System.currentTimeMillis() + 100;
+                       final long t4 = System.currentTimeMillis() + 200;
+                       
+                       timer.scheduleTriggerAt(new 
ValidatingTriggerable(errorRef, t1, 0), t1);
+                       timer.scheduleTriggerAt(new 
ValidatingTriggerable(errorRef, t2, 1), t2);
+                       timer.scheduleTriggerAt(new 
ValidatingTriggerable(errorRef, t3, 2), t3);
+                       timer.scheduleTriggerAt(new 
ValidatingTriggerable(errorRef, t4, 3), t4);
+                       
+                       long deadline = System.currentTimeMillis() + 5000;
+                       while (errorRef.get() == null &&
+                                       ValidatingTriggerable.numInSequence < 4 
&&
+                                       System.currentTimeMillis() < deadline)
+                       {
+                               Thread.sleep(100);
+                       }
+                       
+                       // handle errors
+                       if (errorRef.get() != null) {
+                               errorRef.get().printStackTrace();
+                               fail(errorRef.get().getMessage());
+                       }
+                       
+                       assertEquals(4, ValidatingTriggerable.numInSequence);
+                       
+                       timer.shutdown();
+                       
+                       // wait until the trigger thread is shut down. 
otherwise, the other tests may become unstable
+                       deadline = System.currentTimeMillis() + 2000;
+                       while (TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() 
> 0 && System.currentTimeMillis() < deadline) {
+                               Thread.sleep(10);
+                       }
+
+                       assertEquals("Trigger timer thread did not properly 
shut down", 
+                                       0, 
TriggerTimer.TRIGGER_THREADS_GROUP.activeCount());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       
+       private static class ValidatingTriggerable implements Triggerable {
+               
+               static int numInSequence;
+               
+               private final AtomicReference<Throwable> errorRef;
+               
+               private final long expectedTimestamp;
+               private final int expectedInSequence;
+
+               private ValidatingTriggerable(AtomicReference<Throwable> 
errorRef, long expectedTimestamp, int expectedInSequence) {
+                       this.errorRef = errorRef;
+                       this.expectedTimestamp = expectedTimestamp;
+                       this.expectedInSequence = expectedInSequence;
+               }
+
+               @Override
+               public void trigger(long timestamp) {
+                       try {
+                               assertEquals(expectedTimestamp, timestamp);
+                               assertEquals(expectedInSequence, numInSequence);
+                               numInSequence++;
+                       }
+                       catch (Throwable t) {
+                               errorRef.compareAndSet(null, t);
+                       }
+               }
+       }
+}

Reply via email to