Repository: flink Updated Branches: refs/heads/release-1.4 e100861f8 -> 828ef09b0
[FLINK-5465] [streaming] Wait for pending timer threads to finish or to exceed a time limit in exceptional stream task shutdown. This closes #5058. (cherry picked from commit d86c6b6) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/828ef09b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/828ef09b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/828ef09b Branch: refs/heads/release-1.4 Commit: 828ef09b09f872107b412501774b42efaf6caa37 Parents: e100861 Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Wed Nov 22 17:52:35 2017 +0100 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Thu Nov 23 15:25:01 2017 +0100 ---------------------------------------------------------------------- .../configuration/TimerServiceOptions.java | 38 ++++++++++++ .../runtime/tasks/ProcessingTimeService.java | 12 ++++ .../streaming/runtime/tasks/StreamTask.java | 18 +++++- .../tasks/SystemProcessingTimeService.java | 6 ++ .../tasks/TestProcessingTimeService.java | 6 ++ .../tasks/SystemProcessingTimeServiceTest.java | 65 ++++++++++++++++++++ 6 files changed, 142 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/828ef09b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java new file mode 100644 index 0000000..835adce --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.configuration; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** + * Timer service configuration options. + */ +@PublicEvolving +public class TimerServiceOptions { + + /** + * This configures how long, in milliseconds, we wait for the {@link org.apache.flink.streaming.runtime.tasks.ProcessingTimeService} + * to finish all pending timer threads when the stream task performs a failover shutdown. See FLINK-5465.
+ */ + public static final ConfigOption<Long> TIMER_SERVICE_TERMINATION_AWAIT_MS = ConfigOptions + .key("timerservice.exceptional.shutdown.timeout") + .defaultValue(7500L); +} http://git-wip-us.apache.org/repos/asf/flink/blob/828ef09b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java index b238252..2516299 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.tasks; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * Defines the current processing time and handles all related actions, @@ -93,4 +94,15 @@ public abstract class ProcessingTimeService { * will result in a hard exception. */ public abstract void shutdownService(); + + /** + * Shuts down and cleans up the timer service provider hard and immediately, then waits + * for all pending timers to complete or until the time limit is exceeded, whichever comes first. Any call to + * {@link #registerTimer(long, ProcessingTimeCallback)} will result in a hard exception after calling this method. + * @param time time to wait for termination. + * @param timeUnit time unit of parameter time. + * @return {@code true} if this timer service and all pending timers are terminated and + * {@code false} if the timeout elapsed before this happened.
+ */ + public abstract boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException; } http://git-wip-us.apache.org/repos/asf/flink/blob/828ef09b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 68f590e..6ae45c6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -51,6 +51,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.OperatorSnapshotResult; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.configuration.TimerServiceOptions; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; @@ -69,6 +70,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** @@ -212,7 +214,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> LOG.debug("Initializing {}.", getName()); asyncOperationsThreadPool = Executors.newCachedThreadPool(); - configuration = new StreamConfig(getTaskConfiguration()); stateBackend = createStateBackend(); @@ -305,9 +306,20 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> isRunning = false; // stop all timers and 
threads - if (timerService != null) { + if (timerService != null && !timerService.isTerminated()) { try { - timerService.shutdownService(); + + final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration(). + getLong(TimerServiceOptions.TIMER_SERVICE_TERMINATION_AWAIT_MS); + + // wait for a reasonable time for all pending timer threads to finish + boolean timerShutdownComplete = + timerService.shutdownAndAwaitPending(timeoutMs, TimeUnit.MILLISECONDS); + + if (!timerShutdownComplete) { + LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " + + "timers. Will continue with shutdown procedure.", timeoutMs); + } } catch (Throwable t) { // catch and log the exception to not replace the original exception http://git-wip-us.apache.org/repos/asf/flink/blob/828ef09b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java index 71bfdf6..be8b23c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java @@ -191,6 +191,12 @@ public class SystemProcessingTimeService extends ProcessingTimeService { } } + @Override + public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException { + shutdownService(); + return timerService.awaitTermination(time, timeUnit); + } + // safety net to destroy the thread pool @Override protected void finalize() throws Throwable { 
http://git-wip-us.apache.org/repos/asf/flink/blob/828ef09b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java index 080eeb5..2081f19 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java @@ -134,6 +134,12 @@ public class TestProcessingTimeService extends ProcessingTimeService { this.isTerminated = true; } + @Override + public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException { + shutdownService(); + return true; + } + public int getNumActiveTimers() { int count = 0; http://git-wip-us.apache.org/repos/asf/flink/blob/828ef09b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java index 4c105d3..01fd778 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java @@ -22,11 +22,13 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler; import 
org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; @@ -442,4 +444,67 @@ public class SystemProcessingTimeServiceTest extends TestLogger { latch.await(); assertTrue(exceptionWasThrown.get()); } + + @Test + public void testShutdownAndWaitPending() { + + final Object lock = new Object(); + final OneShotLatch waitUntilTimerStarted = new OneShotLatch(); + final OneShotLatch blockUntilTerminationInterrupts = new OneShotLatch(); + final OneShotLatch blockUntilTriggered = new OneShotLatch(); + final AtomicBoolean check = new AtomicBoolean(true); + + final SystemProcessingTimeService timeService = new SystemProcessingTimeService( + (message, exception) -> { + }, + lock); + + timeService.scheduleAtFixedRate( + timestamp -> { + + waitUntilTimerStarted.trigger(); + + try { + blockUntilTerminationInterrupts.await(); + check.set(false); + } catch (InterruptedException ignore) { + } + + try { + blockUntilTriggered.await(); + } catch (InterruptedException ignore) { + check.set(false); + } + }, + 0L, + 10L); + + try { + waitUntilTimerStarted.await(); + } catch (InterruptedException e) { + Assert.fail(); + } + + Assert.assertFalse(timeService.isTerminated()); + + // Check that we wait for the timer to terminate. As the timer blocks on the second latch, this should time out. + try { + Assert.assertFalse(timeService.shutdownAndAwaitPending(1, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Assert.fail("Unexpected interruption."); + } + + // Let the timer proceed. + blockUntilTriggered.trigger(); + + // Now we should succeed in terminating the timer. 
+ try { + Assert.assertTrue(timeService.shutdownAndAwaitPending(60, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Assert.fail("Unexpected interruption."); + } + + Assert.assertTrue(check.get()); + Assert.assertTrue(timeService.isTerminated()); + } }