[FLINK-5756] [rocksdb] Add mini benchmarks to reproduce 'merge' performance 
problems


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/677b508a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/677b508a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/677b508a

Branch: refs/heads/master
Commit: 677b508a962c5c7df9308ac3531e799cddec27f6
Parents: d160b5e
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 15 19:13:07 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100

----------------------------------------------------------------------
 .../ListViaMergeSpeedMiniBenchmark.java         | 104 ++++++++++++++++
 .../ListViaRangeSpeedMiniBenchmark.java         | 121 +++++++++++++++++++
 2 files changed, 225 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/677b508a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
new file mode 100644
index 0000000..2a530e1
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.util.FileUtils;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.StringAppendOperator;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+
+public class ListViaMergeSpeedMiniBenchmark {
+       
+       public static void main(String[] args) throws Exception {
+               final File rocksDir = new File("/tmp/rdb");
+               FileUtils.deleteDirectory(rocksDir);
+
+               final Options options = new Options()
+                               .setCompactionStyle(CompactionStyle.LEVEL)
+                               .setLevelCompactionDynamicLevelBytes(true)
+                               .setIncreaseParallelism(4)
+                               .setUseFsync(false)
+                               .setMaxOpenFiles(-1)
+                               .setDisableDataSync(true)
+                               .setCreateIfMissing(true)
+                               .setMergeOperator(new StringAppendOperator());
+
+               final WriteOptions write_options = new WriteOptions()
+                               .setSync(false)
+                               .setDisableWAL(true);
+
+               final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath());
+
+               final String key = "key";
+               final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+               final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+               final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
+
+               final int num = 50000;
+
+               // ----- insert -----
+               System.out.println("begin insert");
+
+               final long beginInsert = System.nanoTime();
+               for (int i = 0; i < num; i++) {
+                       rocksDB.merge(write_options, keyBytes, valueBytes);
+               }
+               final long endInsert = System.nanoTime();
+               System.out.println("end insert - duration: " + ((endInsert - 
beginInsert) / 1_000_000) + " ms");
+
+               // ----- read (attempt 1) -----
+
+               final byte[] resultHolder = new byte[num * (valueBytes.length + 
2)];
+               final long beginGet1 = System.nanoTime();
+               rocksDB.get(keyBytes, resultHolder);
+               final long endGet1 = System.nanoTime();
+
+               System.out.println("end get - duration: " + ((endGet1 - 
beginGet1) / 1_000_000) + " ms");
+
+               // ----- read (attempt 2) -----
+
+               final long beginGet2 = System.nanoTime();
+               rocksDB.get(keyBytes, resultHolder);
+               final long endGet2 = System.nanoTime();
+
+               System.out.println("end get - duration: " + ((endGet2 - 
beginGet2) / 1_000_000) + " ms");
+
+               // ----- compact -----
+               System.out.println("compacting...");
+               final long beginCompact = System.nanoTime();
+               rocksDB.compactRange();
+               final long endCompact = System.nanoTime();
+
+               System.out.println("end compaction - duration: " + ((endCompact 
- beginCompact) / 1_000_000) + " ms");
+
+               // ----- read (attempt 3) -----
+
+               final long beginGet3 = System.nanoTime();
+               rocksDB.get(keyBytes, resultHolder);
+               final long endGet3 = System.nanoTime();
+
+               System.out.println("end get - duration: " + ((endGet3 - 
beginGet3) / 1_000_000) + " ms");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/677b508a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
new file mode 100644
index 0000000..793a35b
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.core.memory.MemoryUtils;
+import org.apache.flink.util.FileUtils;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.rocksdb.WriteOptions;
+import sun.misc.Unsafe;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+public class ListViaRangeSpeedMiniBenchmark {
+       
+       public static void main(String[] args) throws Exception {
+               final File rocksDir = new File("/tmp/rdb");
+               FileUtils.deleteDirectory(rocksDir);
+
+               final Options options = new Options()
+                               .setCompactionStyle(CompactionStyle.LEVEL)
+                               .setLevelCompactionDynamicLevelBytes(true)
+                               .setIncreaseParallelism(4)
+                               .setUseFsync(false)
+                               .setMaxOpenFiles(-1)
+                               .setDisableDataSync(true)
+                               .setCreateIfMissing(true)
+                               .setMergeOperator(new StringAppendOperator());
+
+               final WriteOptions write_options = new WriteOptions()
+                               .setSync(false)
+                               .setDisableWAL(true);
+
+               final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath());
+
+               final String key = "key";
+               final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+               final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+               final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
+
+               final byte[] keyTemplate = Arrays.copyOf(keyBytes, 
keyBytes.length + 4);
+
+               final Unsafe unsafe = MemoryUtils.UNSAFE;
+               final long offset = unsafe.arrayBaseOffset(byte[].class) + 
keyTemplate.length - 4;
+
+               final int num = 50000;
+               System.out.println("begin insert");
+
+               final long beginInsert = System.nanoTime();
+               for (int i = 0; i < num; i++) {
+                       unsafe.putInt(keyTemplate, offset, i);
+                       rocksDB.put(write_options, keyTemplate, valueBytes);
+               }
+               final long endInsert = System.nanoTime();
+               System.out.println("end insert - duration: " + ((endInsert - 
beginInsert) / 1_000_000) + " ms");
+
+               final byte[] resultHolder = new byte[num * valueBytes.length];
+
+               final long beginGet = System.nanoTime();
+
+               final RocksIterator iterator = rocksDB.newIterator();
+               int pos = 0;
+
+               // seek to start
+               unsafe.putInt(keyTemplate, offset, 0);
+               iterator.seek(keyTemplate);
+
+               // mark end
+               unsafe.putInt(keyTemplate, offset, -1);
+
+               // iterate
+               while (iterator.isValid()) {
+                       byte[] currKey = iterator.key();
+                       if (samePrefix(keyBytes, currKey)) {
+                               byte[] currValue = iterator.value();
+                               System.arraycopy(currValue, 0, resultHolder, 
pos, currValue.length);
+                               pos += currValue.length;
+                               iterator.next();
+                       }
+                       else {
+                               break;
+                       }
+               }
+
+               final long endGet = System.nanoTime();
+
+               System.out.println("end get - duration: " + ((endGet - 
beginGet) / 1_000_000) + " ms");
+       }
+
+       private static boolean samePrefix(byte[] prefix, byte[] key) {
+               for (int i = 0; i < prefix.length; i++) {
+                       if (prefix[i] != key [i]) {
+                               return false;
+                       }
+               }
+
+               return true;
+       }
+}

Reply via email to