Repository: flink
Updated Branches:
  refs/heads/master 40ba6261b -> b32b8359e


[FLINK-8385] [checkpointing] Avoid RejectedExecutionException in SharedStateRegistry
during disposal from async ZooKeeper calls.

This closes #5256.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b32b8359
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b32b8359
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b32b8359

Branch: refs/heads/master
Commit: b32b8359ea20812cddbcbffc3b617d1256889cb1
Parents: b5de38c
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Mon Jan 8 11:31:57 2018 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Tue Jan 9 15:17:34 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/state/SharedStateRegistry.java      | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b32b8359/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index 458c695..664631b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 
 /**
  * This registry manages state that is shared across (incremental) 
checkpoints, and is responsible
@@ -194,8 +195,17 @@ public class SharedStateRegistry implements AutoCloseable {
                // We do the small optimization to not issue discards for 
placeholders, which are NOPs.
                if (streamStateHandle != null && 
!isPlaceholder(streamStateHandle)) {
                        LOG.trace("Scheduled delete of state handle {}.", 
streamStateHandle);
-                       asyncDisposalExecutor.execute(
-                               new 
SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
+                       AsyncDisposalRunnable asyncDisposalRunnable = new 
AsyncDisposalRunnable(streamStateHandle);
+                       try {
+                               
asyncDisposalExecutor.execute(asyncDisposalRunnable);
+                       } catch (RejectedExecutionException ex) {
+                               // TODO This is a temporary fix for a problem 
during ZooKeeperCompletedCheckpointStore#shutdown:
+                               // Disposal is issued in another async thread 
and the shutdown proceeds to close the I/O Executor pool.
+                               // This leads to RejectedExecutionException 
once the async deletes are triggered by ZK. We need to
+                               // wait for all pending ZK deletes before 
closing the I/O Executor pool. We can simply call #run()
+                               // because we are already in the async ZK 
thread that disposes the handles.
+                               asyncDisposalRunnable.run();
+                       }
                }
        }
 

Reply via email to