[FLINK-6210] [rocksdb] Close RocksDB in ListViaMergeSpeedMiniBenchmark && 
ListViaRangeSpeedMiniBenchmark

This closes #3652


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea054a7d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea054a7d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea054a7d

Branch: refs/heads/master
Commit: ea054a7d3bc452d153c070b2789ddbe6a2f080a7
Parents: 7a70524
Author: mengji.fy <[email protected]>
Authored: Thu Mar 30 11:12:41 2017 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Fri Apr 21 12:00:32 2017 +0200

----------------------------------------------------------------------
 .../ListViaMergeSpeedMiniBenchmark.java         | 73 ++++++++-------
 .../ListViaRangeSpeedMiniBenchmark.java         | 99 +++++++++++---------
 2 files changed, 93 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea054a7d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
index 2a530e1..f3e084f 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
@@ -53,52 +53,59 @@ public class ListViaMergeSpeedMiniBenchmark {
                final String key = "key";
                final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
 
-               final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
-               final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
+               try {
+                       final byte[] keyBytes = 
key.getBytes(StandardCharsets.UTF_8);
+                       final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
 
-               final int num = 50000;
+                       final int num = 50000;
 
-               // ----- insert -----
-               System.out.println("begin insert");
+                       // ----- insert -----
+                       System.out.println("begin insert");
 
-               final long beginInsert = System.nanoTime();
-               for (int i = 0; i < num; i++) {
-                       rocksDB.merge(write_options, keyBytes, valueBytes);
-               }
-               final long endInsert = System.nanoTime();
-               System.out.println("end insert - duration: " + ((endInsert - 
beginInsert) / 1_000_000) + " ms");
+                       final long beginInsert = System.nanoTime();
+                       for (int i = 0; i < num; i++) {
+                               rocksDB.merge(write_options, keyBytes, 
valueBytes);
+                       }
+                       final long endInsert = System.nanoTime();
+                       System.out.println("end insert - duration: " + 
((endInsert - beginInsert) / 1_000_000) + " ms");
 
-               // ----- read (attempt 1) -----
+                       // ----- read (attempt 1) -----
 
-               final byte[] resultHolder = new byte[num * (valueBytes.length + 
2)];
-               final long beginGet1 = System.nanoTime();
-               rocksDB.get(keyBytes, resultHolder);
-               final long endGet1 = System.nanoTime();
+                       final byte[] resultHolder = new byte[num * 
(valueBytes.length + 2)];
+                       final long beginGet1 = System.nanoTime();
+                       rocksDB.get(keyBytes, resultHolder);
+                       final long endGet1 = System.nanoTime();
 
-               System.out.println("end get - duration: " + ((endGet1 - 
beginGet1) / 1_000_000) + " ms");
+                       System.out.println("end get - duration: " + ((endGet1 - 
beginGet1) / 1_000_000) + " ms");
 
-               // ----- read (attempt 2) -----
+                       // ----- read (attempt 2) -----
 
-               final long beginGet2 = System.nanoTime();
-               rocksDB.get(keyBytes, resultHolder);
-               final long endGet2 = System.nanoTime();
+                       final long beginGet2 = System.nanoTime();
+                       rocksDB.get(keyBytes, resultHolder);
+                       final long endGet2 = System.nanoTime();
 
-               System.out.println("end get - duration: " + ((endGet2 - 
beginGet2) / 1_000_000) + " ms");
+                       System.out.println("end get - duration: " + ((endGet2 - 
beginGet2) / 1_000_000) + " ms");
 
-               // ----- compact -----
-               System.out.println("compacting...");
-               final long beginCompact = System.nanoTime();
-               rocksDB.compactRange();
-               final long endCompact = System.nanoTime();
+                       // ----- compact -----
+                       System.out.println("compacting...");
+                       final long beginCompact = System.nanoTime();
+                       rocksDB.compactRange();
+                       final long endCompact = System.nanoTime();
 
-               System.out.println("end compaction - duration: " + ((endCompact 
- beginCompact) / 1_000_000) + " ms");
+                       System.out.println("end compaction - duration: " + 
((endCompact - beginCompact) / 1_000_000) + " ms");
 
-               // ----- read (attempt 3) -----
+                       // ----- read (attempt 3) -----
 
-               final long beginGet3 = System.nanoTime();
-               rocksDB.get(keyBytes, resultHolder);
-               final long endGet3 = System.nanoTime();
+                       final long beginGet3 = System.nanoTime();
+                       rocksDB.get(keyBytes, resultHolder);
+                       final long endGet3 = System.nanoTime();
 
-               System.out.println("end get - duration: " + ((endGet3 - 
beginGet3) / 1_000_000) + " ms");
+                       System.out.println("end get - duration: " + ((endGet3 - 
beginGet3) / 1_000_000) + " ms");
+               } finally {
+                       rocksDB.close();
+                       options.close();
+                       write_options.close();
+                       FileUtils.deleteDirectory(rocksDir);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ea054a7d/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
index b427ef1..f46e2cd 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
@@ -56,61 +56,68 @@ public class ListViaRangeSpeedMiniBenchmark {
 
                final String key = "key";
                final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+               
+               try {
 
-               final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
-               final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
+                       final byte[] keyBytes = 
key.getBytes(StandardCharsets.UTF_8);
+                       final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
 
-               final byte[] keyTemplate = Arrays.copyOf(keyBytes, 
keyBytes.length + 4);
+                       final byte[] keyTemplate = Arrays.copyOf(keyBytes, 
keyBytes.length + 4);
 
-               final Unsafe unsafe = MemoryUtils.UNSAFE;
-               final long offset = unsafe.arrayBaseOffset(byte[].class) + 
keyTemplate.length - 4;
+                       final Unsafe unsafe = MemoryUtils.UNSAFE;
+                       final long offset = 
unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
 
-               final int num = 50000;
-               System.out.println("begin insert");
+                       final int num = 50000;
+                       System.out.println("begin insert");
 
-               final long beginInsert = System.nanoTime();
-               for (int i = 0; i < num; i++) {
-                       unsafe.putInt(keyTemplate, offset, i);
-                       rocksDB.put(write_options, keyTemplate, valueBytes);
-               }
-               final long endInsert = System.nanoTime();
-               System.out.println("end insert - duration: " + ((endInsert - 
beginInsert) / 1_000_000) + " ms");
-
-               final byte[] resultHolder = new byte[num * valueBytes.length];
-
-               final long beginGet = System.nanoTime();
-
-               final RocksIterator iterator = rocksDB.newIterator();
-               int pos = 0;
-
-               // seek to start
-               unsafe.putInt(keyTemplate, offset, 0);
-               iterator.seek(keyTemplate);
-
-               // mark end
-               unsafe.putInt(keyTemplate, offset, -1);
-
-               // iterate
-               while (iterator.isValid()) {
-                       byte[] currKey = iterator.key();
-                       if (samePrefix(keyBytes, currKey)) {
-                               byte[] currValue = iterator.value();
-                               System.arraycopy(currValue, 0, resultHolder, 
pos, currValue.length);
-                               pos += currValue.length;
-                               iterator.next();
+                       final long beginInsert = System.nanoTime();
+                       for (int i = 0; i < num; i++) {
+                               unsafe.putInt(keyTemplate, offset, i);
+                               rocksDB.put(write_options, keyTemplate, 
valueBytes);
                        }
-                       else {
-                               break;
+                       final long endInsert = System.nanoTime();
+                       System.out.println("end insert - duration: " + 
((endInsert - beginInsert) / 1_000_000) + " ms");
+
+                       final byte[] resultHolder = new byte[num * 
valueBytes.length];
+
+                       final long beginGet = System.nanoTime();
+
+                       final RocksIterator iterator = rocksDB.newIterator();
+                       int pos = 0;
+
+                       try {
+                               // seek to start
+                               unsafe.putInt(keyTemplate, offset, 0);
+                               iterator.seek(keyTemplate);
+
+                               // mark end
+                               unsafe.putInt(keyTemplate, offset, -1);
+
+                               // iterate
+                               while (iterator.isValid()) {
+                                       byte[] currKey = iterator.key();
+                                       if (samePrefix(keyBytes, currKey)) {
+                                               byte[] currValue = 
iterator.value();
+                                               System.arraycopy(currValue, 0, 
resultHolder, pos, currValue.length);
+                                               pos += currValue.length;
+                                               iterator.next();
+                                       } else {
+                                               break;
+                                       }
+                               }
+                       }finally {
+                               iterator.close();
                        }
-               }
 
-               final long endGet = System.nanoTime();
+                       final long endGet = System.nanoTime();
 
-               System.out.println("end get - duration: " + ((endGet - 
beginGet) / 1_000_000) + " ms");
-
-               // WriteOptions and RocksDB ultimately extends 
AbstractNativeReference, so we need to close resource as well.
-               write_options.close();
-               rocksDB.close();
+                       System.out.println("end get - duration: " + ((endGet - 
beginGet) / 1_000_000) + " ms");
+               } finally {
+                       rocksDB.close();
+                       options.close();
+                       write_options.close();
+                       FileUtils.deleteDirectory(rocksDir);
+               }
        }
 
        private static boolean samePrefix(byte[] prefix, byte[] key) {

Reply via email to