Repository: samza Updated Branches: refs/heads/master efa50a790 -> 373181c57
SAMZA-938: reverting the APIs added in SAMZA-813 to keep backward compatibility in 0.10.1 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/373181c5 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/373181c5 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/373181c5 Branch: refs/heads/master Commit: 373181c5792b34d2c9c86ad1859cb01722584992 Parents: efa50a7 Author: Jacob Maes <[email protected]> Authored: Thu Apr 28 16:02:47 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Thu Apr 28 16:02:47 2016 -0700 ---------------------------------------------------------------------- .../samza/storage/kv/KeyValueIterator.java | 2 -- .../apache/samza/storage/kv/KeyValueStore.java | 7 ------- .../kv/inmemory/InMemoryKeyValueStore.scala | 16 ---------------- .../samza/storage/kv/RocksDbKeyValueStore.scala | 19 +------------------ .../apache/samza/storage/kv/CachedStore.scala | 10 ---------- .../samza/storage/kv/CachedStoreMetrics.scala | 1 - .../samza/storage/kv/KeyValueStorageEngine.scala | 5 ----- .../kv/KeyValueStorageEngineMetrics.scala | 1 - .../samza/storage/kv/KeyValueStoreMetrics.scala | 1 - .../apache/samza/storage/kv/LoggedStore.scala | 5 ----- .../samza/storage/kv/LoggedStoreMetrics.scala | 1 - .../samza/storage/kv/NullSafeKeyValueStore.scala | 4 ---- .../storage/kv/SerializedKeyValueStore.scala | 9 --------- .../kv/SerializedKeyValueStoreMetrics.scala | 1 - .../samza/storage/kv/MockKeyValueStore.scala | 11 ----------- 15 files changed, 1 insertion(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java index aa1f88c..854ebbf 100644 --- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java +++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java @@ -23,6 +23,4 @@ import java.util.Iterator; public interface KeyValueIterator<K, V> extends Iterator<Entry<K, V>> { public void close(); - public void seekToFirst(); - public void seek(K key); } http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java index 7d4d353..b1fea7b 100644 --- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java +++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java @@ -103,13 +103,6 @@ public interface KeyValueStore<K, V> { KeyValueIterator<K, V> all(); /** - * Return an iterator which is yet to be positions. This iterator must be positioned - * first before a call to next() is made. This iterator MUST be closed after use. - * @return An iterator - */ - KeyValueIterator<K, V> newIterator(); - - /** * Closes this key-value store, if applicable, relinquishing any underlying resources. */ void close(); http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala index 661a835..72f25a3 100644 --- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala +++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala @@ -61,18 +61,6 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor } override def hasNext: Boolean = iter.hasNext - - /* - * This method is supposed to be called only after an iterator is created first - * using the store's newIterator position. For some stores, the creation of the - * */ - override def seekToFirst(): Unit = { - throw new UnsupportedOperationException - } - - override def seek(key: Array[Byte]): Unit = { - throw new UnsupportedOperationException - } } override def all(): KeyValueIterator[Array[Byte], Array[Byte]] = { @@ -132,8 +120,4 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor override def getAll(keys: java.util.List[Array[Byte]]): java.util.Map[Array[Byte], Array[Byte]] = { KeyValueStore.Extension.getAll(this, keys); } - - override def newIterator(): KeyValueIterator[Array[Byte], Array[Byte]] = { - throw new UnsupportedOperationException - } } http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index b896810..f0965ae 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -194,11 +194,6 @@ class RocksDbKeyValueStore( new RocksDbIterator(iter) } - def newIterator(): KeyValueIterator[Array[Byte], Array[Byte]] = { - metrics.newIterator.inc - new RocksDbIterator(db.newIterator); - } - def flush { metrics.flushes.inc trace("Flushing.") @@ -211,7 +206,7 @@ class RocksDbKeyValueStore( } class RocksDbIterator(iter: RocksIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] { - private var open = iter.isValid + private var open = true private var firstValueAccessed = false; def close() = { open = false @@ -253,18 +248,6 @@ class RocksDbKeyValueStore( entry } - def seekToFirst: Unit = { - metrics.alls.inc() - iter.seekToFirst() - open = true - } - - def seek(target: Array[Byte]): Unit = { - metrics.alls.inc() - iter.seek(target) - open = true - } - override def finalize() { if (open) { trace("Leaked reference to RocksDB iterator, forcing close.") http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala index 4c4e82e..c28f8db 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala @@ -124,10 +124,6 @@ class CachedStore[K, V]( } override def hasNext: Boolean = iter.hasNext - - override def seekToFirst(): Unit = iter.seekToFirst() - - override def seek(key: K): Unit = iter.seek(key) } override def range(from: K, to: K): KeyValueIterator[K, V] = { @@ -144,12 +140,6 @@ class CachedStore[K, V]( new CachedStoreIterator(store.all()) } - override def newIterator(): KeyValueIterator[K, V] = { - metrics.newIterator.inc - flush() - new CachedStoreIterator(store.newIterator()) - } - override def put(key: K, value: V) { metrics.puts.inc http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala index 04109c9..a11b5fe 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala @@ -35,7 +35,6 @@ class CachedStoreMetrics( val deletes = newCounter("deletes") val flushes = newCounter("flushes") val putAllDirtyEntriesBatchSize = newCounter("put-all-dirty-entries-batch-size") - val newIterator = newCounter("newitarator") def setDirtyCount(getValue: () => Int) { newGauge("dirty-count", getValue) http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala index defc91e..e5a66a4 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala @@ -79,11 +79,6 @@ class KeyValueStorageEngine[K, V]( wrapperStore.all() } - def newIterator() = { - metrics.newIterator.inc - wrapperStore.newIterator() - } - /** * Restore the contents of this key/value store from the change log, * batching updates to underlying raw store to skip wrapping functions for efficiency. http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala index f842b6f..233fba9 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala @@ -33,7 +33,6 @@ class KeyValueStorageEngineMetrics( val puts = newCounter("puts") val deletes = newCounter("deletes") val flushes = newCounter("flushes") - val newIterator = newCounter("newiterator") val restoredMessages = newCounter("messages-restored") val restoredBytes = newCounter("messages-bytes") http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala index c303261..967d509 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala @@ -35,7 +35,6 @@ class KeyValueStoreMetrics( val flushes = newCounter("flushes") val bytesWritten = newCounter("bytes-written") val bytesRead = newCounter("bytes-read") - val newIterator = newCounter("newitertor") override def getPrefix = storeName + "-" } http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala index e293bfc..7bba6ff 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala @@ -56,11 +56,6 @@ class LoggedStore[K, V]( store.all() } - def newIterator() = { - metrics.newIterator.inc - store.newIterator() - } - /** * Perform the local update and log it out to the changelog */ http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala index f856432..743151a 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala @@ -33,7 +33,6 @@ class LoggedStoreMetrics( val puts = newCounter("puts") val deletes = newCounter("deletes") val flushes = newCounter("flushes") - val newIterator = newCounter("newiterator") override def getPrefix = storeName + "-" } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala index 9d7baaa..3de257c 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala @@ -78,10 +78,6 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt store.all } - def newIterator(): KeyValueIterator[K, V] = { - store.newIterator() - } - def flush { store.flush } http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala index cf1a2cc..8e183ef 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala @@ -97,11 +97,6 @@ class SerializedKeyValueStore[K, V]( new DeserializingIterator(store.all) } - def newIterator(): KeyValueIterator[K, V] = { - metrics.newIterator.inc - new DeserializingIterator(store.newIterator()) - } - private class DeserializingIterator(iter: KeyValueIterator[Array[Byte], Array[Byte]]) extends KeyValueIterator[K, V] { def hasNext() = iter.hasNext() def remove() = iter.remove() @@ -112,10 +107,6 @@ class SerializedKeyValueStore[K, V]( val value = fromBytesOrNull(nxt.getValue, msgSerde) new Entry(key, value) } - - override def seekToFirst(): Unit = iter.seekToFirst() - - override def seek(key: K): Unit = iter.seek(toBytesOrNull(key, keySerde)) } def flush { http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala index bb745e0..841e4a2 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala @@ -35,7 +35,6 @@ class SerializedKeyValueStoreMetrics( val flushes = newCounter("flushes") val bytesSerialized = newCounter("bytes-serialized") val bytesDeserialized = newCounter("bytes-deserialized") - val newIterator = newCounter("newiterator") override def getPrefix = storeName + "-" } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala index e14a461..595dd0d 100644 --- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala +++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala @@ -58,14 +58,6 @@ class MockKeyValueStore extends KeyValueStore[String, String] { override def remove(): Unit = iter.remove() override def close(): Unit = Unit - - override def seekToFirst(): Unit = { - throw new UnsupportedOperationException - } - - override def seek(key: String): Unit = { - throw new UnsupportedOperationException - } } override def range(from: String, to: String): KeyValueIterator[String, String] = @@ -74,9 +66,6 @@ class MockKeyValueStore extends KeyValueStore[String, String] { override def all(): KeyValueIterator[String, String] = new MockIterator(kvMap.entrySet().iterator()) - override def newIterator(): KeyValueIterator[String, String] = - throw new UnsupportedOperationException - override def flush() {} // no-op override def close() { kvMap.clear() }
