[FLINK-7220] [checkpoints] Update RocksDB dependency to 5.5.5

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d818fc48
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d818fc48
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d818fc48

Branch: refs/heads/master
Commit: d818fc48fbc9bfbd613aecdaa0cefcf2c6622289
Parents: e975140
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Mon May 29 11:11:41 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Fri Jul 28 13:57:58 2017 +0200

----------------------------------------------------------------------
 .../flink-statebackend-rocksdb/pom.xml          |  6 +--
 .../streaming/state/PredefinedOptions.java      | 20 ++++----
 .../state/RocksDBKeyedStateBackend.java         | 51 ++++++++++++++------
 .../state/RocksDBStateBackendConfigTest.java    | 11 +----
 .../state/benchmark/RocksDBPerformanceTest.java |  9 ++--
 5 files changed, 53 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d818fc48/flink-contrib/flink-statebackend-rocksdb/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml 
b/flink-contrib/flink-statebackend-rocksdb/pom.xml
index 527ca18..fa97e07 100644
--- a/flink-contrib/flink-statebackend-rocksdb/pom.xml
+++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml
@@ -55,9 +55,9 @@ under the License.
                </dependency>
 
                <dependency>
-                       <groupId>com.data-artisans</groupId>
-                       <artifactId>frocksdbjni</artifactId>
-                       <version>4.11.2-artisans</version>
+                       <groupId>org.rocksdb</groupId>
+                       <artifactId>rocksdbjni</artifactId>
+                       <version>5.5.5</version>
                </dependency>
 
                <!-- test dependencies -->

http://git-wip-us.apache.org/repos/asf/flink/blob/d818fc48/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
index f606131..cb47ce4 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
@@ -19,10 +19,10 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.DBOptions;
-import org.rocksdb.StringAppendOperator;
 
 /**
  * The {@code PredefinedOptions} are configuration settings for the {@link 
RocksDBStateBackend}.
@@ -46,14 +46,13 @@ public enum PredefinedOptions {
                @Override
                public DBOptions createDBOptions() {
                        return new DBOptions()
-                                       .setUseFsync(false)
-                                       .setDisableDataSync(true);
+                                       .setUseFsync(false);
                }
 
                @Override
                public ColumnFamilyOptions createColumnOptions() {
                        return new ColumnFamilyOptions()
-                                       .setMergeOperator(new 
StringAppendOperator());
+                                       
.setMergeOperatorName(MERGE_OPERATOR_NAME);
                }
 
        },
@@ -86,14 +85,13 @@ public enum PredefinedOptions {
                        return new DBOptions()
                                        .setIncreaseParallelism(4)
                                        .setUseFsync(false)
-                                       .setDisableDataSync(true)
                                        .setMaxOpenFiles(-1);
                }
 
                @Override
                public ColumnFamilyOptions createColumnOptions() {
                        return new ColumnFamilyOptions()
-                                       .setMergeOperator(new 
StringAppendOperator())
+                                       
.setMergeOperatorName(MERGE_OPERATOR_NAME)
                                        
.setCompactionStyle(CompactionStyle.LEVEL)
                                        
.setLevelCompactionDynamicLevelBytes(true);
                }
@@ -133,7 +131,6 @@ public enum PredefinedOptions {
                        return new DBOptions()
                                        .setIncreaseParallelism(4)
                                        .setUseFsync(false)
-                                       .setDisableDataSync(true)
                                        .setMaxOpenFiles(-1);
                }
 
@@ -146,7 +143,7 @@ public enum PredefinedOptions {
                        final long writeBufferSize = 64 * 1024 * 1024;
 
                        return new ColumnFamilyOptions()
-                                       .setMergeOperator(new 
StringAppendOperator())
+                                       
.setMergeOperatorName(MERGE_OPERATOR_NAME)
                                        
.setCompactionStyle(CompactionStyle.LEVEL)
                                        
.setLevelCompactionDynamicLevelBytes(true)
                                        .setTargetFileSizeBase(targetFileSize)
@@ -158,6 +155,7 @@ public enum PredefinedOptions {
                                                        new 
BlockBasedTableConfig()
                                                                        
.setBlockCacheSize(blockCacheSize)
                                                                        
.setBlockSize(blockSize)
+                                                                       
.setFilter(new BloomFilter())
                                        );
                }
        },
@@ -186,19 +184,21 @@ public enum PredefinedOptions {
                        return new DBOptions()
                                        .setIncreaseParallelism(4)
                                        .setUseFsync(false)
-                                       .setDisableDataSync(true)
                                        .setMaxOpenFiles(-1);
                }
 
                @Override
                public ColumnFamilyOptions createColumnOptions() {
                        return new ColumnFamilyOptions()
-                                       .setMergeOperator(new 
StringAppendOperator());
+                                       
.setMergeOperatorName(MERGE_OPERATOR_NAME);
                }
        };
 
        // 
------------------------------------------------------------------------
 
+       // The name of the merge operator in RocksDB. Do not change unless you 
know exactly what you are doing.
+       public static final String MERGE_OPERATOR_NAME = "stringappendtest";
+
        /**
         * Creates the {@link DBOptions}for this pre-defined setting.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/d818fc48/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 9d289b4..83b99ad 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -126,6 +126,10 @@ import java.util.concurrent.RunnableFuture;
  * streams provided by a {@link 
org.apache.flink.runtime.state.CheckpointStreamFactory} upon
  * checkpointing. This state backend can store very large state that exceeds 
memory and spills
  * to disk. Except for the snapshotting, this class should be accessed as if 
it is not threadsafe.
+ *
+ * <p>This class follows the rules for closing/releasing native RocksDB 
resources as described in
+ * <a 
href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families">
+ * this document</a>.
  */
 public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
