Repository: samza Updated Branches: refs/heads/master da88096a0 -> 3c78e06ac
SAMZA-1900: Add restore logging info prateekm let me know if there are other places that logging should also be improved in this patch. Author: Daniel Chen <dch...@linkedin.com> Author: Daniel Chen <xrc...@uwaterloo.ca> Reviewers: Prateek Maheshwari <pmaheshw...@apache.org> Closes #653 from dxichen/add-restore-logging-info Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3c78e06a Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3c78e06a Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3c78e06a Branch: refs/heads/master Commit: 3c78e06ac0913dd35866a9e82d8cd7d9d2cc758a Parents: da88096 Author: Daniel Chen <dch...@linkedin.com> Authored: Thu Oct 11 11:56:05 2018 -0700 Committer: Prateek Maheshwari <pmaheshw...@apache.org> Committed: Thu Oct 11 11:56:05 2018 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/samza/storage/StorageEngine.java | 4 ++-- .../org/apache/samza/storage/TaskStorageManager.scala | 2 +- .../apache/samza/storage/kv/RocksDbKeyValueStore.scala | 2 +- .../org/apache/samza/storage/kv/AccessLoggedStore.scala | 6 +++--- .../storage/kv/BaseKeyValueStorageEngineFactory.scala | 2 +- .../apache/samza/storage/kv/KeyValueStorageEngine.scala | 10 +++++++--- .../samza/storage/kv/TestKeyValueStorageEngine.scala | 6 +++++- 7 files changed, 20 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java index a83f8b3..4e6950a 100644 --- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java +++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java @@ -25,7 +25,7 @@ import org.apache.samza.system.IncomingMessageEnvelope; /** * A storage engine for managing state maintained by a stream processor. - * + * * <p> * This interface does not specify any query capabilities, which, of course, * would be query engine specific. Instead it just specifies the minimum @@ -39,7 +39,7 @@ public interface StorageEngine { * Restore the content of this StorageEngine from the changelog. Messages are * provided in one {@link java.util.Iterator} and not deserialized for * efficiency, allowing the implementation to optimize replay, if possible. - * + * * @param envelopes * An iterator of envelopes that the storage engine can read from to * restore its state on startup. http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala index 90fdc19..deb69e1 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala @@ -203,7 +203,7 @@ class TaskStorageManager( } private def restoreStores() { - debug("Restoring stores.") + debug("Restoring stores for task: %s." format taskName.getTaskName) for ((storeName, store) <- taskStoresToRestore) { if (changeLogSystemStreams.contains(storeName)) { http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index 836dab4..b7baede 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -68,7 +68,7 @@ object RocksDbKeyValueStore extends Logging { try { val rocksDb = if (useTTL) { - info("Opening RocksDB store with TTL value: %s" format ttl) + info("Opening RocksDB store: %s in path: %s with TTL value: %s" format (storeName, dir.toString, ttl)) TtlDB.open(options, dir.toString, ttl.toInt, false) } else { RocksDB.open(options, dir.toString) http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala index 39136db..78a7b0b 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala @@ -100,16 +100,16 @@ class AccessLoggedStore[K, V]( } def close(): Unit = { - trace("Closing accessLogged store.") + trace("Closing accessLogged store: %s." format storeName) store.close } def flush(): Unit = { - trace("Flushing store.") + trace("Flushing store: %s." format storeName) store.flush - trace("Flushed store.") + trace("Flushed store: %s." format storeName) } http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala index d962e93..e1e7642 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala @@ -154,7 +154,7 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] } } - new KeyValueStorageEngine(storePropertiesBuilder.build(), nullSafeStore, rawStore, + new KeyValueStorageEngine(storeName, storeDir, storePropertiesBuilder.build(), nullSafeStore, rawStore, keyValueStorageEngineMetrics, batchSize, () => clock.nanoTime()) } http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala index 963dce4..0434199 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala @@ -19,6 +19,8 @@ package org.apache.samza.storage.kv +import java.io.File + import org.apache.samza.util.Logging import org.apache.samza.storage.{StorageEngine, StoreProperties} import org.apache.samza.system.IncomingMessageEnvelope @@ -32,6 +34,8 @@ import scala.collection.JavaConverters._ * This implements both the key/value interface and the storage engine interface. */ class KeyValueStorageEngine[K, V]( + storeName: String, + storeDir: File, storeProperties: StoreProperties, wrapperStore: KeyValueStore[K, V], rawStore: KeyValueStore[Array[Byte], Array[Byte]], @@ -104,7 +108,7 @@ class KeyValueStorageEngine[K, V]( * batching updates to underlying raw store to notAValidEvent wrapping functions for efficiency. */ def restore(envelopes: java.util.Iterator[IncomingMessageEnvelope]) { - info("Restoring entries for store " + metrics.storeName) + info("Restoring entries for store: " + storeName + " in directory: " + storeDir.toString) val batch = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](batchSize) @@ -132,11 +136,11 @@ class KeyValueStorageEngine[K, V]( count += 1 if (count % 1000000 == 0) { - info(count + " entries restored...") + info(count + " entries restored for store: " + storeName + " in directory: " + storeDir.toString + "...") } } - info(count + " total entries restored.") + info(count + " total entries restored for store: " + storeName + " in directory: " + storeDir.toString + ".") if (batch.size > 0) { doPutAll(rawStore, batch) http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala index f0c254f..8806a81 100644 --- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala +++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala @@ -19,9 +19,11 @@ package org.apache.samza.storage.kv +import java.io.File import java.util.Arrays import org.apache.samza.Partition +import org.apache.samza.container.TaskName import org.apache.samza.storage.StoreProperties import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition} import org.junit.Assert._ @@ -37,9 +39,11 @@ class TestKeyValueStorageEngine { def setup() { val wrapperKv = new MockKeyValueStore() val rawKv = mock(classOf[KeyValueStore[Array[Byte], Array[Byte]]]) + val storeName = "test-storeName" + val storeDir = mock(classOf[File]) val properties = mock(classOf[StoreProperties]) metrics = new KeyValueStorageEngineMetrics - engine = new KeyValueStorageEngine[String, String](properties, wrapperKv, rawKv, metrics, clock = () => { getNextTimestamp() }) + engine = new KeyValueStorageEngine[String, String](storeName, storeDir, properties, wrapperKv, rawKv, metrics, clock = () => { getNextTimestamp() }) } @After