Repository: samza Updated Branches: refs/heads/master c3b469e0a -> f2fd9aaab
SAMZA-439; fix stack overflow in CachedStore Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f2fd9aaa Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f2fd9aaa Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f2fd9aaa Branch: refs/heads/master Commit: f2fd9aaab2dbd2a1508c498a4071f79e29102105 Parents: c3b469e Author: Manikumar Reddy <[email protected]> Authored: Mon Feb 2 11:08:13 2015 -0800 Committer: Chris Riccomini <[email protected]> Committed: Mon Feb 2 11:08:13 2015 -0800 ---------------------------------------------------------------------- .../org/apache/samza/storage/kv/CachedStore.scala | 12 +++++++----- .../apache/samza/storage/kv/TestKeyValueStores.scala | 13 ++++++++++--- 2 files changed, 17 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/f2fd9aaa/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala index 1fa96ba..1971b1f 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala @@ -20,8 +20,8 @@ package org.apache.samza.storage.kv import org.apache.samza.util.Logging - import scala.collection._ +import java.util.Arrays /** * A write-behind caching layer around the leveldb store. The purpose of this cache is three-fold: @@ -150,13 +150,15 @@ class CachedStore[K, V]( metrics.flushes.inc // write out the contents of the dirty list oldest first - val batch = new java.util.ArrayList[Entry[K, V]](this.dirtyCount) - for (k <- this.dirty.reverse) { + val batch = new Array[Entry[K, V]](this.dirtyCount) + var pos : Int = this.dirtyCount - 1; + for (k <- this.dirty) { val entry = this.cache.get(k) entry.dirty = null // not dirty any more - batch.add(new Entry(k, entry.value)) + batch(pos) = new Entry(k, entry.value) + pos -= 1 } - store.putAll(batch) + store.putAll(Arrays.asList(batch : _*)) store.flush metrics.flushBatchSize.inc(batch.size) http://git-wip-us.apache.org/repos/asf/samza/blob/f2fd9aaa/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala index 2082610..f592d8e 100644 --- a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala +++ b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala @@ -112,6 +112,13 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { } @Test + def putStessTest() { + for( a <- 0 to 1900000){ + store.put(b(a+"k"), b("v")) + } + } + + @Test def doublePutAndGet() { val k = b("k2") store.put(k, b("v1")) @@ -252,7 +259,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { .slice(2, TestKeyValueStores.CacheSize) .map(b(_)) .foreach(store.get(_)) - store.put(keys(TestKeyValueStores.CacheSize), something) + store.put(keys(10), something) // Now try and trigger an NPE since the dirty list has an element (1) // that's no longer in the cache. @@ -339,8 +346,8 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { } object TestKeyValueStores { - val CacheSize = 10 - val BatchSize = 5 + val CacheSize = 1000000 + val BatchSize = 1000000 @Parameters def parameters: java.util.Collection[Array[String]] = Arrays.asList( //LevelDB
