This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fdc5ed546360fe438f81c564265f8d623f88f02e
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Dec 10 12:19:59 2019 +0100

    [FLINK-15177][state-backend-rocksdb] Migrate RocksDB Configurable Options to new type safe config options
    
    This also simplifies the validation logic.
---
 .../state/DefaultConfigurableOptionsFactory.java   | 94 +++++++++-------------
 .../state/RocksDBConfigurableOptions.java          | 36 ++++++---
 .../state/RocksDBStateBackendConfigTest.java       | 24 +++---
 3 files changed, 75 insertions(+), 79 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
index 12c0eb9..93753b2 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
@@ -32,12 +32,11 @@ import org.rocksdb.TableFormatConfig;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.BLOCK_CACHE_SIZE;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.BLOCK_SIZE;
@@ -57,6 +56,8 @@ import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOption
  */
 public class DefaultConfigurableOptionsFactory implements 
ConfigurableOptionsFactory {
 
+       private static final long serialVersionUID = 1L;
+
        private final Map<String, String> configuredOptions = new HashMap<>();
 
        @Override
@@ -131,7 +132,7 @@ public class DefaultConfigurableOptionsFactory implements 
ConfigurableOptionsFac
                return new HashMap<>(configuredOptions);
        }
 
-       private boolean isOptionConfigured(ConfigOption configOption) {
+       private boolean isOptionConfigured(ConfigOption<?> configOption) {
                return configuredOptions.containsKey(configOption.key());
        }
 
@@ -300,44 +301,37 @@ public class DefaultConfigurableOptionsFactory implements 
ConfigurableOptionsFac
                return this;
        }
 
