Repository: flink Updated Branches: refs/heads/release-1.3 d552b3447 -> 353d60045
[FLINK-6685] Prevent that SafetyNetCloseableRegistry is closed prematurely in Task::triggerCheckpointBarrier Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1e059b9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1e059b9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1e059b9 Branch: refs/heads/release-1.3 Commit: f1e059b9058a332fc25e9bcd2a1d8ba3dfaf4044 Parents: d552b34 Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Tue May 23 15:44:07 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Wed May 24 14:54:12 2017 +0200 ---------------------------------------------------------------------- .../flink/core/fs/FileSystemSafetyNet.java | 20 ++++++++++++++++++++ .../apache/flink/runtime/taskmanager/Task.java | 9 ++++----- 2 files changed, 24 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f1e059b9/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java index eb28504..1391a33 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java @@ -106,6 +106,26 @@ public class FileSystemSafetyNet { } } + /** + * Returns the active safety-net registry for the current thread. + * @deprecated This method should be removed after FLINK-6684 is implemented. + */ + @Deprecated + @Internal + public static SafetyNetCloseableRegistry getSafetyNetCloseableRegistryForThread() { + return REGISTRIES.get(); + } + + /** + * Sets the active safety-net registry for the current thread. + * @deprecated This method should be removed after FLINK-6684 is implemented. + */ + @Deprecated + @Internal + public static void setSafetyNetCloseableRegistryForThread(SafetyNetCloseableRegistry registry) { + REGISTRIES.set(registry); + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/f1e059b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index e626dae..e18628e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.FileSystemSafetyNet; import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.SafetyNetCloseableRegistry; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.blob.BlobKey; @@ -71,7 +72,6 @@ import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; - import org.apache.flink.util.WrappingRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1172,13 +1172,14 @@ public class Task implements Runnable, TaskActions { // build a local closure final StatefulTask statefulTask = (StatefulTask) invokable; final String taskName = taskNameWithSubtask; - + final SafetyNetCloseableRegistry safetyNetCloseableRegistry = + FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread(); Runnable runnable = new Runnable() { @Override public void run() { // activate safety net for checkpointing thread LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName()); - FileSystemSafetyNet.initializeSafetyNetForThread(); + FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry); try { boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions); @@ -1202,8 +1203,6 @@ public class Task implements Runnable, TaskActions { // close and de-activate safety net for checkpointing thread LOG.debug("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName()); - - FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); } } };