[hotfix] [rocksdb] Convert performance benchmarks to unit tests

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05065451
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05065451
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05065451

Branch: refs/heads/master
Commit: 05065451abf5917f796b00692f43f0a2d2f3ed48
Parents: ea054a7
Author: Stephan Ewen <[email protected]>
Authored: Fri Apr 21 12:19:04 2017 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Apr 21 12:19:04 2017 +0200

----------------------------------------------------------------------
 .../ListViaMergeSpeedMiniBenchmark.java         | 111 ----------
 .../ListViaRangeSpeedMiniBenchmark.java         | 132 ------------
 .../state/benchmark/RocksDBPerformanceTest.java | 204 +++++++++++++++++++
 3 files changed, 204 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/05065451/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
deleted file mode 100644
index f3e084f..0000000
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state.benchmark;
-
-import org.apache.flink.util.FileUtils;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.StringAppendOperator;
-import org.rocksdb.WriteOptions;
-
-import java.io.File;
-import java.nio.charset.StandardCharsets;
-
-public class ListViaMergeSpeedMiniBenchmark {
-       
-       public static void main(String[] args) throws Exception {
-               final File rocksDir = new File("/tmp/rdb");
-               FileUtils.deleteDirectory(rocksDir);
-
-               final Options options = new Options()
-                               .setCompactionStyle(CompactionStyle.LEVEL)
-                               .setLevelCompactionDynamicLevelBytes(true)
-                               .setIncreaseParallelism(4)
-                               .setUseFsync(false)
-                               .setMaxOpenFiles(-1)
-                               .setDisableDataSync(true)
-                               .setCreateIfMissing(true)
-                               .setMergeOperator(new StringAppendOperator());
-
-               final WriteOptions write_options = new WriteOptions()
-                               .setSync(false)
-                               .setDisableWAL(true);
-
-               final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath());
-
-               final String key = "key";
-               final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
-
-               try {
-                       final byte[] keyBytes = 
key.getBytes(StandardCharsets.UTF_8);
-                       final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
-
-                       final int num = 50000;
-
-                       // ----- insert -----
-                       System.out.println("begin insert");
-
-                       final long beginInsert = System.nanoTime();
-                       for (int i = 0; i < num; i++) {
-                               rocksDB.merge(write_options, keyBytes, 
valueBytes);
-                       }
-                       final long endInsert = System.nanoTime();
-                       System.out.println("end insert - duration: " + 
((endInsert - beginInsert) / 1_000_000) + " ms");
-
-                       // ----- read (attempt 1) -----
-
-                       final byte[] resultHolder = new byte[num * 
(valueBytes.length + 2)];
-                       final long beginGet1 = System.nanoTime();
-                       rocksDB.get(keyBytes, resultHolder);
-                       final long endGet1 = System.nanoTime();
-
-                       System.out.println("end get - duration: " + ((endGet1 - 
beginGet1) / 1_000_000) + " ms");
-
-                       // ----- read (attempt 2) -----
-
-                       final long beginGet2 = System.nanoTime();
-                       rocksDB.get(keyBytes, resultHolder);
-                       final long endGet2 = System.nanoTime();
-
-                       System.out.println("end get - duration: " + ((endGet2 - 
beginGet2) / 1_000_000) + " ms");
-
-                       // ----- compact -----
-                       System.out.println("compacting...");
-                       final long beginCompact = System.nanoTime();
-                       rocksDB.compactRange();
-                       final long endCompact = System.nanoTime();
-
-                       System.out.println("end compaction - duration: " + 
((endCompact - beginCompact) / 1_000_000) + " ms");
-
-                       // ----- read (attempt 3) -----
-
-                       final long beginGet3 = System.nanoTime();
-                       rocksDB.get(keyBytes, resultHolder);
-                       final long endGet3 = System.nanoTime();
-
-                       System.out.println("end get - duration: " + ((endGet3 - 
beginGet3) / 1_000_000) + " ms");
-               } finally {
-                       rocksDB.close();
-                       options.close();
-                       write_options.close();
-                       FileUtils.deleteDirectory(rocksDir);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05065451/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
deleted file mode 100644
index f46e2cd..0000000
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state.benchmark;
-
-import org.apache.flink.core.memory.MemoryUtils;
-import org.apache.flink.util.FileUtils;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.StringAppendOperator;
-import org.rocksdb.WriteOptions;
-import sun.misc.Unsafe;
-
-import java.io.File;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-
-public class ListViaRangeSpeedMiniBenchmark {
-       
-       public static void main(String[] args) throws Exception {
-               final File rocksDir = new File("/tmp/rdb");
-               FileUtils.deleteDirectory(rocksDir);
-
-               final Options options = new Options()
-                               .setCompactionStyle(CompactionStyle.LEVEL)
-                               .setLevelCompactionDynamicLevelBytes(true)
-                               .setIncreaseParallelism(4)
-                               .setUseFsync(false)
-                               .setMaxOpenFiles(-1)
-                               .setDisableDataSync(true)
-                               .setCreateIfMissing(true)
-                               .setMergeOperator(new StringAppendOperator());
-
-               final WriteOptions write_options = new WriteOptions()
-                               .setSync(false)
-                               .setDisableWAL(true);
-
-               final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath());
-
-               final String key = "key";
-               final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
-               
-               try {
-
-                       final byte[] keyBytes = 
key.getBytes(StandardCharsets.UTF_8);
-                       final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
-
-                       final byte[] keyTemplate = Arrays.copyOf(keyBytes, 
keyBytes.length + 4);
-
-                       final Unsafe unsafe = MemoryUtils.UNSAFE;
-                       final long offset = 
unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
-
-                       final int num = 50000;
-                       System.out.println("begin insert");
-
-                       final long beginInsert = System.nanoTime();
-                       for (int i = 0; i < num; i++) {
-                               unsafe.putInt(keyTemplate, offset, i);
-                               rocksDB.put(write_options, keyTemplate, 
valueBytes);
-                       }
-                       final long endInsert = System.nanoTime();
-                       System.out.println("end insert - duration: " + 
((endInsert - beginInsert) / 1_000_000) + " ms");
-
-                       final byte[] resultHolder = new byte[num * 
valueBytes.length];
-
-                       final long beginGet = System.nanoTime();
-
-                       final RocksIterator iterator = rocksDB.newIterator();
-                       int pos = 0;
-
-                       try {
-                               // seek to start
-                               unsafe.putInt(keyTemplate, offset, 0);
-                               iterator.seek(keyTemplate);
-
-                               // mark end
-                               unsafe.putInt(keyTemplate, offset, -1);
-
-                               // iterate
-                               while (iterator.isValid()) {
-                                       byte[] currKey = iterator.key();
-                                       if (samePrefix(keyBytes, currKey)) {
-                                               byte[] currValue = 
iterator.value();
-                                               System.arraycopy(currValue, 0, 
resultHolder, pos, currValue.length);
-                                               pos += currValue.length;
-                                               iterator.next();
-                                       } else {
-                                               break;
-                                       }
-                               }
-                       }finally {
-                               iterator.close();
-                       }
-
-                       final long endGet = System.nanoTime();
-
-                       System.out.println("end get - duration: " + ((endGet - 
beginGet) / 1_000_000) + " ms");
-               } finally {
-                       rocksDB.close();
-                       options.close();
-                       write_options.close();
-                       FileUtils.deleteDirectory(rocksDir);
-               }
-       }
-
-       private static boolean samePrefix(byte[] prefix, byte[] key) {
-               for (int i = 0; i < prefix.length; i++) {
-                       if (prefix[i] != key [i]) {
-                               return false;
-                       }
-               }
-
-               return true;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05065451/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
new file mode 100644
index 0000000..011703e
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.core.memory.MemoryUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.rocksdb.WriteOptions;
+import sun.misc.Unsafe;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * Tests that validate RocksDB merge and prefix-scan performance: each test
+ * must finish within its timeout. Guards against the bug filed as 'FLINK-5756'.
+ */
+public class RocksDBPerformanceTest extends TestLogger {
+
+       @Rule
+       public final TemporaryFolder TMP = new TemporaryFolder();
+
+       @Test(timeout = 2000)
+       public void testRocksDbMergePerformance() throws Exception {
+               final File rocksDir = TMP.newFolder("rdb");
+
+               final String key = "key";
+               final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+               final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+               final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
+
+               final int num = 50000;
+
+               try (
+                       final Options options = new Options()
+                                       
.setCompactionStyle(CompactionStyle.LEVEL)
+                                       
.setLevelCompactionDynamicLevelBytes(true)
+                                       .setIncreaseParallelism(4)
+                                       .setUseFsync(false)
+                                       .setMaxOpenFiles(-1)
+                                       .setDisableDataSync(true)
+                                       .setCreateIfMissing(true)
+                                       .setMergeOperator(new 
StringAppendOperator());
+
+                       final WriteOptions write_options = new WriteOptions()
+                                       .setSync(false)
+                                       .setDisableWAL(true);
+
+                       final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath()))
+               {
+                       // ----- insert -----
+                       log.info("begin insert");
+
+                       final long beginInsert = System.nanoTime();
+                       for (int i = 0; i < num; i++) {
+                               rocksDB.merge(write_options, keyBytes, 
valueBytes);
+                       }
+                       final long endInsert = System.nanoTime();
+                       log.info("end insert - duration: {} ms", (endInsert - 
beginInsert) / 1_000_000);
+
+                       // ----- read (attempt 1) -----
+
+                       final byte[] resultHolder = new byte[num * 
(valueBytes.length + 2)];
+                       final long beginGet1 = System.nanoTime();
+                       rocksDB.get(keyBytes, resultHolder);
+                       final long endGet1 = System.nanoTime();
+
+                       log.info("end get - duration: {} ms", (endGet1 - 
beginGet1) / 1_000_000);
+
+                       // ----- read (attempt 2) -----
+
+                       final long beginGet2 = System.nanoTime();
+                       rocksDB.get(keyBytes, resultHolder);
+                       final long endGet2 = System.nanoTime();
+
+                       log.info("end get - duration: {} ms", (endGet2 - 
beginGet2) / 1_000_000);
+
+                       // ----- compact -----
+                       log.info("compacting...");
+                       final long beginCompact = System.nanoTime();
+                       rocksDB.compactRange();
+                       final long endCompact = System.nanoTime();
+
+                       log.info("end compaction - duration: {} ms", 
(endCompact - beginCompact) / 1_000_000);
+
+                       // ----- read (attempt 3) -----
+
+                       final long beginGet3 = System.nanoTime();
+                       rocksDB.get(keyBytes, resultHolder);
+                       final long endGet3 = System.nanoTime();
+
+                       log.info("end get - duration: {} ms", (endGet3 - 
beginGet3) / 1_000_000);
+               }
+       }
+
+       @Test(timeout = 2000)
+       public void testRocksDbRangeGetPerformance() throws Exception {
+               final File rocksDir = TMP.newFolder("rdb");
+
+               final String key = "key";
+               final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+               final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+               final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
+
+               final int num = 50000;
+
+               try (
+                       final Options options = new Options()
+                                       
.setCompactionStyle(CompactionStyle.LEVEL)
+                                       
.setLevelCompactionDynamicLevelBytes(true)
+                                       .setIncreaseParallelism(4)
+                                       .setUseFsync(false)
+                                       .setMaxOpenFiles(-1)
+                                       .setDisableDataSync(true)
+                                       .setCreateIfMissing(true)
+                                       .setMergeOperator(new 
StringAppendOperator());
+
+                       final WriteOptions write_options = new WriteOptions()
+                                       .setSync(false)
+                                       .setDisableWAL(true);
+
+                       final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath()))
+               {
+                       final byte[] keyTemplate = Arrays.copyOf(keyBytes, 
keyBytes.length + 4);
+
+                       final Unsafe unsafe = MemoryUtils.UNSAFE;
+                       final long offset = 
unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
+
+                       log.info("begin insert");
+
+                       final long beginInsert = System.nanoTime();
+                       for (int i = 0; i < num; i++) {
+                               unsafe.putInt(keyTemplate, offset, i);
+                               rocksDB.put(write_options, keyTemplate, 
valueBytes);
+                       }
+                       final long endInsert = System.nanoTime();
+                       log.info("end insert - duration: {} ms", (endInsert - 
beginInsert) / 1_000_000);
+
+                       @SuppressWarnings("MismatchedReadAndWriteOfArray")
+                       final byte[] resultHolder = new byte[num * 
valueBytes.length];
+
+                       final long beginGet = System.nanoTime();
+
+                       int pos = 0;
+
+                       try (final RocksIterator iterator = 
rocksDB.newIterator()) {
+                               // seek to the first key: prefix plus int suffix 0
+                               unsafe.putInt(keyTemplate, offset, 0);
+                               iterator.seek(keyTemplate);
+
+                               // iterate while the current key still has the key prefix
+                               while (iterator.isValid() && 
samePrefix(keyBytes, iterator.key())) {
+                                       byte[] currValue = iterator.value();
+                                       System.arraycopy(currValue, 0, 
resultHolder, pos, currValue.length);
+                                       pos += currValue.length;
+                                       iterator.next();
+                               }
+                       }
+
+                       final long endGet = System.nanoTime();
+
+                       log.info("end get - duration: {} ms", (endGet - 
beginGet) / 1_000_000);
+               }
+       }
+
+
+       private static boolean samePrefix(byte[] prefix, byte[] key) {
+               for (int i = 0; i < prefix.length; i++) {
+                       if (prefix[i] != key [i]) {
+                               return false;
+                       }
+               }
+
+               return true;
+       }
+}

Reply via email to