This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93deb1a709840c2ae8957ea88cd8ffc0929951ad
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Dec 9 20:22:08 2019 +0100

    [FLINK-14926][state-backend-rocksdb] (follow-up) Simplify test to rely on 
RocksDBResourceContainer for cleanup of native handles
---
 .../contrib/streaming/state/RocksDBResourceContainer.java    |  6 ++++++
 .../flink/contrib/streaming/state/RocksDBStateBackend.java   | 12 +-----------
 .../contrib/streaming/state/RocksDBStateBackendTest.java     | 12 ++++--------
 3 files changed, 11 insertions(+), 19 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 32c198f..20c3c5c 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -106,6 +106,12 @@ final class RocksDBResourceContainer implements 
AutoCloseable {
                return opt;
        }
 
+       RocksDBNativeMetricOptions 
getMemoryWatcherOptions(RocksDBNativeMetricOptions defaultMetricOptions) {
+               return optionsFactory == null
+                               ? defaultMetricOptions
+                               : 
optionsFactory.createNativeMetricsOptions(defaultMetricOptions);
+       }
+
        @Nullable
        OptionsFactory getOptionsFactory() {
                return optionsFactory;
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 829566d..3ffb021 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -560,7 +560,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                        
.setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled())
                        
.setEnableTtlCompactionFilter(isTtlCompactionFilterEnabled())
                        
.setNumberOfTransferingThreads(getNumberOfTransferThreads())
-                       
.setNativeMetricOptions(getMemoryWatcherOptions(resourceContainer));
+                       
.setNativeMetricOptions(resourceContainer.getMemoryWatcherOptions(defaultMetricOptions));
                return builder.build();
        }
 
@@ -853,16 +853,6 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                return resourceContainer.getColumnOptions();
        }
 
-       public RocksDBNativeMetricOptions 
getMemoryWatcherOptions(RocksDBResourceContainer resourceContainer) {
-               RocksDBNativeMetricOptions options = this.defaultMetricOptions;
-               OptionsFactory optionsFactory = 
resourceContainer.getOptionsFactory();
-               if (optionsFactory != null) {
-                       options = 
optionsFactory.createNativeMetricsOptions(options);
-               }
-
-               return options;
-       }
-
        /**
         * Gets the number of threads used to transfer files while 
snapshotting/restoring.
         */
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index e59f6e1..b31e190 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -115,14 +115,12 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
        private String dbPath;
        private RocksDB db = null;
        private ColumnFamilyHandle defaultCFHandle = null;
-       private ColumnFamilyOptions columnOptions = null;
-       private RocksDBResourceContainer optionsContainer = null;
-       private ArrayList<AutoCloseable> handlesToClose = new ArrayList<>();
+       private final RocksDBResourceContainer optionsContainer = new 
RocksDBResourceContainer();
 
        public void prepareRocksDB() throws Exception {
                String dbPath = new File(tempFolder.newFolder(), 
DB_INSTANCE_DIR_STRING).getAbsolutePath();
-               columnOptions = 
PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose);
-               optionsContainer = new RocksDBResourceContainer();
+               ColumnFamilyOptions columnOptions = 
optionsContainer.getColumnOptions();
+
                ArrayList<ColumnFamilyHandle> columnFamilyHandles = new 
ArrayList<>(1);
                db = RocksDBOperationUtils.openDB(dbPath, 
Collections.emptyList(),
                        columnFamilyHandles, columnOptions, 
optionsContainer.getDbOptions());
@@ -157,9 +155,7 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                }
                IOUtils.closeQuietly(defaultCFHandle);
                IOUtils.closeQuietly(db);
-               IOUtils.closeQuietly(columnOptions);
                IOUtils.closeQuietly(optionsContainer);
-               handlesToClose.forEach(IOUtils::closeQuietly);
 
                if (allCreatedCloseables != null) {
                        for (RocksObject rocksCloseable : allCreatedCloseables) 
{
@@ -185,7 +181,7 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                                IntSerializer.INSTANCE,
                                spy(db),
                                defaultCFHandle,
-                               
PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose))
+                               optionsContainer.getColumnOptions())
                        
.setEnableIncrementalCheckpointing(enableIncrementalCheckpointing)
                        .build();
 

Reply via email to