This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fdc5ed546360fe438f81c564265f8d623f88f02e Author: Stephan Ewen <se...@apache.org> AuthorDate: Tue Dec 10 12:19:59 2019 +0100 [FLINK-15177][state-backend-rocksdb] Migrate RocksDB Configurable Options to new type safe config options This also simplifies the validation logic. --- .../state/DefaultConfigurableOptionsFactory.java | 94 +++++++++------------- .../state/RocksDBConfigurableOptions.java | 36 ++++++--- .../state/RocksDBStateBackendConfigTest.java | 24 +++--- 3 files changed, 75 insertions(+), 79 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java index 12c0eb9..93753b2 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java @@ -32,12 +32,11 @@ import org.rocksdb.TableFormatConfig; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.BLOCK_CACHE_SIZE; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.BLOCK_SIZE; @@ -57,6 +56,8 @@ import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOption */ public class DefaultConfigurableOptionsFactory implements ConfigurableOptionsFactory { + private static final long serialVersionUID = 1L; + private final Map<String, String> configuredOptions = new HashMap<>(); @Override @@ -131,7 +132,7 @@ public class DefaultConfigurableOptionsFactory implements ConfigurableOptionsFac return new HashMap<>(configuredOptions); } - private boolean isOptionConfigured(ConfigOption configOption) { + private boolean isOptionConfigured(ConfigOption<?> configOption) { return configuredOptions.containsKey(configOption.key()); } @@ -300,44 +301,37 @@ public class DefaultConfigurableOptionsFactory implements ConfigurableOptionsFac return this; } - private static final String[] CANDIDATE_CONFIGS = new String[]{ + private static final ConfigOption<?>[] CANDIDATE_CONFIGS = new ConfigOption<?>[] { // configurable DBOptions - MAX_BACKGROUND_THREADS.key(), - MAX_OPEN_FILES.key(), + MAX_BACKGROUND_THREADS, + MAX_OPEN_FILES, // configurable ColumnFamilyOptions - COMPACTION_STYLE.key(), - USE_DYNAMIC_LEVEL_SIZE.key(), - TARGET_FILE_SIZE_BASE.key(), - MAX_SIZE_LEVEL_BASE.key(), - WRITE_BUFFER_SIZE.key(), - MAX_WRITE_BUFFER_NUMBER.key(), - MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key(), - BLOCK_SIZE.key(), - BLOCK_CACHE_SIZE.key() + COMPACTION_STYLE, + USE_DYNAMIC_LEVEL_SIZE, + TARGET_FILE_SIZE_BASE, + MAX_SIZE_LEVEL_BASE, + WRITE_BUFFER_SIZE, + MAX_WRITE_BUFFER_NUMBER, + MIN_WRITE_BUFFER_NUMBER_TO_MERGE, + BLOCK_SIZE, + BLOCK_CACHE_SIZE }; - private static final Set<String> POSITIVE_INT_CONFIG_SET = new HashSet<>(Arrays.asList( - MAX_BACKGROUND_THREADS.key(), - MAX_WRITE_BUFFER_NUMBER.key(), - MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key() - )); - - private static final Set<String> SIZE_CONFIG_SET = new HashSet<>(Arrays.asList( - TARGET_FILE_SIZE_BASE.key(), - MAX_SIZE_LEVEL_BASE.key(), - WRITE_BUFFER_SIZE.key(), - BLOCK_SIZE.key(), - BLOCK_CACHE_SIZE.key() + private static final Set<ConfigOption<?>> POSITIVE_INT_CONFIG_SET = new HashSet<>(Arrays.asList( + MAX_BACKGROUND_THREADS, + MAX_WRITE_BUFFER_NUMBER, + MIN_WRITE_BUFFER_NUMBER_TO_MERGE )); - private static final Set<String> BOOLEAN_CONFIG_SET = new HashSet<>(Collections.singletonList( - USE_DYNAMIC_LEVEL_SIZE.key() + private static final Set<ConfigOption<?>> SIZE_CONFIG_SET = new HashSet<>(Arrays.asList( + TARGET_FILE_SIZE_BASE, + MAX_SIZE_LEVEL_BASE, + WRITE_BUFFER_SIZE, + BLOCK_SIZE, + BLOCK_CACHE_SIZE )); - private static final Set<String> COMPACTION_STYLE_SET = Arrays.stream(CompactionStyle.values()) - .map(c -> c.name().toLowerCase()).collect(Collectors.toSet()); - /** * Creates a {@link DefaultConfigurableOptionsFactory} instance from a {@link Configuration}. * @@ -349,13 +343,12 @@ public class DefaultConfigurableOptionsFactory implements ConfigurableOptionsFac */ @Override public DefaultConfigurableOptionsFactory configure(Configuration configuration) { - for (String key : CANDIDATE_CONFIGS) { - String newValue = configuration.getString(key, null); + for (ConfigOption<?> option : CANDIDATE_CONFIGS) { + Optional<?> newValue = configuration.getOptional(option); - if (newValue != null) { - if (checkArgumentValid(key, newValue)) { - this.configuredOptions.put(key, newValue); - } + if (newValue.isPresent()) { + checkArgumentValid(option, newValue.get()); + this.configuredOptions.put(option.key(), newValue.get().toString()); } } return this; @@ -371,30 +364,19 @@ public class DefaultConfigurableOptionsFactory implements ConfigurableOptionsFac /** * Helper method to check whether the (key,value) is valid through given configuration and returns the formatted value. * - * @param key The configuration key which is configurable in {@link RocksDBConfigurableOptions}. + * @param option The configuration key which is configurable in {@link RocksDBConfigurableOptions}. * @param value The value within given configuration. - * - * @return whether the given key and value in string format is legal. */ - private static boolean checkArgumentValid(String key, String value) { - if (POSITIVE_INT_CONFIG_SET.contains(key)) { + private static void checkArgumentValid(ConfigOption<?> option, Object value) { + final String key = option.key(); - Preconditions.checkArgument(Integer.parseInt(value) > 0, + if (POSITIVE_INT_CONFIG_SET.contains(option)) { + Preconditions.checkArgument((Integer) value > 0, "Configured value for key: " + key + " must be larger than 0."); - } else if (SIZE_CONFIG_SET.contains(key)) { - - Preconditions.checkArgument(MemorySize.parseBytes(value) > 0, + } else if (SIZE_CONFIG_SET.contains(option)) { + Preconditions.checkArgument(((MemorySize) value).getBytes() > 0, "Configured size for key" + key + " must be larger than 0."); - } else if (BOOLEAN_CONFIG_SET.contains(key)) { - - Preconditions.checkArgument("true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value), - "The configured boolean value: " + value + " for key: " + key + " is illegal."); - } else if (key.equals(COMPACTION_STYLE.key())) { - value = value.toLowerCase(); - Preconditions.checkArgument(COMPACTION_STYLE_SET.contains(value), - "Compression type: " + value + " is not recognized with legal types: " + String.join(", ", COMPACTION_STYLE_SET)); } - return true; } /** diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java index a8213bc..7e08f25 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java @@ -19,8 +19,11 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.description.Description; +import org.rocksdb.CompactionStyle; + import java.io.Serializable; import static org.apache.flink.configuration.ConfigOptions.key; @@ -43,14 +46,16 @@ public class RocksDBConfigurableOptions implements Serializable { // Provided configurable DBOptions within Flink //-------------------------------------------------------------------------- - public static final ConfigOption<String> MAX_BACKGROUND_THREADS = + public static final ConfigOption<Integer> MAX_BACKGROUND_THREADS = key("state.backend.rocksdb.thread.num") + .intType() .noDefaultValue() .withDescription("The maximum number of concurrent background flush and compaction jobs (per TaskManager). " + "RocksDB has default configuration as '1'."); - public static final ConfigOption<String> MAX_OPEN_FILES = + public static final ConfigOption<Integer> MAX_OPEN_FILES = key("state.backend.rocksdb.files.open") + .intType() .noDefaultValue() .withDescription("The maximum number of open files (per TaskManager) that can be used by the DB, '-1' means no limit. " + "RocksDB has default configuration as '5000'."); @@ -59,15 +64,17 @@ public class RocksDBConfigurableOptions implements Serializable { // Provided configurable ColumnFamilyOptions within Flink //-------------------------------------------------------------------------- - public static final ConfigOption<String> COMPACTION_STYLE = + public static final ConfigOption<CompactionStyle> COMPACTION_STYLE = key("state.backend.rocksdb.compaction.style") + .enumType(CompactionStyle.class) .noDefaultValue() .withDescription(String.format("The specified compaction style for DB. Candidate compaction style is %s, %s or %s, " + "and RocksDB choose '%s' as default style.", LEVEL.name(), FIFO.name(), UNIVERSAL.name(), LEVEL.name())); - public static final ConfigOption<String> USE_DYNAMIC_LEVEL_SIZE = + public static final ConfigOption<Boolean> USE_DYNAMIC_LEVEL_SIZE = key("state.backend.rocksdb.compaction.level.use-dynamic-size") + .booleanType() .noDefaultValue() .withDescription(Description.builder().text("If true, RocksDB will pick target size of each level dynamically. From an empty DB, ") .text("RocksDB would make last level the base level, which means merging L0 data into the last level, ") @@ -78,44 +85,51 @@ public class RocksDBConfigurableOptions implements Serializable { "RocksDB's doc.")) .build()); - public static final ConfigOption<String> TARGET_FILE_SIZE_BASE = + public static final ConfigOption<MemorySize> TARGET_FILE_SIZE_BASE = key("state.backend.rocksdb.compaction.level.target-file-size-base") + .memoryType() .noDefaultValue() .withDescription("The target file size for compaction, which determines a level-1 file size. " + "RocksDB has default configuration as '2MB'."); - public static final ConfigOption<String> MAX_SIZE_LEVEL_BASE = + public static final ConfigOption<MemorySize> MAX_SIZE_LEVEL_BASE = key("state.backend.rocksdb.compaction.level.max-size-level-base") + .memoryType() .noDefaultValue() .withDescription("The upper-bound of the total size of level base files in bytes. " + "RocksDB has default configuration as '10MB'."); - public static final ConfigOption<String> WRITE_BUFFER_SIZE = + public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE = key("state.backend.rocksdb.writebuffer.size") + .memoryType() .noDefaultValue() .withDescription("The amount of data built up in memory (backed by an unsorted log on disk) " + "before converting to a sorted on-disk files. RocksDB has default writebuffer size as '64MB'."); - public static final ConfigOption<String> MAX_WRITE_BUFFER_NUMBER = + public static final ConfigOption<Integer> MAX_WRITE_BUFFER_NUMBER = key("state.backend.rocksdb.writebuffer.count") + .intType() .noDefaultValue() .withDescription("Tne maximum number of write buffers that are built up in memory. " + "RocksDB has default configuration as '2'."); - public static final ConfigOption<String> MIN_WRITE_BUFFER_NUMBER_TO_MERGE = + public static final ConfigOption<Integer> MIN_WRITE_BUFFER_NUMBER_TO_MERGE = key("state.backend.rocksdb.writebuffer.number-to-merge") + .intType() .noDefaultValue() .withDescription("The minimum number of write buffers that will be merged together before writing to storage. " + "RocksDB has default configuration as '1'."); - public static final ConfigOption<String> BLOCK_SIZE = + public static final ConfigOption<MemorySize> BLOCK_SIZE = key("state.backend.rocksdb.block.blocksize") + .memoryType() .noDefaultValue() .withDescription("The approximate size (in bytes) of user data packed per block. " + "RocksDB has default blocksize as '4KB'."); - public static final ConfigOption<String> BLOCK_CACHE_SIZE = + public static final ConfigOption<MemorySize> BLOCK_CACHE_SIZE = key("state.backend.rocksdb.block.cache-size") + .memoryType() .noDefaultValue() .withDescription("The amount of the cache for data blocks in RocksDB. " + "RocksDB has default block-cache size as '8MB'."); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index bd2a34a..6892539 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -483,16 +483,16 @@ public class RocksDBStateBackendConfigTest { // verify legal configuration { - configuration.setString(RocksDBConfigurableOptions.COMPACTION_STYLE, "level"); - configuration.setString(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, "TRUE"); - configuration.setString(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE, "8 mb"); - configuration.setString(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE, "128MB"); - configuration.setString(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, "4"); - configuration.setString(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER, "4"); - configuration.setString(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE, "2"); - configuration.setString(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, "64 MB"); - configuration.setString(RocksDBConfigurableOptions.BLOCK_SIZE, "4 kb"); - configuration.setString(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, "512 mb"); + configuration.setString(RocksDBConfigurableOptions.COMPACTION_STYLE.key(), "level"); + configuration.setString(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE.key(), "TRUE"); + configuration.setString(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE.key(), "8 mb"); + configuration.setString(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE.key(), "128MB"); + configuration.setString(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS.key(), "4"); + configuration.setString(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER.key(), "4"); + configuration.setString(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key(), "2"); + configuration.setString(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE.key(), "64 MB"); + configuration.setString(RocksDBConfigurableOptions.BLOCK_SIZE.key(), "4 kb"); + configuration.setString(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE.key(), "512 mb"); DefaultConfigurableOptionsFactory optionsFactory = new DefaultConfigurableOptionsFactory(); optionsFactory.configure(configuration); @@ -740,10 +740,10 @@ public class RocksDBStateBackendConfigTest { } private void verifyIllegalArgument( - ConfigOption<String> configOption, + ConfigOption<?> configOption, String configValue) { Configuration configuration = new Configuration(); - configuration.setString(configOption, configValue); + configuration.setString(configOption.key(), configValue); DefaultConfigurableOptionsFactory optionsFactory = new DefaultConfigurableOptionsFactory(); try {