This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 31dd8c36246c86db3fb07790158a8c9fe373b726
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Dec 10 11:17:47 2019 +0100

    [FLINK-14926][state-backend-rocksdb] (follow-up) Avoid exposing handles 
that will not be closed from RocksDBStateBackend
    
    That means that RocksDBStateBackend should not have accessors to gather 
created native handles.
    The native handles should only be created once the Resource Container is 
created.
    This implies removing tome test methods from RocksDBStateBackend and 
changing the tests to test against the
    RocksDBResourceContainer instead.
---
 .../streaming/state/RocksDBResourceContainer.java  |  4 ++
 .../streaming/state/RocksDBStateBackend.java       | 48 +++++++-----------
 .../state/RocksDBStateBackendConfigTest.java       | 59 +++++++++-------------
 3 files changed, 46 insertions(+), 65 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 20c3c5c..199cd84 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -112,6 +112,10 @@ final class RocksDBResourceContainer implements 
AutoCloseable {
                                : 
optionsFactory.createNativeMetricsOptions(defaultMetricOptions);
        }
 
+       PredefinedOptions getPredefinedOptions() {
+               return predefinedOptions;
+       }
+
        @Nullable
        OptionsFactory getOptionsFactory() {
                return optionsFactory;
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 3ffb021..b572deb 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -131,9 +131,11 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
        private File[] localRocksDbDirectories;
 
        /** The pre-configured option settings. */
+       @Nullable
        private PredefinedOptions predefinedOptions;
 
        /** The options factory to create the RocksDB options in the cluster. */
+       @Nullable
        private OptionsFactory optionsFactory;
 
        /** This determines if incremental checkpointing is enabled. */
@@ -505,8 +507,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                final OpaqueMemoryResource<RocksDBSharedResources> 
sharedResources = RocksDBOperationUtils
                                
.allocateSharedCachesIfConfigured(memoryConfiguration, env.getMemoryManager(), 
LOG);
 
-               final RocksDBResourceContainer resourceContainer = new 
RocksDBResourceContainer(
-                               getConfiguredPredefinedOptionsOrDefault(), 
optionsFactory, sharedResources);
+               final RocksDBResourceContainer resourceContainer = 
createOptionsAndResourceContainer(sharedResources);
 
                final DBOptions dbOptions = resourceContainer.getDbOptions();
                final Function<String, ColumnFamilyOptions> createColumnOptions;
@@ -826,34 +827,12 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
         *
         * @return The options factory.
         */
-       @VisibleForTesting
+       @Nullable
        public OptionsFactory getOptions() {
                return optionsFactory;
        }
 
        /**
-        * Gets the RocksDB {@link DBOptions} to be used for all RocksDB 
instances. Only for testing.
-        */
-       @VisibleForTesting
-       public DBOptions getDbOptions() {
-               RocksDBResourceContainer resourceContainer = new 
RocksDBResourceContainer(
-                               getConfiguredPredefinedOptionsOrDefault(),
-                               optionsFactory);
-               return resourceContainer.getDbOptions();
-       }
-
-       /**
-        * Gets the RocksDB {@link ColumnFamilyOptions} to be used for all 
RocksDB instances. Only for testing.
-        */
-       @VisibleForTesting
-       public ColumnFamilyOptions getColumnOptions() {
-               RocksDBResourceContainer resourceContainer = new 
RocksDBResourceContainer(
-                       getConfiguredPredefinedOptionsOrDefault(),
-                       optionsFactory);
-               return resourceContainer.getColumnOptions();
-       }
-
-       /**
         * Gets the number of threads used to transfer files while 
snapshotting/restoring.
         */
        public int getNumberOfTransferThreads() {
@@ -888,14 +867,25 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                setNumberOfTransferThreads(numberOfTransferingThreads);
        }
 
-       private PredefinedOptions getConfiguredPredefinedOptionsOrDefault() {
-               return predefinedOptions != null ? predefinedOptions : 
PredefinedOptions.DEFAULT;
-       }
-
        // 
------------------------------------------------------------------------
        //  utilities
        // 
------------------------------------------------------------------------
 
+       @VisibleForTesting
+       RocksDBResourceContainer createOptionsAndResourceContainer() {
+               return createOptionsAndResourceContainer(null);
+       }
+
+       @VisibleForTesting
+       private RocksDBResourceContainer createOptionsAndResourceContainer(
+               @Nullable OpaqueMemoryResource<RocksDBSharedResources> 
sharedResources) {
+
+               return new RocksDBResourceContainer(
+                       predefinedOptions != null ? predefinedOptions : 
PredefinedOptions.DEFAULT,
+                       optionsFactory,
+                       sharedResources);
+       }
+
        @Override
        public String toString() {
                return "RocksDBStateBackend{" +
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 6892539..a8030a1 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -57,7 +57,6 @@ import org.rocksdb.DBOptions;
 import org.rocksdb.util.SizeUnit;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -412,19 +411,10 @@ public class RocksDBStateBackendConfigTest {
                // verify that predefined options could be set programmatically 
and override pre-configured one.
                
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
                assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, 
rocksDbBackend.getPredefinedOptions());
-
-               try (ColumnFamilyOptions colCreated = 
rocksDbBackend.getColumnOptions()) {
-                       assertEquals(CompactionStyle.LEVEL, 
colCreated.compactionStyle());
-               }
        }
 
        @Test
        public void testSetConfigurableOptions() throws Exception  {
-               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
-               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
-
-               assertNull(rocksDbBackend.getOptions());
-
                DefaultConfigurableOptionsFactory customizedOptions = new 
DefaultConfigurableOptionsFactory()
                        .setMaxBackgroundThreads(4)
                        .setMaxOpenFiles(-1)
@@ -438,13 +428,13 @@ public class RocksDBStateBackendConfigTest {
                        .setBlockSize("64KB")
                        .setBlockCacheSize("512mb");
 
-               rocksDbBackend.setOptions(customizedOptions);
+               try (RocksDBResourceContainer optionsContainer =
+                               new 
RocksDBResourceContainer(PredefinedOptions.DEFAULT, customizedOptions)) {
 
-               try (DBOptions dbOptions = rocksDbBackend.getDbOptions()) {
+                       DBOptions dbOptions = optionsContainer.getDbOptions();
                        assertEquals(-1, dbOptions.maxOpenFiles());
-               }
 
-               try (ColumnFamilyOptions columnOptions = 
rocksDbBackend.getColumnOptions()) {
+                       ColumnFamilyOptions columnOptions = 
optionsContainer.getColumnOptions();
                        assertEquals(CompactionStyle.LEVEL, 
columnOptions.compactionStyle());
                        
assertTrue(columnOptions.levelCompactionDynamicLevelBytes());
                        assertEquals(4 * SizeUnit.MB, 
columnOptions.targetFileSizeBase());
@@ -459,7 +449,7 @@ public class RocksDBStateBackendConfigTest {
        }
 
        @Test
-       public void testConfigurableOptionsFromConfig() throws IOException {
+       public void testConfigurableOptionsFromConfig() throws Exception {
                Configuration configuration = new Configuration();
                DefaultConfigurableOptionsFactory defaultOptionsFactory = new 
DefaultConfigurableOptionsFactory();
                
assertTrue(defaultOptionsFactory.configure(configuration).getConfiguredOptions().isEmpty());
@@ -496,15 +486,14 @@ public class RocksDBStateBackendConfigTest {
 
                        DefaultConfigurableOptionsFactory optionsFactory = new 
DefaultConfigurableOptionsFactory();
                        optionsFactory.configure(configuration);
-                       String checkpointPath = 
tempFolder.newFolder().toURI().toString();
-                       RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
-                       rocksDbBackend.setOptions(optionsFactory);
 
-                       try (DBOptions dbOptions = 
rocksDbBackend.getDbOptions()) {
+                       try (RocksDBResourceContainer optionsContainer =
+                                       new 
RocksDBResourceContainer(PredefinedOptions.DEFAULT, optionsFactory)) {
+
+                               DBOptions dbOptions = 
optionsContainer.getDbOptions();
                                assertEquals(-1, dbOptions.maxOpenFiles());
-                       }
 
-                       try (ColumnFamilyOptions columnOptions = 
rocksDbBackend.getColumnOptions()) {
+                               ColumnFamilyOptions columnOptions = 
optionsContainer.getColumnOptions();
                                assertEquals(CompactionStyle.LEVEL, 
columnOptions.compactionStyle());
                                
assertTrue(columnOptions.levelCompactionDynamicLevelBytes());
                                assertEquals(8 * SizeUnit.MB, 
columnOptions.targetFileSizeBase());
@@ -533,7 +522,9 @@ public class RocksDBStateBackendConfigTest {
                rocksDbBackend = rocksDbBackend.configure(config, 
getClass().getClassLoader());
 
                assertTrue(rocksDbBackend.getOptions() instanceof 
TestOptionsFactory);
-               try (DBOptions dbOptions = rocksDbBackend.getDbOptions()) {
+
+               try (RocksDBResourceContainer optionsContainer = 
rocksDbBackend.createOptionsAndResourceContainer()) {
+                       DBOptions dbOptions = optionsContainer.getDbOptions();
                        assertEquals(4, dbOptions.maxBackgroundJobs());
                }
 
@@ -550,21 +541,15 @@ public class RocksDBStateBackendConfigTest {
                        }
                });
 
-               assertNotNull(rocksDbBackend.getOptions());
-               try (ColumnFamilyOptions colCreated = 
rocksDbBackend.getColumnOptions()) {
+               try (RocksDBResourceContainer optionsContainer = 
rocksDbBackend.createOptionsAndResourceContainer()) {
+                       ColumnFamilyOptions colCreated = 
optionsContainer.getColumnOptions();
                        assertEquals(CompactionStyle.FIFO, 
colCreated.compactionStyle());
                }
        }
 
        @Test
        public void testPredefinedAndOptionsFactory() throws Exception {
-               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
-               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
-
-               assertEquals(PredefinedOptions.DEFAULT, 
rocksDbBackend.getPredefinedOptions());
-
-               
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
-               rocksDbBackend.setOptions(new OptionsFactory() {
+               final OptionsFactory optionsFactory = new OptionsFactory() {
                        @Override
                        public DBOptions createDBOptions(DBOptions 
currentOptions, Collection<AutoCloseable> handlesToClose) {
                                return currentOptions;
@@ -574,12 +559,14 @@ public class RocksDBStateBackendConfigTest {
                        public ColumnFamilyOptions 
createColumnOptions(ColumnFamilyOptions currentOptions, 
Collection<AutoCloseable> handlesToClose) {
                                return 
currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
                        }
-               });
+               };
 
-               assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, 
rocksDbBackend.getPredefinedOptions());
-               assertNotNull(rocksDbBackend.getOptions());
-               try (ColumnFamilyOptions colCreated = 
rocksDbBackend.getColumnOptions()) {
-                       assertEquals(CompactionStyle.UNIVERSAL, 
colCreated.compactionStyle());
+               try (final RocksDBResourceContainer optionsContainer = new 
RocksDBResourceContainer(
+                               PredefinedOptions.SPINNING_DISK_OPTIMIZED, 
optionsFactory)) {
+
+                       final ColumnFamilyOptions columnFamilyOptions = 
optionsContainer.getColumnOptions();
+                       assertNotNull(columnFamilyOptions);
+                       assertEquals(CompactionStyle.UNIVERSAL, 
columnFamilyOptions.compactionStyle());
                }
        }
 

Reply via email to