Repository: flink Updated Branches: refs/heads/master 40ba6261b -> b32b8359e
[FLINK-8385] [checkpointing] Avoid RejectedExecutionException in SharedStateRegistry during disposal from async Zookeeper calls. This closes #5256. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b32b8359 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b32b8359 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b32b8359 Branch: refs/heads/master Commit: b32b8359ea20812cddbcbffc3b617d1256889cb1 Parents: b5de38c Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Mon Jan 8 11:31:57 2018 +0100 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Tue Jan 9 15:17:34 2018 +0100 ---------------------------------------------------------------------- .../flink/runtime/state/SharedStateRegistry.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b32b8359/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java index 458c695..664631b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; /** * This registry manages state that is shared across (incremental) checkpoints, and is responsible @@ -194,8 +195,17 @@ public class SharedStateRegistry implements AutoCloseable { // We do the small optimization to not issue discards for placeholders, which are NOPs. if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) { LOG.trace("Scheduled delete of state handle {}.", streamStateHandle); - asyncDisposalExecutor.execute( - new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle)); + AsyncDisposalRunnable asyncDisposalRunnable = new AsyncDisposalRunnable(streamStateHandle); + try { + asyncDisposalExecutor.execute(asyncDisposalRunnable); + } catch (RejectedExecutionException ex) { + // TODO This is a temporary fix for a problem during ZooKeeperCompletedCheckpointStore#shutdown: + // Disposal is issued in another async thread and the shutdown proceeds to close the I/O Executor pool. + // This leads to RejectedExecutionException once the async deletes are triggered by ZK. We need to + // wait for all pending ZK deletes before closing the I/O Executor pool. We can simply call #run() + // because we are already in the async ZK thread that disposes the handles. + asyncDisposalRunnable.run(); + } } }