This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 8021074 Ensuring store consumers are started exactly once (#1094)
8021074 is described below
commit 80210748b24d4e44652af52973902dfcef3cb6cd
Author: rmatharu <[email protected]>
AuthorDate: Fri Jun 28 16:53:49 2019 -0700
Ensuring store consumers are started exactly once (#1094)
---
.../scala/org/apache/samza/storage/ContainerStorageManager.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 9cfa9ab..d3fa6ec 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -629,8 +629,8 @@ public class ContainerStorageManager {
// initialize each TaskStorageManager
this.taskRestoreManagers.values().forEach(taskStorageManager ->
taskStorageManager.initialize());
- // Start store consumers
- this.storeConsumers.values().forEach(systemConsumer ->
systemConsumer.start());
+ // Start each store consumer once
+ this.storeConsumers.values().stream().distinct().forEach(systemConsumer ->
systemConsumer.start());
// Create a thread pool for parallel restores (and stopping of persistent stores)
ExecutorService executorService =
Executors.newFixedThreadPool(this.parallelRestoreThreadPoolSize,
@@ -657,8 +657,8 @@ public class ContainerStorageManager {
executorService.shutdown();
- // Stop store consumers
- this.storeConsumers.values().forEach(systemConsumer ->
systemConsumer.stop());
+ // Stop each store consumer once
+ this.storeConsumers.values().stream().distinct().forEach(systemConsumer ->
systemConsumer.stop());
// Now re-create persistent stores in read-write mode, leave non-persistent stores as-is
recreatePersistentTaskStoresInReadWriteMode(this.containerModel,
jobContext, containerContext,