Repository: flink Updated Branches: refs/heads/master e97514090 -> 219ae33d3
Stephan's feedback Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/219ae33d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/219ae33d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/219ae33d Branch: refs/heads/master Commit: 219ae33d36e67e3e74f493cf4956a290bc966a5d Parents: d7b0661 Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Fri Jul 28 10:57:50 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Fri Jul 28 13:57:58 2017 +0200 ---------------------------------------------------------------------- .../streaming/state/PredefinedOptions.java | 11 ++------- .../state/RocksDBKeyedStateBackend.java | 8 ++++++- .../streaming/state/RocksDBStateBackend.java | 1 - .../state/RocksDBStateBackendTest.java | 25 ++++++++++++++++++++ .../state/benchmark/RocksDBPerformanceTest.java | 7 +++--- 5 files changed, 37 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/219ae33d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java index cb47ce4..73dc0be 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java @@ -51,8 +51,7 @@ public enum PredefinedOptions { @Override public ColumnFamilyOptions createColumnOptions() { - return new ColumnFamilyOptions() - .setMergeOperatorName(MERGE_OPERATOR_NAME); + return new ColumnFamilyOptions(); } }, @@ -91,7 +90,6 @@ public enum PredefinedOptions { @Override public ColumnFamilyOptions createColumnOptions() { return new ColumnFamilyOptions() - .setMergeOperatorName(MERGE_OPERATOR_NAME) .setCompactionStyle(CompactionStyle.LEVEL) .setLevelCompactionDynamicLevelBytes(true); } @@ -143,7 +141,6 @@ public enum PredefinedOptions { final long writeBufferSize = 64 * 1024 * 1024; return new ColumnFamilyOptions() - .setMergeOperatorName(MERGE_OPERATOR_NAME) .setCompactionStyle(CompactionStyle.LEVEL) .setLevelCompactionDynamicLevelBytes(true) .setTargetFileSizeBase(targetFileSize) @@ -189,16 +186,12 @@ public enum PredefinedOptions { @Override public ColumnFamilyOptions createColumnOptions() { - return new ColumnFamilyOptions() - .setMergeOperatorName(MERGE_OPERATOR_NAME); + return new ColumnFamilyOptions(); } }; // ------------------------------------------------------------------------ - // The name of the merge operator in RocksDB. Do not change except you know exactly what you do. - public static final String MERGE_OPERATOR_NAME = "stringappendtest"; - /** * Creates the {@link DBOptions}for this pre-defined setting. * http://git-wip-us.apache.org/repos/asf/flink/blob/219ae33d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 83b99ad..bba5b55 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -135,6 +135,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class); + /** The name of the merge operator in RocksDB. Do not change except you know exactly what you do. */ + public static final String MERGE_OPERATOR_NAME = "stringappendtest"; + private final String operatorIdentifier; /** The column family options from the options factory. */ @@ -220,7 +223,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; - this.columnOptions = Preconditions.checkNotNull(columnFamilyOptions); + // ensure that we use the right merge operator, because other code relies on this + this.columnOptions = Preconditions.checkNotNull(columnFamilyOptions) + .setMergeOperatorName(MERGE_OPERATOR_NAME); + this.dbOptions = Preconditions.checkNotNull(dbOptions); this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath); http://git-wip-us.apache.org/repos/asf/flink/blob/219ae33d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 4a30489..6ec7ec8 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -73,7 +73,6 @@ public class RocksDBStateBackend extends AbstractStateBackend { /** The number of (re)tries for loading the RocksDB JNI library. */ private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3; - private static boolean rocksDbInitialized = false; // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/219ae33d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index cf363fa..991e0d4 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -48,10 +48,13 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksIterator; @@ -234,6 +237,28 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa } @Test + public void testCorrectMergeOperatorSet() throws IOException { + ColumnFamilyOptions columnFamilyOptions = mock(ColumnFamilyOptions.class); + + try (RocksDBKeyedStateBackend<Integer> test = new RocksDBKeyedStateBackend<>( + "test", + Thread.currentThread().getContextClassLoader(), + tempFolder.newFolder(), + mock(DBOptions.class), + columnFamilyOptions, + mock(TaskKvStateRegistry.class), + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + new ExecutionConfig(), + enableIncrementalCheckpointing)) { + + verify(columnFamilyOptions, Mockito.times(1)) + .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME); + } + } + + @Test public void testReleasingSnapshotAfterBackendClosed() throws Exception { setupRocksKeyedStateBackend(); RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, http://git-wip-us.apache.org/repos/asf/flink/blob/219ae33d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java index b26fa48..1667e55 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java @@ -18,6 +18,7 @@ package org.apache.flink.contrib.streaming.state.benchmark; +import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; import org.apache.flink.core.memory.MemoryUtils; import org.apache.flink.testutils.junit.RetryOnFailure; import org.apache.flink.testutils.junit.RetryRule; @@ -38,8 +39,6 @@ import java.io.File; import java.nio.charset.StandardCharsets; import java.util.Arrays; -import static org.apache.flink.contrib.streaming.state.PredefinedOptions.MERGE_OPERATOR_NAME; - /** * Test that validates that the performance of RocksDB is as expected. * This test guards against the bug filed as 'FLINK-5756' @@ -76,7 +75,7 @@ public class RocksDBPerformanceTest extends TestLogger { .setUseFsync(false) .setMaxOpenFiles(-1) .setCreateIfMissing(true) - .setMergeOperatorName(MERGE_OPERATOR_NAME); + .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME); final WriteOptions write_options = new WriteOptions() .setSync(false) @@ -153,7 +152,7 @@ public class RocksDBPerformanceTest extends TestLogger { .setUseFsync(false) .setMaxOpenFiles(-1) .setCreateIfMissing(true) - .setMergeOperatorName(MERGE_OPERATOR_NAME); + .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME); final WriteOptions write_options = new WriteOptions() .setSync(false)