@@ -160,6 +164,12 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        protected RocksDB db;
 
        /**
+        * We are not using the default column family for Flink state ops, but 
we still need to remember this handle so that
+        * we can close it properly when the backend is closed. This is 
required by RocksDB's native memory management.
+        */
+       private ColumnFamilyHandle defaultColumnFamily;
+
+       /**
         * Information about the k/v states as we create them. This is used to 
retrieve the
         * column family that is used for a state and also for sanity checks 
when restoring.
         */
@@ -254,30 +264,31 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        // and access it in a synchronized block that locks on 
#dbDisposeLock.
                        if (db != null) {
 
-                               for (Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> column :
-                                               kvStateInformation.values()) {
-                                       try {
-                                               column.f0.close();
-                                       } catch (Exception ex) {
-                                               LOG.info("Exception while 
closing ColumnFamilyHandle object.", ex);
-                                       }
+                               // RocksDB's native memory management requires 
that *all* CFs (including default) are closed before the
+                               // DB is closed. So we start with the ones 
created by Flink...
+                               for (Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData :
+                                       kvStateInformation.values()) {
+
+                                       IOUtils.closeQuietly(columnMetaData.f0);
                                }
 
-                               kvStateInformation.clear();
-                               restoredKvStateMetaInfos.clear();
+                               // ... close the default CF ...
+                               IOUtils.closeQuietly(defaultColumnFamily);
 
-                               try {
-                                       db.close();
-                               } catch (Exception ex) {
-                                       LOG.info("Exception while closing 
RocksDB object.", ex);
-                               }
+                               // ... and finally close the DB instance ...
+                               IOUtils.closeQuietly(db);
 
+                               // invalidate the reference before releasing 
the lock so that other accesses will not cause crashes
                                db = null;
+
                        }
                }
 
-               IOUtils.closeQuietly(columnOptions);
+               kvStateInformation.clear();
+               restoredKvStateMetaInfos.clear();
+
                IOUtils.closeQuietly(dbOptions);
+               IOUtils.closeQuietly(columnOptions);
 
                try {
                        FileUtils.deleteDirectory(instanceBasePath);
@@ -1039,6 +1050,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        List<ColumnFamilyHandle> stateColumnFamilyHandles) 
throws IOException {
 
                List<ColumnFamilyDescriptor> columnFamilyDescriptors = new 
ArrayList<>(stateColumnFamilyDescriptors);
+
+               // we add the required descriptor for the default CF in last 
position.
                columnFamilyDescriptors.add(
                        new ColumnFamilyDescriptor(
                                
"default".getBytes(ConfigConstants.DEFAULT_CHARSET), columnOptions));
@@ -1057,9 +1070,15 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        throw new IOException("Error while opening RocksDB 
instance.", e);
                }
 
+               final int defaultColumnFamilyIndex = columnFamilyHandles.size() 
- 1;
+
+               // extract the default column family.
+               defaultColumnFamily = 
columnFamilyHandles.get(defaultColumnFamilyIndex);
+
                if (stateColumnFamilyHandles != null) {
+                       // return all CFs except the default CF which is kept 
separately because it is not used in Flink operations.
                        stateColumnFamilyHandles.addAll(
-                               columnFamilyHandles.subList(0, 
columnFamilyHandles.size() - 1));
+                               columnFamilyHandles.subList(0, 
defaultColumnFamilyIndex));
                }
 
                return db;

http://git-wip-us.apache.org/repos/asf/flink/blob/d818fc48/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index ff433ad..8ec29e2 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -260,16 +260,7 @@ public class RocksDBStateBackendConfigTest {
                
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
                assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, 
rocksDbBackend.getPredefinedOptions());
 
-               try (
-                               DBOptions optCreated = 
rocksDbBackend.getDbOptions();
-                               DBOptions optReference = new DBOptions();
-                               ColumnFamilyOptions colCreated = 
rocksDbBackend.getColumnOptions()) {
-
-                       // check that our instance uses something that we 
configured
-                       assertEquals(true, optCreated.disableDataSync());
-                       // just ensure that we pickend an option that actually 
differs from the reference.
-                       assertEquals(false, optReference.disableDataSync());
-
+               try (ColumnFamilyOptions colCreated = 
rocksDbBackend.getColumnOptions()) {
                        assertEquals(CompactionStyle.LEVEL, 
colCreated.compactionStyle());
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d818fc48/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index 3231e96..b26fa48 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -31,7 +31,6 @@ import org.rocksdb.NativeLibraryLoader;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksIterator;
-import org.rocksdb.StringAppendOperator;
 import org.rocksdb.WriteOptions;
 import sun.misc.Unsafe;
 
@@ -39,6 +38,8 @@ import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
+import static 
org.apache.flink.contrib.streaming.state.PredefinedOptions.MERGE_OPERATOR_NAME;
+
 /**
  * Test that validates that the performance of RocksDB is as expected.
  * This test guards against the bug filed as 'FLINK-5756'
@@ -74,9 +75,8 @@ public class RocksDBPerformanceTest extends TestLogger {
                                        .setIncreaseParallelism(4)
                                        .setUseFsync(false)
                                        .setMaxOpenFiles(-1)
-                                       .setDisableDataSync(true)
                                        .setCreateIfMissing(true)
-                                       .setMergeOperator(new 
StringAppendOperator());
+                                       
.setMergeOperatorName(MERGE_OPERATOR_NAME);
 
                        final WriteOptions write_options = new WriteOptions()
                                        .setSync(false)
@@ -152,9 +152,8 @@ public class RocksDBPerformanceTest extends TestLogger {
                                        .setIncreaseParallelism(4)
                                        .setUseFsync(false)
                                        .setMaxOpenFiles(-1)
-                                       .setDisableDataSync(true)
                                        .setCreateIfMissing(true)
-                                       .setMergeOperator(new 
StringAppendOperator());
+                                       
.setMergeOperatorName(MERGE_OPERATOR_NAME);
 
                        final WriteOptions write_options = new WriteOptions()
                                        .setSync(false)

Reply via email to