[FLINK-6210] [rocksdb] Close RocksDB in ListViaMergeSpeedMiniBenchmark && ListViaRangeSpeedMiniBenchmark
This closes #3652 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea054a7d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea054a7d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea054a7d Branch: refs/heads/master Commit: ea054a7d3bc452d153c070b2789ddbe6a2f080a7 Parents: 7a70524 Author: mengji.fy <[email protected]> Authored: Thu Mar 30 11:12:41 2017 +0800 Committer: Stephan Ewen <[email protected]> Committed: Fri Apr 21 12:00:32 2017 +0200 ---------------------------------------------------------------------- .../ListViaMergeSpeedMiniBenchmark.java | 73 ++++++++------- .../ListViaRangeSpeedMiniBenchmark.java | 99 +++++++++++--------- 2 files changed, 93 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ea054a7d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java index 2a530e1..f3e084f 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java @@ -53,52 +53,59 @@ public class ListViaMergeSpeedMiniBenchmark { final String key = "key"; final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"; - final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); - final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); + try { + final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); + final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); - final int num = 50000; + final int num = 50000; - // ----- insert ----- - System.out.println("begin insert"); + // ----- insert ----- + System.out.println("begin insert"); - final long beginInsert = System.nanoTime(); - for (int i = 0; i < num; i++) { - rocksDB.merge(write_options, keyBytes, valueBytes); - } - final long endInsert = System.nanoTime(); - System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms"); + final long beginInsert = System.nanoTime(); + for (int i = 0; i < num; i++) { + rocksDB.merge(write_options, keyBytes, valueBytes); + } + final long endInsert = System.nanoTime(); + System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms"); - // ----- read (attempt 1) ----- + // ----- read (attempt 1) ----- - final byte[] resultHolder = new byte[num * (valueBytes.length + 2)]; - final long beginGet1 = System.nanoTime(); - rocksDB.get(keyBytes, resultHolder); - final long endGet1 = System.nanoTime(); + final byte[] resultHolder = new byte[num * (valueBytes.length + 2)]; + final long beginGet1 = System.nanoTime(); + rocksDB.get(keyBytes, resultHolder); + final long endGet1 = System.nanoTime(); - System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms"); + System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms"); - // ----- read (attempt 2) ----- + // ----- read (attempt 2) ----- - final long beginGet2 = System.nanoTime(); - rocksDB.get(keyBytes, resultHolder); - final long endGet2 = System.nanoTime(); + final long beginGet2 = System.nanoTime(); + rocksDB.get(keyBytes, resultHolder); + final long endGet2 = System.nanoTime(); - System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms"); + System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms"); - // ----- compact ----- - System.out.println("compacting..."); - final long beginCompact = System.nanoTime(); - rocksDB.compactRange(); - final long endCompact = System.nanoTime(); + // ----- compact ----- + System.out.println("compacting..."); + final long beginCompact = System.nanoTime(); + rocksDB.compactRange(); + final long endCompact = System.nanoTime(); - System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms"); + System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms"); - // ----- read (attempt 3) ----- + // ----- read (attempt 3) ----- - final long beginGet3 = System.nanoTime(); - rocksDB.get(keyBytes, resultHolder); - final long endGet3 = System.nanoTime(); + final long beginGet3 = System.nanoTime(); + rocksDB.get(keyBytes, resultHolder); + final long endGet3 = System.nanoTime(); - System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms"); + System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms"); + } finally { + rocksDB.close(); + options.close(); + write_options.close(); + FileUtils.deleteDirectory(rocksDir); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/ea054a7d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java index b427ef1..f46e2cd 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java @@ -56,61 +56,68 @@ public class ListViaRangeSpeedMiniBenchmark { final String key = "key"; final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"; + + try { - final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); - final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); + final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); + final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); - final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4); + final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4); - final Unsafe unsafe = MemoryUtils.UNSAFE; - final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4; + final Unsafe unsafe = MemoryUtils.UNSAFE; + final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4; - final int num = 50000; - System.out.println("begin insert"); + final int num = 50000; + System.out.println("begin insert"); - final long beginInsert = System.nanoTime(); - for (int i = 0; i < num; i++) { - unsafe.putInt(keyTemplate, offset, i); - rocksDB.put(write_options, keyTemplate, valueBytes); - } - final long endInsert = System.nanoTime(); - System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms"); - - final byte[] resultHolder = new byte[num * valueBytes.length]; - - final long beginGet = System.nanoTime(); - - final RocksIterator iterator = rocksDB.newIterator(); - int pos = 0; - - // seek to start - unsafe.putInt(keyTemplate, offset, 0); - iterator.seek(keyTemplate); - - // mark end - unsafe.putInt(keyTemplate, offset, -1); - - // iterate - while (iterator.isValid()) { - byte[] currKey = iterator.key(); - if (samePrefix(keyBytes, currKey)) { - byte[] currValue = iterator.value(); - System.arraycopy(currValue, 0, resultHolder, pos, currValue.length); - pos += currValue.length; - iterator.next(); + final long beginInsert = System.nanoTime(); + for (int i = 0; i < num; i++) { + unsafe.putInt(keyTemplate, offset, i); + rocksDB.put(write_options, keyTemplate, valueBytes); } - else { - break; + final long endInsert = System.nanoTime(); + System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms"); + + final byte[] resultHolder = new byte[num * valueBytes.length]; + + final long beginGet = System.nanoTime(); + + final RocksIterator iterator = rocksDB.newIterator(); + int pos = 0; + + try { + // seek to start + unsafe.putInt(keyTemplate, offset, 0); + iterator.seek(keyTemplate); + + // mark end + unsafe.putInt(keyTemplate, offset, -1); + + // iterate + while (iterator.isValid()) { + byte[] currKey = iterator.key(); + if (samePrefix(keyBytes, currKey)) { + byte[] currValue = iterator.value(); + System.arraycopy(currValue, 0, resultHolder, pos, currValue.length); + pos += currValue.length; + iterator.next(); + } else { + break; + } + } + }finally { + iterator.close(); } - } - final long endGet = System.nanoTime(); + final long endGet = System.nanoTime(); - System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms"); - - // WriteOptions and RocksDB ultimately extends AbstractNativeReference, so we need to close resource as well. - write_options.close(); - rocksDB.close(); + System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms"); + } finally { + rocksDB.close(); + options.close(); + write_options.close(); + FileUtils.deleteDirectory(rocksDir); + } } private static boolean samePrefix(byte[] prefix, byte[] key) {
