[FLINK-7220] [checkpoints] Update RocksDB dependency to 5.5.5
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d818fc48 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d818fc48 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d818fc48 Branch: refs/heads/master Commit: d818fc48fbc9bfbd613aecdaa0cefcf2c6622289 Parents: e975140 Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Mon May 29 11:11:41 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Fri Jul 28 13:57:58 2017 +0200 ---------------------------------------------------------------------- .../flink-statebackend-rocksdb/pom.xml | 6 +-- .../streaming/state/PredefinedOptions.java | 20 ++++---- .../state/RocksDBKeyedStateBackend.java | 51 ++++++++++++++------ .../state/RocksDBStateBackendConfigTest.java | 11 +---- .../state/benchmark/RocksDBPerformanceTest.java | 9 ++-- 5 files changed, 53 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d818fc48/flink-contrib/flink-statebackend-rocksdb/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml b/flink-contrib/flink-statebackend-rocksdb/pom.xml index 527ca18..fa97e07 100644 --- a/flink-contrib/flink-statebackend-rocksdb/pom.xml +++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml @@ -55,9 +55,9 @@ under the License. 
</dependency> <dependency> - <groupId>com.data-artisans</groupId> - <artifactId>frocksdbjni</artifactId> - <version>4.11.2-artisans</version> + <groupId>org.rocksdb</groupId> + <artifactId>rocksdbjni</artifactId> + <version>5.5.5</version> </dependency> <!-- test dependencies --> http://git-wip-us.apache.org/repos/asf/flink/blob/d818fc48/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java index f606131..cb47ce4 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java @@ -19,10 +19,10 @@ package org.apache.flink.contrib.streaming.state; import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.BloomFilter; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactionStyle; import org.rocksdb.DBOptions; -import org.rocksdb.StringAppendOperator; /** * The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}. 
@@ -46,14 +46,13 @@ public enum PredefinedOptions { @Override public DBOptions createDBOptions() { return new DBOptions() - .setUseFsync(false) - .setDisableDataSync(true); + .setUseFsync(false); } @Override public ColumnFamilyOptions createColumnOptions() { return new ColumnFamilyOptions() - .setMergeOperator(new StringAppendOperator()); + .setMergeOperatorName(MERGE_OPERATOR_NAME); } }, @@ -86,14 +85,13 @@ public enum PredefinedOptions { return new DBOptions() .setIncreaseParallelism(4) .setUseFsync(false) - .setDisableDataSync(true) .setMaxOpenFiles(-1); } @Override public ColumnFamilyOptions createColumnOptions() { return new ColumnFamilyOptions() - .setMergeOperator(new StringAppendOperator()) + .setMergeOperatorName(MERGE_OPERATOR_NAME) .setCompactionStyle(CompactionStyle.LEVEL) .setLevelCompactionDynamicLevelBytes(true); } @@ -133,7 +131,6 @@ public enum PredefinedOptions { return new DBOptions() .setIncreaseParallelism(4) .setUseFsync(false) - .setDisableDataSync(true) .setMaxOpenFiles(-1); } @@ -146,7 +143,7 @@ public enum PredefinedOptions { final long writeBufferSize = 64 * 1024 * 1024; return new ColumnFamilyOptions() - .setMergeOperator(new StringAppendOperator()) + .setMergeOperatorName(MERGE_OPERATOR_NAME) .setCompactionStyle(CompactionStyle.LEVEL) .setLevelCompactionDynamicLevelBytes(true) .setTargetFileSizeBase(targetFileSize) @@ -158,6 +155,7 @@ public enum PredefinedOptions { new BlockBasedTableConfig() .setBlockCacheSize(blockCacheSize) .setBlockSize(blockSize) + .setFilter(new BloomFilter()) ); } }, @@ -186,19 +184,21 @@ public enum PredefinedOptions { return new DBOptions() .setIncreaseParallelism(4) .setUseFsync(false) - .setDisableDataSync(true) .setMaxOpenFiles(-1); } @Override public ColumnFamilyOptions createColumnOptions() { return new ColumnFamilyOptions() - .setMergeOperator(new StringAppendOperator()); + .setMergeOperatorName(MERGE_OPERATOR_NAME); } }; // ------------------------------------------------------------------------ + // 
The name of the merge operator in RocksDB. Do not change unless you know exactly what you are doing. + public static final String MERGE_OPERATOR_NAME = "stringappendtest"; + /** * Creates the {@link DBOptions}for this pre-defined setting. * http://git-wip-us.apache.org/repos/asf/flink/blob/d818fc48/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 9d289b4..83b99ad 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -126,6 +126,10 @@ import java.util.concurrent.RunnableFuture; * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon * checkpointing. This state backend can store very large state that exceeds memory and spills * to disk. Except for the snapshotting, this class should be accessed as if it is not threadsafe. + * + * <p>This class follows the rules for closing/releasing native RocksDB resources as described in + * <a href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families"> + * this document</a>. 
*/ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { @@ -160,6 +164,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { protected RocksDB db; /** + * We are not using the default column family for Flink state ops, but we still need to remember this handle so that + * we can close it properly when the backend is closed. This is required by RocksDB's native memory management. + */ + private ColumnFamilyHandle defaultColumnFamily; + + /** * Information about the k/v states as we create them. This is used to retrieve the * column family that is used for a state and also for sanity checks when restoring. */ @@ -254,30 +264,31 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // and access it in a synchronized block that locks on #dbDisposeLock. if (db != null) { - for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : - kvStateInformation.values()) { - try { - column.f0.close(); - } catch (Exception ex) { - LOG.info("Exception while closing ColumnFamilyHandle object.", ex); - } + // RocksDB's native memory management requires that *all* CFs (including default) are closed before the + // DB is closed. So we start with the ones created by Flink... + for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData : + kvStateInformation.values()) { + + IOUtils.closeQuietly(columnMetaData.f0); } - kvStateInformation.clear(); - restoredKvStateMetaInfos.clear(); + // ... close the default CF ... + IOUtils.closeQuietly(defaultColumnFamily); - try { - db.close(); - } catch (Exception ex) { - LOG.info("Exception while closing RocksDB object.", ex); - } + // ... and finally close the DB instance ... 
+ IOUtils.closeQuietly(db); + // invalidate the reference before releasing the lock so that other accesses will not cause crashes db = null; + } } - IOUtils.closeQuietly(columnOptions); + kvStateInformation.clear(); + restoredKvStateMetaInfos.clear(); + IOUtils.closeQuietly(dbOptions); + IOUtils.closeQuietly(columnOptions); try { FileUtils.deleteDirectory(instanceBasePath); @@ -1039,6 +1050,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException { List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(stateColumnFamilyDescriptors); + + // we add the required descriptor for the default CF in last position. columnFamilyDescriptors.add( new ColumnFamilyDescriptor( "default".getBytes(ConfigConstants.DEFAULT_CHARSET), columnOptions)); @@ -1057,9 +1070,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { throw new IOException("Error while opening RocksDB instance.", e); } + final int defaultColumnFamilyIndex = columnFamilyHandles.size() - 1; + + // extract the default column family. + defaultColumnFamily = columnFamilyHandles.get(defaultColumnFamilyIndex); + if (stateColumnFamilyHandles != null) { + // return all CFs except the default CF which is kept separately because it is not used in Flink operations. 
stateColumnFamilyHandles.addAll( - columnFamilyHandles.subList(0, columnFamilyHandles.size() - 1)); + columnFamilyHandles.subList(0, defaultColumnFamilyIndex)); } return db; http://git-wip-us.apache.org/repos/asf/flink/blob/d818fc48/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index ff433ad..8ec29e2 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -260,16 +260,7 @@ public class RocksDBStateBackendConfigTest { rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions()); - try ( - DBOptions optCreated = rocksDbBackend.getDbOptions(); - DBOptions optReference = new DBOptions(); - ColumnFamilyOptions colCreated = rocksDbBackend.getColumnOptions()) { - - // check that our instance uses something that we configured - assertEquals(true, optCreated.disableDataSync()); - // just ensure that we pickend an option that actually differs from the reference. 
- assertEquals(false, optReference.disableDataSync()); - + try (ColumnFamilyOptions colCreated = rocksDbBackend.getColumnOptions()) { assertEquals(CompactionStyle.LEVEL, colCreated.compactionStyle()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/d818fc48/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java index 3231e96..b26fa48 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java @@ -31,7 +31,6 @@ import org.rocksdb.NativeLibraryLoader; import org.rocksdb.Options; import org.rocksdb.RocksDB; import org.rocksdb.RocksIterator; -import org.rocksdb.StringAppendOperator; import org.rocksdb.WriteOptions; import sun.misc.Unsafe; @@ -39,6 +38,8 @@ import java.io.File; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import static org.apache.flink.contrib.streaming.state.PredefinedOptions.MERGE_OPERATOR_NAME; + /** * Test that validates that the performance of RocksDB is as expected. 
* This test guards against the bug filed as 'FLINK-5756' @@ -74,9 +75,8 @@ public class RocksDBPerformanceTest extends TestLogger { .setIncreaseParallelism(4) .setUseFsync(false) .setMaxOpenFiles(-1) - .setDisableDataSync(true) .setCreateIfMissing(true) - .setMergeOperator(new StringAppendOperator()); + .setMergeOperatorName(MERGE_OPERATOR_NAME); final WriteOptions write_options = new WriteOptions() .setSync(false) @@ -152,9 +152,8 @@ public class RocksDBPerformanceTest extends TestLogger { .setIncreaseParallelism(4) .setUseFsync(false) .setMaxOpenFiles(-1) - .setDisableDataSync(true) .setCreateIfMissing(true) - .setMergeOperator(new StringAppendOperator()); + .setMergeOperatorName(MERGE_OPERATOR_NAME); final WriteOptions write_options = new WriteOptions() .setSync(false)