This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8be1058  [FLINK-23003][runtime] Fix resource leak in 
RocksIncrementalSnapshotStrategy
8be1058 is described below

commit 8be1058a60565587b465a2237136dbbbb4c168f3
Author: leiyanfei <[email protected]>
AuthorDate: Fri Jun 11 17:20:48 2021 +0800

    [FLINK-23003][runtime] Fix resource leak in RocksIncrementalSnapshotStrategy
---
 .../flink/contrib/streaming/state/RocksDBKeyedStateBackend.java      | 1 +
 .../contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java     | 3 ++-
 .../streaming/state/snapshot/RocksDBSnapshotStrategyBase.java        | 5 ++++-
 .../contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java  | 5 +++++
 .../streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java   | 5 +++++
 5 files changed, 17 insertions(+), 2 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 499493b..5ec7f41 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -465,6 +465,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
             cleanInstanceBasePath();
         }
+        IOUtils.closeQuietly(checkpointSnapshotStrategy);
         this.disposed = true;
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 5eb0be0..246677d 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -265,7 +265,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                 new RocksDbTtlCompactFiltersManager(ttlTimeProvider);
 
         ResourceGuard rocksDBResourceGuard = new ResourceGuard();
-        RocksDBSnapshotStrategyBase<K, ?> checkpointStrategy;
+        RocksDBSnapshotStrategyBase<K, ?> checkpointStrategy = null;
         PriorityQueueSetFactory priorityQueueFactory;
         SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
         // Number of bytes required to prefix the key groups.
@@ -363,6 +363,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
             IOUtils.closeQuietly(optionsContainer);
             
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
             kvStateInformation.clear();
+            IOUtils.closeQuietly(checkpointStrategy);
             try {
                 FileUtils.deleteDirectory(instanceBasePath);
             } catch (Exception ex) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
index 74311ef..b064fc5 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
@@ -43,7 +43,7 @@ import java.util.LinkedHashMap;
  * @param <K> type of the backend keys.
  */
 public abstract class RocksDBSnapshotStrategyBase<K, R extends 
SnapshotResources>
-        implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R> {
+        implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R>, 
AutoCloseable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBSnapshotStrategyBase.class);
 
@@ -92,4 +92,7 @@ public abstract class RocksDBSnapshotStrategyBase<K, R 
extends SnapshotResources
     public String getDescription() {
         return description;
     }
+
+    @Override
+    public abstract void close();
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
index 8812165..8e3063a 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
@@ -141,6 +141,11 @@ public class RocksFullSnapshotStrategy<K>
         // nothing to do.
     }
 
+    @Override
+    public void close() {
+        // nothing to do.
+    }
+
     private SupplierWithException<CheckpointStreamWithResultProvider, 
Exception>
             createCheckpointStreamSupplier(
                     long checkpointId,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 682a3f7..99f0b6f 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -204,6 +204,11 @@ public class RocksIncrementalSnapshotStrategy<K>
         }
     }
 
+    @Override
+    public void close() {
+        stateUploader.close();
+    }
+
     @Nonnull
     private SnapshotDirectory prepareLocalSnapshotDirectory(long checkpointId) 
throws IOException {
 

Reply via email to