-       private static final String[] CANDIDATE_CONFIGS = new String[]{
+       private static final ConfigOption<?>[] CANDIDATE_CONFIGS = new 
ConfigOption<?>[] {
                // configurable DBOptions
-               MAX_BACKGROUND_THREADS.key(),
-               MAX_OPEN_FILES.key(),
+               MAX_BACKGROUND_THREADS,
+               MAX_OPEN_FILES,
 
                // configurable ColumnFamilyOptions
-               COMPACTION_STYLE.key(),
-               USE_DYNAMIC_LEVEL_SIZE.key(),
-               TARGET_FILE_SIZE_BASE.key(),
-               MAX_SIZE_LEVEL_BASE.key(),
-               WRITE_BUFFER_SIZE.key(),
-               MAX_WRITE_BUFFER_NUMBER.key(),
-               MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key(),
-               BLOCK_SIZE.key(),
-               BLOCK_CACHE_SIZE.key()
+               COMPACTION_STYLE,
+               USE_DYNAMIC_LEVEL_SIZE,
+               TARGET_FILE_SIZE_BASE,
+               MAX_SIZE_LEVEL_BASE,
+               WRITE_BUFFER_SIZE,
+               MAX_WRITE_BUFFER_NUMBER,
+               MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
+               BLOCK_SIZE,
+               BLOCK_CACHE_SIZE
        };
 
-       private static final Set<String> POSITIVE_INT_CONFIG_SET = new 
HashSet<>(Arrays.asList(
-               MAX_BACKGROUND_THREADS.key(),
-               MAX_WRITE_BUFFER_NUMBER.key(),
-               MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key()
-       ));
-
-       private static final Set<String> SIZE_CONFIG_SET = new 
HashSet<>(Arrays.asList(
-               TARGET_FILE_SIZE_BASE.key(),
-               MAX_SIZE_LEVEL_BASE.key(),
-               WRITE_BUFFER_SIZE.key(),
-               BLOCK_SIZE.key(),
-               BLOCK_CACHE_SIZE.key()
+       private static final Set<ConfigOption<?>> POSITIVE_INT_CONFIG_SET = new 
HashSet<>(Arrays.asList(
+               MAX_BACKGROUND_THREADS,
+               MAX_WRITE_BUFFER_NUMBER,
+               MIN_WRITE_BUFFER_NUMBER_TO_MERGE
        ));
 
-       private static final Set<String> BOOLEAN_CONFIG_SET = new 
HashSet<>(Collections.singletonList(
-               USE_DYNAMIC_LEVEL_SIZE.key()
+       private static final Set<ConfigOption<?>> SIZE_CONFIG_SET = new 
HashSet<>(Arrays.asList(
+               TARGET_FILE_SIZE_BASE,
+               MAX_SIZE_LEVEL_BASE,
+               WRITE_BUFFER_SIZE,
+               BLOCK_SIZE,
+               BLOCK_CACHE_SIZE
        ));
 
-       private static final Set<String> COMPACTION_STYLE_SET = 
Arrays.stream(CompactionStyle.values())
-               .map(c -> c.name().toLowerCase()).collect(Collectors.toSet());
-
        /**
         * Creates a {@link DefaultConfigurableOptionsFactory} instance from a 
{@link Configuration}.
         *
@@ -349,13 +343,12 @@ public class DefaultConfigurableOptionsFactory implements 
ConfigurableOptionsFac
         */
        @Override
        public DefaultConfigurableOptionsFactory configure(Configuration 
configuration) {
-               for (String key : CANDIDATE_CONFIGS) {
-                       String newValue = configuration.getString(key, null);
+               for (ConfigOption<?> option : CANDIDATE_CONFIGS) {
+                       Optional<?> newValue = 
configuration.getOptional(option);
 
-                       if (newValue != null) {
-                               if (checkArgumentValid(key, newValue)) {
-                                       this.configuredOptions.put(key, 
newValue);
-                               }
+                       if (newValue.isPresent()) {
+                               checkArgumentValid(option, newValue.get());
+                               this.configuredOptions.put(option.key(), 
newValue.get().toString());
                        }
                }
                return this;
@@ -371,30 +364,19 @@ public class DefaultConfigurableOptionsFactory implements 
ConfigurableOptionsFac
        /**
         * Helper method to check whether the (key,value) is valid through 
given configuration and returns the formatted value.
         *
-        * @param key The configuration key which is configurable in {@link 
RocksDBConfigurableOptions}.
+        * @param option The configuration key which is configurable in {@link 
RocksDBConfigurableOptions}.
         * @param value The value within given configuration.
-        *
-        * @return whether the given key and value in string format is legal.
         */
-       private static boolean checkArgumentValid(String key, String value) {
-               if (POSITIVE_INT_CONFIG_SET.contains(key)) {
+       private static void checkArgumentValid(ConfigOption<?> option, Object 
value) {
+               final String key = option.key();
 
-                       Preconditions.checkArgument(Integer.parseInt(value) > 0,
+               if (POSITIVE_INT_CONFIG_SET.contains(option)) {
+                       Preconditions.checkArgument((Integer) value  > 0,
                                "Configured value for key: " + key + " must be 
larger than 0.");
-               } else if (SIZE_CONFIG_SET.contains(key)) {
-
-                       
Preconditions.checkArgument(MemorySize.parseBytes(value) > 0,
+               } else if (SIZE_CONFIG_SET.contains(option)) {
+                       Preconditions.checkArgument(((MemorySize) 
value).getBytes() > 0,
                                "Configured size for key" + key + " must be 
larger than 0.");
-               } else if (BOOLEAN_CONFIG_SET.contains(key)) {
-
-                       
Preconditions.checkArgument("true".equalsIgnoreCase(value) || 
"false".equalsIgnoreCase(value),
-                               "The configured boolean value: " + value + " 
for key: " + key + " is illegal.");
-               } else if (key.equals(COMPACTION_STYLE.key())) {
-                       value = value.toLowerCase();
-                       
Preconditions.checkArgument(COMPACTION_STYLE_SET.contains(value),
-                               "Compression type: " + value + " is not 
recognized with legal types: " + String.join(", ", COMPACTION_STYLE_SET));
                }
-               return true;
        }
 
        /**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
index a8213bc..7e08f25 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -19,8 +19,11 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.description.Description;
 
+import org.rocksdb.CompactionStyle;
+
 import java.io.Serializable;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
@@ -43,14 +46,16 @@ public class RocksDBConfigurableOptions implements 
Serializable {
        // Provided configurable DBOptions within Flink
        
//--------------------------------------------------------------------------
 
-       public static final ConfigOption<String> MAX_BACKGROUND_THREADS =
+       public static final ConfigOption<Integer> MAX_BACKGROUND_THREADS =
                key("state.backend.rocksdb.thread.num")
+                       .intType()
                        .noDefaultValue()
                        .withDescription("The maximum number of concurrent 
background flush and compaction jobs (per TaskManager). " +
                                "RocksDB has default configuration as '1'.");
 
-       public static final ConfigOption<String> MAX_OPEN_FILES =
+       public static final ConfigOption<Integer> MAX_OPEN_FILES =
                key("state.backend.rocksdb.files.open")
+                       .intType()
                        .noDefaultValue()
                        .withDescription("The maximum number of open files (per 
TaskManager) that can be used by the DB, '-1' means no limit. " +
                                "RocksDB has default configuration as '5000'.");
@@ -59,15 +64,17 @@ public class RocksDBConfigurableOptions implements 
Serializable {
        // Provided configurable ColumnFamilyOptions within Flink
        
//--------------------------------------------------------------------------
 
-       public static final ConfigOption<String> COMPACTION_STYLE =
+       public static final ConfigOption<CompactionStyle> COMPACTION_STYLE =
                key("state.backend.rocksdb.compaction.style")
+                       .enumType(CompactionStyle.class)
                        .noDefaultValue()
                        .withDescription(String.format("The specified 
compaction style for DB. Candidate compaction style is %s, %s or %s, " +
                                        "and RocksDB choose '%s' as default 
style.", LEVEL.name(), FIFO.name(), UNIVERSAL.name(),
                                LEVEL.name()));
 
-       public static final ConfigOption<String> USE_DYNAMIC_LEVEL_SIZE =
+       public static final ConfigOption<Boolean> USE_DYNAMIC_LEVEL_SIZE =
                key("state.backend.rocksdb.compaction.level.use-dynamic-size")
+                       .booleanType()
                        .noDefaultValue()
                        .withDescription(Description.builder().text("If true, 
RocksDB will pick target size of each level dynamically. From an empty DB, ")
                                .text("RocksDB would make last level the base 
level, which means merging L0 data into the last level, ")
@@ -78,44 +85,51 @@ public class RocksDBConfigurableOptions implements 
Serializable {
                                                "RocksDB's doc."))
                                .build());
 
-       public static final ConfigOption<String> TARGET_FILE_SIZE_BASE =
+       public static final ConfigOption<MemorySize> TARGET_FILE_SIZE_BASE =
                
key("state.backend.rocksdb.compaction.level.target-file-size-base")
+                       .memoryType()
                        .noDefaultValue()
                        .withDescription("The target file size for compaction, 
which determines a level-1 file size. " +
                                "RocksDB has default configuration as '2MB'.");
 
-       public static final ConfigOption<String> MAX_SIZE_LEVEL_BASE =
+       public static final ConfigOption<MemorySize> MAX_SIZE_LEVEL_BASE =
                
key("state.backend.rocksdb.compaction.level.max-size-level-base")
+                       .memoryType()
                        .noDefaultValue()
                        .withDescription("The upper-bound of the total size of 
level base files in bytes. " +
                                "RocksDB has default configuration as '10MB'.");
 
-       public static final ConfigOption<String> WRITE_BUFFER_SIZE =
+       public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
                key("state.backend.rocksdb.writebuffer.size")
+                       .memoryType()
                        .noDefaultValue()
                        .withDescription("The amount of data built up in memory 
(backed by an unsorted log on disk) " +
                                "before converting to a sorted on-disk files. 
RocksDB has default writebuffer size as '64MB'.");
 
-       public static final ConfigOption<String> MAX_WRITE_BUFFER_NUMBER =
+       public static final ConfigOption<Integer> MAX_WRITE_BUFFER_NUMBER =
                key("state.backend.rocksdb.writebuffer.count")
+                       .intType()
                        .noDefaultValue()
                        .withDescription("Tne maximum number of write buffers 
that are built up in memory. " +
                                "RocksDB has default configuration as '2'.");
 
-       public static final ConfigOption<String> 
MIN_WRITE_BUFFER_NUMBER_TO_MERGE =
+       public static final ConfigOption<Integer> 
MIN_WRITE_BUFFER_NUMBER_TO_MERGE =
                key("state.backend.rocksdb.writebuffer.number-to-merge")
+                       .intType()
                        .noDefaultValue()
                        .withDescription("The minimum number of write buffers 
that will be merged together before writing to storage. " +
                                "RocksDB has default configuration as '1'.");
 
-       public static final ConfigOption<String> BLOCK_SIZE =
+       public static final ConfigOption<MemorySize> BLOCK_SIZE =
                key("state.backend.rocksdb.block.blocksize")
+                       .memoryType()
                        .noDefaultValue()
                        .withDescription("The approximate size (in bytes) of 
user data packed per block. " +
                                "RocksDB has default blocksize as '4KB'.");
 
-       public static final ConfigOption<String> BLOCK_CACHE_SIZE =
+       public static final ConfigOption<MemorySize> BLOCK_CACHE_SIZE =
                key("state.backend.rocksdb.block.cache-size")
+                       .memoryType()
                        .noDefaultValue()
                        .withDescription("The amount of the cache for data 
blocks in RocksDB. " +
                                "RocksDB has default block-cache size as 
'8MB'.");
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index bd2a34a..6892539 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -483,16 +483,16 @@ public class RocksDBStateBackendConfigTest {
 
                // verify legal configuration
                {
-                       
configuration.setString(RocksDBConfigurableOptions.COMPACTION_STYLE, "level");
-                       
configuration.setString(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, 
"TRUE");
-                       
configuration.setString(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE, "8 
mb");
-                       
configuration.setString(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE, 
"128MB");
-                       
configuration.setString(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, "4");
-                       
configuration.setString(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER, 
"4");
-                       
configuration.setString(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
 "2");
-                       
configuration.setString(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, "64 MB");
-                       
configuration.setString(RocksDBConfigurableOptions.BLOCK_SIZE, "4 kb");
-                       
configuration.setString(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, "512 mb");
+                       
configuration.setString(RocksDBConfigurableOptions.COMPACTION_STYLE.key(), 
"level");
+                       
configuration.setString(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE.key(),
 "TRUE");
+                       
configuration.setString(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE.key(), 
"8 mb");
+                       
configuration.setString(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE.key(), 
"128MB");
+                       
configuration.setString(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS.key(),
 "4");
+                       
configuration.setString(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER.key(),
 "4");
+                       
configuration.setString(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key(),
 "2");
+                       
configuration.setString(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE.key(), "64 
MB");
+                       
configuration.setString(RocksDBConfigurableOptions.BLOCK_SIZE.key(), "4 kb");
+                       
configuration.setString(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE.key(), "512 
mb");
 
                        DefaultConfigurableOptionsFactory optionsFactory = new 
DefaultConfigurableOptionsFactory();
                        optionsFactory.configure(configuration);
@@ -740,10 +740,10 @@ public class RocksDBStateBackendConfigTest {
        }
 
        private void verifyIllegalArgument(
-                       ConfigOption<String> configOption,
+                       ConfigOption<?> configOption,
                        String configValue) {
                Configuration configuration = new Configuration();
-               configuration.setString(configOption, configValue);
+               configuration.setString(configOption.key(), configValue);
 
                DefaultConfigurableOptionsFactory optionsFactory = new 
DefaultConfigurableOptionsFactory();
                try {

Reply via email to