Repository: flink
Updated Branches:
  refs/heads/release-1.4 e100861f8 -> 828ef09b0


[FLINK-5465] [streaming] Wait for pending timer threads to finish or to exceed a time limit in exceptional stream task shutdown.

This closes #5058.

(cherry picked from commit d86c6b6)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/828ef09b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/828ef09b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/828ef09b

Branch: refs/heads/release-1.4
Commit: 828ef09b09f872107b412501774b42efaf6caa37
Parents: e100861
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Wed Nov 22 17:52:35 2017 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu Nov 23 15:25:01 2017 +0100

----------------------------------------------------------------------
 .../configuration/TimerServiceOptions.java      | 38 ++++++++++++
 .../runtime/tasks/ProcessingTimeService.java    | 12 ++++
 .../streaming/runtime/tasks/StreamTask.java     | 18 +++++-
 .../tasks/SystemProcessingTimeService.java      |  6 ++
 .../tasks/TestProcessingTimeService.java        |  6 ++
 .../tasks/SystemProcessingTimeServiceTest.java  | 65 ++++++++++++++++++++
 6 files changed, 142 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/828ef09b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
new file mode 100644
index 0000000..835adce
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Timer service configuration options.
+ */
+@PublicEvolving
+public class TimerServiceOptions {
+
+       /**
+        * This configures how long we wait for the {@link org.apache.flink.streaming.runtime.tasks.ProcessingTimeService}
+        * to finish all pending timer threads when the stream task performs a failover shutdown. See FLINK-5465.
+        */
+       public static final ConfigOption<Long> TIMER_SERVICE_TERMINATION_AWAIT_MS = ConfigOptions
+               .key("timerservice.exceptional.shutdown.timeout")
+               .defaultValue(7500L);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/828ef09b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index b238252..2516299 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Defines the current processing time and handles all related actions,
@@ -93,4 +94,15 @@ public abstract class ProcessingTimeService {
         * will result in a hard exception.
         */
        public abstract void shutdownService();
+
+       /**
+        * Shuts down and clean up the timer service provider hard and immediately. This does wait
+        * for all timers to complete or until the time limit is exceeded. Any call to
+        * {@link #registerTimer(long, ProcessingTimeCallback)} will result in a hard exception after calling this method.
+        * @param time time to wait for termination.
+        * @param timeUnit time unit of parameter time.
+        * @return {@code true} if this timer service and all pending timers are terminated and
+        *         {@code false} if the timeout elapsed before this happened.
+        */
+       public abstract boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/828ef09b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 68f590e..6ae45c6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -51,6 +51,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.configuration.TimerServiceOptions;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
@@ -69,6 +70,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -212,7 +214,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                        LOG.debug("Initializing {}.", getName());
 
                        asyncOperationsThreadPool = Executors.newCachedThreadPool();
-
                        configuration = new StreamConfig(getTaskConfiguration());
 
                        stateBackend = createStateBackend();
@@ -305,9 +306,20 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                        isRunning = false;
 
                        // stop all timers and threads
-                       if (timerService != null) {
+                       if (timerService != null && !timerService.isTerminated()) {
                                try {
-                                       timerService.shutdownService();
+
+                                       final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
+                                               getLong(TimerServiceOptions.TIMER_SERVICE_TERMINATION_AWAIT_MS);
+
+                                       // wait for a reasonable time for all pending timer threads to finish
+                                       boolean timerShutdownComplete =
+                                               timerService.shutdownAndAwaitPending(timeoutMs, TimeUnit.MILLISECONDS);
+
+                                       if (!timerShutdownComplete) {
+                                               LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
+                                                       "timers. Will continue with shutdown procedure.", timeoutMs);
+                                       }
                                }
                                catch (Throwable t) {
                                        // catch and log the exception to not replace the original exception

http://git-wip-us.apache.org/repos/asf/flink/blob/828ef09b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index 71bfdf6..be8b23c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -191,6 +191,12 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
                }
        }
 
+       @Override
+       public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
+               shutdownService();
+               return timerService.awaitTermination(time, timeUnit);
+       }
+
        // safety net to destroy the thread pool
        @Override
        protected void finalize() throws Throwable {

http://git-wip-us.apache.org/repos/asf/flink/blob/828ef09b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 080eeb5..2081f19 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -134,6 +134,12 @@ public class TestProcessingTimeService extends ProcessingTimeService {
                this.isTerminated = true;
        }
 
+       @Override
+       public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
+               shutdownService();
+               return true;
+       }
+
        public int getNumActiveTimers() {
                int count = 0;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/828ef09b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 4c105d3..01fd778 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -22,11 +22,13 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
@@ -442,4 +444,67 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
                latch.await();
                assertTrue(exceptionWasThrown.get());
        }
+
+       @Test
+       public void testShutdownAndWaitPending() {
+
+               final Object lock = new Object();
+               final OneShotLatch waitUntilTimerStarted = new OneShotLatch();
+               final OneShotLatch blockUntilTerminationInterrupts = new OneShotLatch();
+               final OneShotLatch blockUntilTriggered = new OneShotLatch();
+               final AtomicBoolean check = new AtomicBoolean(true);
+
+               final SystemProcessingTimeService timeService = new SystemProcessingTimeService(
+                       (message, exception) -> {
+                       },
+                       lock);
+
+               timeService.scheduleAtFixedRate(
+                       timestamp -> {
+
+                               waitUntilTimerStarted.trigger();
+
+                               try {
+                                       blockUntilTerminationInterrupts.await();
+                                       check.set(false);
+                               } catch (InterruptedException ignore) {
+                               }
+
+                               try {
+                                       blockUntilTriggered.await();
+                               } catch (InterruptedException ignore) {
+                                       check.set(false);
+                               }
+                       },
+                       0L,
+                       10L);
+
+               try {
+                       waitUntilTimerStarted.await();
+               } catch (InterruptedException e) {
+                       Assert.fail();
+               }
+
+               Assert.assertFalse(timeService.isTerminated());
+
+               // Check that we wait for the timer to terminate. As the timer blocks on the second latch, this should time out.
+               try {
+                       Assert.assertFalse(timeService.shutdownAndAwaitPending(1, TimeUnit.SECONDS));
+               } catch (InterruptedException e) {
+                       Assert.fail("Unexpected interruption.");
+               }
+
+               // Let the timer proceed.
+               blockUntilTriggered.trigger();
+
+               // Now we should succeed in terminating the timer.
+               try {
+                       Assert.assertTrue(timeService.shutdownAndAwaitPending(60, TimeUnit.SECONDS));
+               } catch (InterruptedException e) {
+                       Assert.fail("Unexpected interruption.");
+               }
+
+               Assert.assertTrue(check.get());
+               Assert.assertTrue(timeService.isTerminated());
+       }
 }

Reply via email to