Repository: flink
Updated Branches:
  refs/heads/master e97514090 -> 219ae33d3


Stephan's feedback


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/219ae33d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/219ae33d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/219ae33d

Branch: refs/heads/master
Commit: 219ae33d36e67e3e74f493cf4956a290bc966a5d
Parents: d7b0661
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Fri Jul 28 10:57:50 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Fri Jul 28 13:57:58 2017 +0200

----------------------------------------------------------------------
 .../streaming/state/PredefinedOptions.java      | 11 ++-------
 .../state/RocksDBKeyedStateBackend.java         |  8 ++++++-
 .../streaming/state/RocksDBStateBackend.java    |  1 -
 .../state/RocksDBStateBackendTest.java          | 25 ++++++++++++++++++++
 .../state/benchmark/RocksDBPerformanceTest.java |  7 +++---
 5 files changed, 37 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/219ae33d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
index cb47ce4..73dc0be 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
@@ -51,8 +51,7 @@ public enum PredefinedOptions {
 
                @Override
                public ColumnFamilyOptions createColumnOptions() {
-                       return new ColumnFamilyOptions()
-                                       
.setMergeOperatorName(MERGE_OPERATOR_NAME);
+                       return new ColumnFamilyOptions();
                }
 
        },
@@ -91,7 +90,6 @@ public enum PredefinedOptions {
                @Override
                public ColumnFamilyOptions createColumnOptions() {
                        return new ColumnFamilyOptions()
-                                       
.setMergeOperatorName(MERGE_OPERATOR_NAME)
                                        
.setCompactionStyle(CompactionStyle.LEVEL)
                                        
.setLevelCompactionDynamicLevelBytes(true);
                }
@@ -143,7 +141,6 @@ public enum PredefinedOptions {
                        final long writeBufferSize = 64 * 1024 * 1024;
 
                        return new ColumnFamilyOptions()
-                                       
.setMergeOperatorName(MERGE_OPERATOR_NAME)
                                        
.setCompactionStyle(CompactionStyle.LEVEL)
                                        
.setLevelCompactionDynamicLevelBytes(true)
                                        .setTargetFileSizeBase(targetFileSize)
@@ -189,16 +186,12 @@ public enum PredefinedOptions {
 
                @Override
                public ColumnFamilyOptions createColumnOptions() {
-                       return new ColumnFamilyOptions()
-                                       
.setMergeOperatorName(MERGE_OPERATOR_NAME);
+                       return new ColumnFamilyOptions();
                }
        };
 
        // 
------------------------------------------------------------------------
 
-       // The name of the merge operator in RocksDB. Do not change except you 
know exactly what you do.
-       public static final String MERGE_OPERATOR_NAME = "stringappendtest";
-
        /**
         * Creates the {@link DBOptions}for this pre-defined setting.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/219ae33d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 83b99ad..bba5b55 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -135,6 +135,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
 
+       /** The name of the merge operator in RocksDB. Do not change except you 
know exactly what you do. */
+       public static final String MERGE_OPERATOR_NAME = "stringappendtest";
+
        private final String operatorIdentifier;
 
        /** The column family options from the options factory. */
@@ -220,7 +223,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                this.enableIncrementalCheckpointing = 
enableIncrementalCheckpointing;
 
-               this.columnOptions = 
Preconditions.checkNotNull(columnFamilyOptions);
+               // ensure that we use the right merge operator, because other 
code relies on this
+               this.columnOptions = 
Preconditions.checkNotNull(columnFamilyOptions)
+                       .setMergeOperatorName(MERGE_OPERATOR_NAME);
+
                this.dbOptions = Preconditions.checkNotNull(dbOptions);
 
                this.instanceBasePath = 
Preconditions.checkNotNull(instanceBasePath);

http://git-wip-us.apache.org/repos/asf/flink/blob/219ae33d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 4a30489..6ec7ec8 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -73,7 +73,6 @@ public class RocksDBStateBackend extends AbstractStateBackend 
{
        /** The number of (re)tries for loading the RocksDB JNI library. */
        private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;
 
-
        private static boolean rocksDbInitialized = false;
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/219ae33d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index cf363fa..991e0d4 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -48,10 +48,13 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksIterator;
@@ -234,6 +237,28 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
        }
 
        @Test
+       public void testCorrectMergeOperatorSet() throws IOException {
+               ColumnFamilyOptions columnFamilyOptions = 
mock(ColumnFamilyOptions.class);
+
+               try (RocksDBKeyedStateBackend<Integer> test = new 
RocksDBKeyedStateBackend<>(
+                       "test",
+                       Thread.currentThread().getContextClassLoader(),
+                       tempFolder.newFolder(),
+                       mock(DBOptions.class),
+                       columnFamilyOptions,
+                       mock(TaskKvStateRegistry.class),
+                       IntSerializer.INSTANCE,
+                       1,
+                       new KeyGroupRange(0, 0),
+                       new ExecutionConfig(),
+                       enableIncrementalCheckpointing)) {
+
+                       verify(columnFamilyOptions, Mockito.times(1))
+                               
.setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
+               }
+       }
+
+       @Test
        public void testReleasingSnapshotAfterBackendClosed() throws Exception {
                setupRocksKeyedStateBackend();
                RunnableFuture<KeyedStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/219ae33d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index b26fa48..1667e55 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.contrib.streaming.state.benchmark;
 
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
 import org.apache.flink.core.memory.MemoryUtils;
 import org.apache.flink.testutils.junit.RetryOnFailure;
 import org.apache.flink.testutils.junit.RetryRule;
@@ -38,8 +39,6 @@ import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
-import static 
org.apache.flink.contrib.streaming.state.PredefinedOptions.MERGE_OPERATOR_NAME;
-
 /**
  * Test that validates that the performance of RocksDB is as expected.
  * This test guards against the bug filed as 'FLINK-5756'
@@ -76,7 +75,7 @@ public class RocksDBPerformanceTest extends TestLogger {
                                        .setUseFsync(false)
                                        .setMaxOpenFiles(-1)
                                        .setCreateIfMissing(true)
-                                       
.setMergeOperatorName(MERGE_OPERATOR_NAME);
+                                       
.setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
 
                        final WriteOptions write_options = new WriteOptions()
                                        .setSync(false)
@@ -153,7 +152,7 @@ public class RocksDBPerformanceTest extends TestLogger {
                                        .setUseFsync(false)
                                        .setMaxOpenFiles(-1)
                                        .setCreateIfMissing(true)
-                                       
.setMergeOperatorName(MERGE_OPERATOR_NAME);
+                                       
.setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
 
                        final WriteOptions write_options = new WriteOptions()
                                        .setSync(false)

Reply via email to