Repository: flink
Updated Branches:
  refs/heads/release-1.3 d552b3447 -> 353d60045


[FLINK-6685] Prevent that SafetyNetCloseableRegistry is closed prematurely in Task::triggerCheckpointBarrier


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1e059b9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1e059b9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1e059b9

Branch: refs/heads/release-1.3
Commit: f1e059b9058a332fc25e9bcd2a1d8ba3dfaf4044
Parents: d552b34
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Tue May 23 15:44:07 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Wed May 24 14:54:12 2017 +0200

----------------------------------------------------------------------
 .../flink/core/fs/FileSystemSafetyNet.java      | 20 ++++++++++++++++++++
 .../apache/flink/runtime/taskmanager/Task.java  |  9 ++++-----
 2 files changed, 24 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1e059b9/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
index eb28504..1391a33 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
@@ -106,6 +106,26 @@ public class FileSystemSafetyNet {
                }
        }
 
+       /**
+        * Returns the active safety-net registry for the current thread.
+        * @deprecated This method should be removed after FLINK-6684 is implemented.
+        */
+       @Deprecated
+       @Internal
+       public static SafetyNetCloseableRegistry 
getSafetyNetCloseableRegistryForThread() {
+               return REGISTRIES.get();
+       }
+
+       /**
+        * Sets the active safety-net registry for the current thread.
+        * @deprecated This method should be removed after FLINK-6684 is implemented.
+        */
+       @Deprecated
+       @Internal
+       public static void 
setSafetyNetCloseableRegistryForThread(SafetyNetCloseableRegistry registry) {
+               REGISTRIES.set(registry);
+       }
+
        // ------------------------------------------------------------------------
        //  Utilities
        // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f1e059b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index e626dae..e18628e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystemSafetyNet;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.SafetyNetCloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -71,7 +72,6 @@ import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
-
 import org.apache.flink.util.WrappingRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1172,13 +1172,14 @@ public class Task implements Runnable, TaskActions {
                                // build a local closure
                                final StatefulTask statefulTask = 
(StatefulTask) invokable;
                                final String taskName = taskNameWithSubtask;
-
+                               final SafetyNetCloseableRegistry 
safetyNetCloseableRegistry =
+                                       
FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
                                Runnable runnable = new Runnable() {
                                        @Override
                                        public void run() {
                                                // activate safety net for checkpointing thread
                                                LOG.debug("Creating FileSystem 
stream leak safety net for {}", Thread.currentThread().getName());
-                                               
FileSystemSafetyNet.initializeSafetyNetForThread();
+                                               
FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
 
                                                try {
                                                        boolean success = 
statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
@@ -1202,8 +1203,6 @@ public class Task implements Runnable, TaskActions {
                                                        // close and de-activate safety net for checkpointing thread
                                                        LOG.debug("Ensuring all 
FileSystem streams are closed for {}",
                                                                        
Thread.currentThread().getName());
-
-                                                       
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                                                }
                                        }
                                };

Reply via email to