Repository: incubator-samza Updated Branches: refs/heads/master b934aa873 -> 90d5a00fc
SAMZA-346; move kv tests into samza-test module Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/90d5a00f Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/90d5a00f Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/90d5a00f Branch: refs/heads/master Commit: 90d5a00fc3a729821410756db0754cdc2f327766 Parents: b934aa8 Author: Navina Ramesh <[email protected]> Authored: Tue Oct 14 15:06:33 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Oct 14 15:06:33 2014 -0700 ---------------------------------------------------------------------- build.gradle | 4 +- .../samza/storage/kv/TestKeyValueStores.scala | 344 ------------------- .../samza/storage/kv/TestKeyValueStores.scala | 344 +++++++++++++++++++ 3 files changed, 346 insertions(+), 346 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/90d5a00f/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index d05dc8d..1b37dbb 100644 --- a/build.gradle +++ b/build.gradle @@ -339,11 +339,9 @@ project(":samza-kv-leveldb_$scalaVersion") { compile project(':samza-api') compile project(":samza-core_$scalaVersion") compile project(":samza-kv_$scalaVersion") - compile project(":samza-kv-inmemory_$scalaVersion") compile "org.scala-lang:scala-library:$scalaLibVersion" compile "org.fusesource.leveldbjni:leveldbjni-all:$leveldbVersion" testCompile "junit:junit:$junitVersion" - testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion" } } @@ -359,6 +357,7 @@ project(":samza-test_$scalaVersion") { dependencies { compile project(':samza-api') + compile project(":samza-kv-inmemory_$scalaVersion") compile project(":samza-kv-leveldb_$scalaVersion") compile project(":samza-core_$scalaVersion") compile "org.scala-lang:scala-library:$scalaLibVersion" @@ -369,6 +368,7 @@ project(":samza-test_$scalaVersion") { testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test" testCompile "com.101tec:zkclient:$zkClientVersion" testCompile project(":samza-kafka_$scalaVersion") + testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion" testRuntime "org.slf4j:slf4j-simple:$slf4jVersion" } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/90d5a00f/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala ---------------------------------------------------------------------- diff --git a/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala deleted file mode 100644 index eefe114..0000000 --- a/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala +++ /dev/null @@ -1,344 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.storage.kv - -import java.io.File -import java.util.Arrays -import java.util.Random - -import org.apache.samza.serializers.Serde -import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore -import org.iq80.leveldb.Options -import org.junit.After -import org.junit.Assert._ -import org.junit.Before -import org.junit.Test -import org.junit.runner.RunWith -import org.junit.runners.Parameterized -import org.junit.runners.Parameterized.Parameters -import org.scalatest.Assertions.intercept - -import scala.collection.JavaConversions._ -import scala.collection.mutable.ArrayBuffer - -/** - * Test suite to check different key value store operations - * @param typeOfStore Defines type of key-value store (Eg: "leveldb" / "inmemory") - * @param storeConfig Defines whether we're using caching / serde / both / or none in front of the store - */ -@RunWith(value = classOf[Parameterized]) -class TestKeyValueStores(typeOfStore: String, storeConfig: String) { - import TestKeyValueStores._ - - val letters = "abcdefghijklmnopqrstuvwxyz".map(_.toString) - val dir = new File(System.getProperty("java.io.tmpdir"), "leveldb-test-" + new Random().nextInt(Int.MaxValue)) - var store: KeyValueStore[Array[Byte], Array[Byte]] = null - var cache = false - var serde = false - - @Before - def setup() { - val kvStore : KeyValueStore[Array[Byte], Array[Byte]] = if ("leveldb".equals(typeOfStore)) { - dir.mkdirs() - val leveldb = new LevelDbKeyValueStore(dir, new Options) - leveldb - } else if ("inmemory".equals(typeOfStore)) { - val inmemoryDb = new InMemoryKeyValueStore - inmemoryDb - } else { - throw new IllegalArgumentException("Type of store undefined: " + typeOfStore) - } - - val passThroughSerde = new Serde[Array[Byte]] { - def toBytes(obj: Array[Byte]) = obj - def fromBytes(bytes: Array[Byte]) = bytes - } - store = if ("cache".equals(storeConfig)) { - cache = true - new CachedStore(kvStore, CacheSize, BatchSize) - } else if ("serde".equals(storeConfig)) { - serde = true - new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde) - } else if ("cache-and-serde".equals(storeConfig)) { - val serializedStore = new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde) - serde = true - cache = true - new CachedStore(serializedStore, CacheSize, BatchSize) - } else { - kvStore - } - - store = new NullSafeKeyValueStore(store) - } - - @After - def teardown() { - store.close - if (dir != null && dir.listFiles() != null) { - for (file <- dir.listFiles) - file.delete() - dir.delete() - } - } - - @Test - def getNonExistantIsNull() { - assertNull(store.get(b("hello"))) - } - - @Test - def putAndGet() { - store.put(b("k"), b("v")) - assertArrayEquals(b("v"), store.get(b("k"))) - } - - @Test - def doublePutAndGet() { - val k = b("k2") - store.put(k, b("v1")) - store.put(k, b("v2")) - store.put(k, b("v3")) - assertArrayEquals(b("v3"), store.get(k)) - } - - @Test - def testNullsWithSerde() { - if (serde) { - val a = b("a") - val keyMsg = Some(NullSafeKeyValueStore.KEY_ERROR_MSG) - val valMsg = Some(NullSafeKeyValueStore.VAL_ERROR_MSG) - - intercept[NullPointerException] { store.get(null) } - intercept[NullPointerException] { store.delete(null) } - intercept[NullPointerException] { store.put(null, a) } - intercept[NullPointerException] { store.put(a, null) } - intercept[NullPointerException] { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null))) } - intercept[NullPointerException] { store.putAll(List(new Entry[Array[Byte], Array[Byte]](null, a))) } - intercept[NullPointerException] { store.range(a, null) } - intercept[NullPointerException] { store.range(null, a) } - } - } - - @Test - def testPutAll() { - // Use CacheSize - 1 so we fully fill the cache, but don't write any data - // out. Our check (below) uses == for cached entries, and using - // numEntires >= CacheSize would result in the LRU cache dropping some - // entries. The result would be that we get the correct byte array back - // from the cache's underlying store (leveldb), but that == would fail. - val numEntries = CacheSize - 1 - val entries = (0 until numEntries).map(i => new Entry(b("k" + i), b("v" + i))) - store.putAll(entries) - if (cache) { - assertTrue("All values should be found and cached.", entries.forall(e => store.get(e.getKey) == e.getValue)) - } else { - assertTrue("All values should be found.", entries.forall(e => Arrays.equals(store.get(e.getKey), e.getValue))) - } - } - - @Test - def testIterateAll() { - for (letter <- letters) - store.put(b(letter.toString), b(letter.toString)) - val iter = store.all - checkRange(letters, iter) - iter.close() - } - - @Test - def testRange() { - val from = 5 - val to = 20 - for (letter <- letters) - store.put(b(letter.toString), b(letter.toString)) - - val iter = store.range(b(letters(from)), b(letters(to))) - checkRange(letters.slice(from, to), iter) - iter.close() - } - - @Test - def testDelete() { - val a = b("a") - assertNull(store.get(a)) - store.put(a, a) - assertArrayEquals(a, store.get(a)) - store.delete(a) - assertNull(store.get(a)) - } - - @Test - def testSimpleScenario() { - val vals = letters.map(b(_)) - for (v <- vals) { - assertNull(store.get(v)) - store.put(v, v) - assertArrayEquals(v, store.get(v)) - } - vals.foreach(v => assertArrayEquals(v, store.get(v))) - vals.foreach(v => store.delete(v)) - vals.foreach(v => assertNull(store.get(v))) - } - - /** - * This test specifically targets an issue in Scala 2.8.1's DoubleLinkedList - * implementation. The issue is that it doesn't work. More specifically, - * creating a DoubleLinkedList from an existing list does not update the - * "prev" field of the existing list's head to point to the new head. As a - * result, in Scala 2.8.1, every DoubleLinkedList node's prev field is null. - * Samza gets around this by manually updating the field itself. See SAMZA-80 - * for details. - * - * This issue is exposed in Samza's KV cache implementation, which uses - * DoubleLinkedList, so all comments in this method are discussing the cached - * implementation, but the test is still useful as a sanity check for - * non-cached stores. - */ - @Test - def testBrokenScalaDoubleLinkedList() { - val something = b("") - val keys = letters - .map(b(_)) - .toArray - - // Load the cache to capacity. - letters - .slice(0, TestKeyValueStores.CacheSize) - .map(b(_)) - .foreach(store.put(_, something)) - - // Now keep everything in the cache, but with an empty dirty list. - store.flush - - // Dirty list is now empty, and every CacheEntry has dirty=null. - - // Corrupt the dirty list by creating two dirty lists that toggle back and - // forth depending on whether the last dirty write was to 1 or 0. The trick - // here is that every element in the cache is treated as the "head" of the - // DoulbeLinkedList (prev==null), even though it's not necessarily. Thus, - // You can end up with multiple nodes each having their own version of the - // dirty list with different elements in them. - store.put(keys(1), something) - store.put(keys(0), something) - store.put(keys(1), something) - store.flush - // The dirty list is now empty, but 0's dirty field actually has 0 and 1. - store.put(keys(0), something) - // The dirty list now has 0 and 1, but 1's dirty field is null in the - // cache because it was just flushed. - - // Get rid of 1 from the cache by reading every other element, and then - // putting one new element. - letters - .slice(2, TestKeyValueStores.CacheSize) - .map(b(_)) - .foreach(store.get(_)) - store.put(keys(TestKeyValueStores.CacheSize), something) - - // Now try and trigger an NPE since the dirty list has an element (1) - // that's no longer in the cache. - store.flush - } - - /** - * A little test that tries to simulate a few common patterns: - * read-modify-write, and do-some-stuff-then-delete (windowing). - */ - @Test - def testRandomReadWriteRemove() { - // Make test deterministic by seeding the random number generator. - val rand = new Random(12345) - val keys = letters - .map(b(_)) - .toArray - - // Map from letter to key byte array used for letter, and expected value. - // We have to go through some acrobatics here since Java's byte array uses - // object identity for .equals. Two byte arrays can have identical byte - // elements, but not be equal. - var expected = Map[String, (Array[Byte], String)]() - - (0 until 100).foreach(loop => { - (0 until 30).foreach(i => { - val idx = rand.nextInt(keys.length) - val randomValue = letters(rand.nextInt(keys.length)) - val key = keys(idx) - val currentVal = store.get(key) - store.put(key, b(randomValue)) - expected += letters(idx) -> (key, randomValue) - }) - - for ((k, v) <- expected) { - val bytes = store.get(v._1) - assertNotNull(bytes) - assertEquals(v._2, new String(bytes, "UTF-8")) - } - - // Iterating and making structural modifications (deletion) does not look right. - // Separating these two steps - val iterator = store.all - val allKeys = new ArrayBuffer[Array[Byte]]() - - // First iterate - while (iterator.hasNext) { - allKeys += iterator.next.getKey - } - iterator.close - - // And now delete - for (key <- allKeys) { - store.delete(key) - expected -= new String(key, "UTF-8") - } - - assertEquals(0, expected.size) - }) - } - - def checkRange(vals: IndexedSeq[String], iter: KeyValueIterator[Array[Byte], Array[Byte]]) { - for (v <- vals) { - assertTrue(iter.hasNext) - val entry = iter.next() - assertEquals(v, s(entry.getKey)) - assertEquals(v, s(entry.getValue)) - } - assertFalse(iter.hasNext) - intercept[NoSuchElementException] { iter.next() } - } - - /** - * Convert string to byte buffer - */ - def b(s: String) = - s.getBytes - - /** - * Convert byte buffer to string - */ - def s(b: Array[Byte]) = - new String(b) -} - -object TestKeyValueStores { - val CacheSize = 10 - val BatchSize = 5 - @Parameters - def parameters: java.util.Collection[Array[String]] = Arrays.asList(Array("leveldb", "cache"), Array("leveldb", "serde"), Array("leveldb", "cache-and-serde"), Array("leveldb", "none"), Array("inmemory", "cache"), Array("inmemory", "serde"), Array("inmemory", "cache-and-serde"), Array("inmemory", "none")) -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/90d5a00f/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala new file mode 100644 index 0000000..eefe114 --- /dev/null +++ b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.kv + +import java.io.File +import java.util.Arrays +import java.util.Random + +import org.apache.samza.serializers.Serde +import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore +import org.iq80.leveldb.Options +import org.junit.After +import org.junit.Assert._ +import org.junit.Before +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.runners.Parameterized.Parameters +import org.scalatest.Assertions.intercept + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + +/** + * Test suite to check different key value store operations + * @param typeOfStore Defines type of key-value store (Eg: "leveldb" / "inmemory") + * @param storeConfig Defines whether we're using caching / serde / both / or none in front of the store + */ +@RunWith(value = classOf[Parameterized]) +class TestKeyValueStores(typeOfStore: String, storeConfig: String) { + import TestKeyValueStores._ + + val letters = "abcdefghijklmnopqrstuvwxyz".map(_.toString) + val dir = new File(System.getProperty("java.io.tmpdir"), "leveldb-test-" + new Random().nextInt(Int.MaxValue)) + var store: KeyValueStore[Array[Byte], Array[Byte]] = null + var cache = false + var serde = false + + @Before + def setup() { + val kvStore : KeyValueStore[Array[Byte], Array[Byte]] = if ("leveldb".equals(typeOfStore)) { + dir.mkdirs() + val leveldb = new LevelDbKeyValueStore(dir, new Options) + leveldb + } else if ("inmemory".equals(typeOfStore)) { + val inmemoryDb = new InMemoryKeyValueStore + inmemoryDb + } else { + throw new IllegalArgumentException("Type of store undefined: " + typeOfStore) + } + + val passThroughSerde = new Serde[Array[Byte]] { + def toBytes(obj: Array[Byte]) = obj + def fromBytes(bytes: Array[Byte]) = bytes + } + store = if ("cache".equals(storeConfig)) { + cache = true + new CachedStore(kvStore, CacheSize, BatchSize) + } else if ("serde".equals(storeConfig)) { + serde = true + new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde) + } else if ("cache-and-serde".equals(storeConfig)) { + val serializedStore = new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde) + serde = true + cache = true + new CachedStore(serializedStore, CacheSize, BatchSize) + } else { + kvStore + } + + store = new NullSafeKeyValueStore(store) + } + + @After + def teardown() { + store.close + if (dir != null && dir.listFiles() != null) { + for (file <- dir.listFiles) + file.delete() + dir.delete() + } + } + + @Test + def getNonExistantIsNull() { + assertNull(store.get(b("hello"))) + } + + @Test + def putAndGet() { + store.put(b("k"), b("v")) + assertArrayEquals(b("v"), store.get(b("k"))) + } + + @Test + def doublePutAndGet() { + val k = b("k2") + store.put(k, b("v1")) + store.put(k, b("v2")) + store.put(k, b("v3")) + assertArrayEquals(b("v3"), store.get(k)) + } + + @Test + def testNullsWithSerde() { + if (serde) { + val a = b("a") + val keyMsg = Some(NullSafeKeyValueStore.KEY_ERROR_MSG) + val valMsg = Some(NullSafeKeyValueStore.VAL_ERROR_MSG) + + intercept[NullPointerException] { store.get(null) } + intercept[NullPointerException] { store.delete(null) } + intercept[NullPointerException] { store.put(null, a) } + intercept[NullPointerException] { store.put(a, null) } + intercept[NullPointerException] { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null))) } + intercept[NullPointerException] { store.putAll(List(new Entry[Array[Byte], Array[Byte]](null, a))) } + intercept[NullPointerException] { store.range(a, null) } + intercept[NullPointerException] { store.range(null, a) } + } + } + + @Test + def testPutAll() { + // Use CacheSize - 1 so we fully fill the cache, but don't write any data + // out. Our check (below) uses == for cached entries, and using + // numEntires >= CacheSize would result in the LRU cache dropping some + // entries. The result would be that we get the correct byte array back + // from the cache's underlying store (leveldb), but that == would fail. + val numEntries = CacheSize - 1 + val entries = (0 until numEntries).map(i => new Entry(b("k" + i), b("v" + i))) + store.putAll(entries) + if (cache) { + assertTrue("All values should be found and cached.", entries.forall(e => store.get(e.getKey) == e.getValue)) + } else { + assertTrue("All values should be found.", entries.forall(e => Arrays.equals(store.get(e.getKey), e.getValue))) + } + } + + @Test + def testIterateAll() { + for (letter <- letters) + store.put(b(letter.toString), b(letter.toString)) + val iter = store.all + checkRange(letters, iter) + iter.close() + } + + @Test + def testRange() { + val from = 5 + val to = 20 + for (letter <- letters) + store.put(b(letter.toString), b(letter.toString)) + + val iter = store.range(b(letters(from)), b(letters(to))) + checkRange(letters.slice(from, to), iter) + iter.close() + } + + @Test + def testDelete() { + val a = b("a") + assertNull(store.get(a)) + store.put(a, a) + assertArrayEquals(a, store.get(a)) + store.delete(a) + assertNull(store.get(a)) + } + + @Test + def testSimpleScenario() { + val vals = letters.map(b(_)) + for (v <- vals) { + assertNull(store.get(v)) + store.put(v, v) + assertArrayEquals(v, store.get(v)) + } + vals.foreach(v => assertArrayEquals(v, store.get(v))) + vals.foreach(v => store.delete(v)) + vals.foreach(v => assertNull(store.get(v))) + } + + /** + * This test specifically targets an issue in Scala 2.8.1's DoubleLinkedList + * implementation. The issue is that it doesn't work. More specifically, + * creating a DoubleLinkedList from an existing list does not update the + * "prev" field of the existing list's head to point to the new head. As a + * result, in Scala 2.8.1, every DoubleLinkedList node's prev field is null. + * Samza gets around this by manually updating the field itself. See SAMZA-80 + * for details. + * + * This issue is exposed in Samza's KV cache implementation, which uses + * DoubleLinkedList, so all comments in this method are discussing the cached + * implementation, but the test is still useful as a sanity check for + * non-cached stores. + */ + @Test + def testBrokenScalaDoubleLinkedList() { + val something = b("") + val keys = letters + .map(b(_)) + .toArray + + // Load the cache to capacity. + letters + .slice(0, TestKeyValueStores.CacheSize) + .map(b(_)) + .foreach(store.put(_, something)) + + // Now keep everything in the cache, but with an empty dirty list. + store.flush + + // Dirty list is now empty, and every CacheEntry has dirty=null. + + // Corrupt the dirty list by creating two dirty lists that toggle back and + // forth depending on whether the last dirty write was to 1 or 0. The trick + // here is that every element in the cache is treated as the "head" of the + // DoulbeLinkedList (prev==null), even though it's not necessarily. Thus, + // You can end up with multiple nodes each having their own version of the + // dirty list with different elements in them. + store.put(keys(1), something) + store.put(keys(0), something) + store.put(keys(1), something) + store.flush + // The dirty list is now empty, but 0's dirty field actually has 0 and 1. + store.put(keys(0), something) + // The dirty list now has 0 and 1, but 1's dirty field is null in the + // cache because it was just flushed. + + // Get rid of 1 from the cache by reading every other element, and then + // putting one new element. + letters + .slice(2, TestKeyValueStores.CacheSize) + .map(b(_)) + .foreach(store.get(_)) + store.put(keys(TestKeyValueStores.CacheSize), something) + + // Now try and trigger an NPE since the dirty list has an element (1) + // that's no longer in the cache. + store.flush + } + + /** + * A little test that tries to simulate a few common patterns: + * read-modify-write, and do-some-stuff-then-delete (windowing). + */ + @Test + def testRandomReadWriteRemove() { + // Make test deterministic by seeding the random number generator. + val rand = new Random(12345) + val keys = letters + .map(b(_)) + .toArray + + // Map from letter to key byte array used for letter, and expected value. + // We have to go through some acrobatics here since Java's byte array uses + // object identity for .equals. Two byte arrays can have identical byte + // elements, but not be equal. + var expected = Map[String, (Array[Byte], String)]() + + (0 until 100).foreach(loop => { + (0 until 30).foreach(i => { + val idx = rand.nextInt(keys.length) + val randomValue = letters(rand.nextInt(keys.length)) + val key = keys(idx) + val currentVal = store.get(key) + store.put(key, b(randomValue)) + expected += letters(idx) -> (key, randomValue) + }) + + for ((k, v) <- expected) { + val bytes = store.get(v._1) + assertNotNull(bytes) + assertEquals(v._2, new String(bytes, "UTF-8")) + } + + // Iterating and making structural modifications (deletion) does not look right. + // Separating these two steps + val iterator = store.all + val allKeys = new ArrayBuffer[Array[Byte]]() + + // First iterate + while (iterator.hasNext) { + allKeys += iterator.next.getKey + } + iterator.close + + // And now delete + for (key <- allKeys) { + store.delete(key) + expected -= new String(key, "UTF-8") + } + + assertEquals(0, expected.size) + }) + } + + def checkRange(vals: IndexedSeq[String], iter: KeyValueIterator[Array[Byte], Array[Byte]]) { + for (v <- vals) { + assertTrue(iter.hasNext) + val entry = iter.next() + assertEquals(v, s(entry.getKey)) + assertEquals(v, s(entry.getValue)) + } + assertFalse(iter.hasNext) + intercept[NoSuchElementException] { iter.next() } + } + + /** + * Convert string to byte buffer + */ + def b(s: String) = + s.getBytes + + /** + * Convert byte buffer to string + */ + def s(b: Array[Byte]) = + new String(b) +} + +object TestKeyValueStores { + val CacheSize = 10 + val BatchSize = 5 + @Parameters + def parameters: java.util.Collection[Array[String]] = Arrays.asList(Array("leveldb", "cache"), Array("leveldb", "serde"), Array("leveldb", "cache-and-serde"), Array("leveldb", "none"), Array("inmemory", "cache"), Array("inmemory", "serde"), Array("inmemory", "cache-and-serde"), Array("inmemory", "none")) +}
