This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5326730225e2e0dc25a964a538fc4a350e3d0e18
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Dec 9 19:42:00 2019 +0100

    [FLINK-14926][state-backend-rocksdb] (follow-up) Make RocksDBResourceContainer immutable
---
 .../streaming/state/RocksDBResourceContainer.java  | 50 +++++++++++-----------
 .../streaming/state/RocksDBStateBackend.java       | 31 +++++++-------
 .../state/RocksDBResourceContainerTest.java        |  7 ++-
 3 files changed, 43 insertions(+), 45 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 4cb18d8..32c198f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -24,11 +24,12 @@ import org.apache.flink.util.IOUtils;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The container for RocksDB resources, including predefined options, option 
factory and
  * shared resource among instances.
@@ -39,22 +40,37 @@ import java.util.ArrayList;
 final class RocksDBResourceContainer implements AutoCloseable {
 
        /** The pre-configured option settings. */
-       private PredefinedOptions predefinedOptions;
+       private final PredefinedOptions predefinedOptions;
 
        /** The options factory to create the RocksDB options. */
        @Nullable
-       private OptionsFactory optionsFactory;
+       private final OptionsFactory optionsFactory;
 
        /** The shared resource among RocksDB instances. This resource is not 
part of the 'handlesToClose',
         * because the handles to close are closed quietly, whereas for this 
one, we want exceptions to be reported. */
        @Nullable
-       private OpaqueMemoryResource<RocksDBSharedResources> sharedResources;
+       private final OpaqueMemoryResource<RocksDBSharedResources> 
sharedResources;
 
        /** The handles to be closed when the container is closed. */
        private final ArrayList<AutoCloseable> handlesToClose;
 
        public RocksDBResourceContainer() {
-               handlesToClose = new ArrayList<>();
+               this(PredefinedOptions.DEFAULT, null, null);
+       }
+
+       public RocksDBResourceContainer(PredefinedOptions predefinedOptions, 
@Nullable OptionsFactory optionsFactory) {
+               this(predefinedOptions, optionsFactory, null);
+       }
+
+       public RocksDBResourceContainer(
+               PredefinedOptions predefinedOptions,
+               @Nullable OptionsFactory optionsFactory,
+               @Nullable OpaqueMemoryResource<RocksDBSharedResources> 
sharedResources) {
+
+               this.predefinedOptions = checkNotNull(predefinedOptions);
+               this.optionsFactory = optionsFactory;
+               this.sharedResources = sharedResources;
+               this.handlesToClose = new ArrayList<>();
        }
 
        /**
@@ -62,7 +78,7 @@ final class RocksDBResourceContainer implements AutoCloseable {
         */
        DBOptions getDbOptions() {
                // initial options from pre-defined profile
-               DBOptions opt = 
checkAndGetPredefinedOptions().createDBOptions(handlesToClose);
+               DBOptions opt = 
predefinedOptions.createDBOptions(handlesToClose);
 
                // add user-defined options factory, if specified
                if (optionsFactory != null) {
@@ -80,7 +96,7 @@ final class RocksDBResourceContainer implements AutoCloseable {
         */
        ColumnFamilyOptions getColumnOptions() {
                // initial options from pre-defined profile
-               ColumnFamilyOptions opt = 
checkAndGetPredefinedOptions().createColumnOptions(handlesToClose);
+               ColumnFamilyOptions opt = 
predefinedOptions.createColumnOptions(handlesToClose);
 
                // add user-defined options, if specified
                if (optionsFactory != null) {
@@ -90,29 +106,11 @@ final class RocksDBResourceContainer implements AutoCloseable {
                return opt;
        }
 
-       PredefinedOptions checkAndGetPredefinedOptions() {
-               if (predefinedOptions == null) {
-                       predefinedOptions = PredefinedOptions.DEFAULT;
-               }
-               return predefinedOptions;
-       }
-
+       @Nullable
        OptionsFactory getOptionsFactory() {
                return optionsFactory;
        }
 
-       void setPredefinedOptions(@Nonnull PredefinedOptions predefinedOptions) 
{
-               this.predefinedOptions = predefinedOptions;
-       }
-
-       void setOptionsFactory(@Nonnull OptionsFactory optionsFactory) {
-               this.optionsFactory = optionsFactory;
-       }
-
-       void setSharedResources(OpaqueMemoryResource<RocksDBSharedResources> 
sharedResources) {
-               this.sharedResources = sharedResources;
-       }
-
        @Override
        public void close() throws Exception {
                handlesToClose.forEach(IOUtils::closeQuietly);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 974da57..829566d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -502,20 +502,17 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
                LocalRecoveryConfig localRecoveryConfig =
                        env.getTaskStateManager().createLocalRecoveryConfig();
 
-               // create resource container
-               RocksDBResourceContainer resourceContainer = new 
RocksDBResourceContainer();
-               resourceContainer.setPredefinedOptions(this.predefinedOptions);
-               resourceContainer.setOptionsFactory(this.optionsFactory);
-               DBOptions dbOptions = resourceContainer.getDbOptions();
-               Function<String, ColumnFamilyOptions> createColumnOptions;
-
                final OpaqueMemoryResource<RocksDBSharedResources> 
sharedResources = RocksDBOperationUtils
                                
.allocateSharedCachesIfConfigured(memoryConfiguration, env.getMemoryManager(), 
LOG);
 
+               final RocksDBResourceContainer resourceContainer = new 
RocksDBResourceContainer(
+                               getConfiguredPredefinedOptionsOrDefault(), 
optionsFactory, sharedResources);
+
+               final DBOptions dbOptions = resourceContainer.getDbOptions();
+               final Function<String, ColumnFamilyOptions> createColumnOptions;
+
                if (sharedResources != null) {
                        LOG.info("Obtained shared RocksDB cache of size {} 
bytes", sharedResources.getSize());
-                       // register into options container for disposal.
-                       resourceContainer.setSharedResources(sharedResources);
 
                        final RocksDBSharedResources rocksResources = 
sharedResources.getResourceHandle();
                        final Cache blockCache = rocksResources.getCache();
@@ -839,9 +836,9 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
         */
        @VisibleForTesting
        public DBOptions getDbOptions() {
-               RocksDBResourceContainer resourceContainer = new 
RocksDBResourceContainer();
-               resourceContainer.setOptionsFactory(optionsFactory);
-               resourceContainer.setPredefinedOptions(predefinedOptions);
+               RocksDBResourceContainer resourceContainer = new 
RocksDBResourceContainer(
+                               getConfiguredPredefinedOptionsOrDefault(),
+                               optionsFactory);
                return resourceContainer.getDbOptions();
        }
 
@@ -850,9 +847,9 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
         */
        @VisibleForTesting
        public ColumnFamilyOptions getColumnOptions() {
-               RocksDBResourceContainer resourceContainer = new 
RocksDBResourceContainer();
-               resourceContainer.setOptionsFactory(optionsFactory);
-               resourceContainer.setPredefinedOptions(predefinedOptions);
+               RocksDBResourceContainer resourceContainer = new 
RocksDBResourceContainer(
+                       getConfiguredPredefinedOptionsOrDefault(),
+                       optionsFactory);
                return resourceContainer.getColumnOptions();
        }
 
@@ -901,6 +898,10 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
                setNumberOfTransferThreads(numberOfTransferingThreads);
        }
 
+       private PredefinedOptions getConfiguredPredefinedOptionsOrDefault() {
+               return predefinedOptions != null ? predefinedOptions : 
PredefinedOptions.DEFAULT;
+       }
+
        // 
------------------------------------------------------------------------
        //  utilities
        // 
------------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
index 5ec3c04..bad29c9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
@@ -100,9 +100,8 @@ public class RocksDBResourceContainerTest {
 
        @Test
        public void testFreeMultipleColumnOptionsWithPredefinedOptions() throws 
Exception {
-               RocksDBResourceContainer container = new 
RocksDBResourceContainer();
                for (PredefinedOptions predefinedOptions: 
PredefinedOptions.values()) {
-                       container.setPredefinedOptions(predefinedOptions);
+                       RocksDBResourceContainer container = new 
RocksDBResourceContainer(predefinedOptions, null);
                        final int optionNumber = 20;
                        ArrayList<ColumnFamilyOptions> columnFamilyOptions = 
new ArrayList<>(optionNumber);
                        for (int i = 0; i < optionNumber; i++) {
@@ -117,14 +116,14 @@ public class RocksDBResourceContainerTest {
 
        @Test
        public void testFreeSharedResourcesAfterClose() throws Exception {
-               RocksDBResourceContainer container = new 
RocksDBResourceContainer();
                LRUCache cache = new LRUCache(1024L);
                WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
                RocksDBSharedResources sharedResources = new 
RocksDBSharedResources(cache, wbm);
                final ThrowingRunnable<Exception> disposer = 
sharedResources::close;
                OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
                        new OpaqueMemoryResource<>(sharedResources, 1024L, 
disposer);
-               container.setSharedResources(opaqueResource);
+
+               RocksDBResourceContainer container = new 
RocksDBResourceContainer(PredefinedOptions.DEFAULT, null, opaqueResource);
 
                container.close();
                assertThat(cache.isOwningHandle(), is(false));

Reply via email to