This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 8021074  Ensuring store consumers are started exactly once (#1094)
8021074 is described below

commit 80210748b24d4e44652af52973902dfcef3cb6cd
Author: rmatharu <[email protected]>
AuthorDate: Fri Jun 28 16:53:49 2019 -0700

    Ensuring store consumers are started exactly once (#1094)
---
 .../scala/org/apache/samza/storage/ContainerStorageManager.java   | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 9cfa9ab..d3fa6ec 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -629,8 +629,8 @@ public class ContainerStorageManager {
     // initialize each TaskStorageManager
     this.taskRestoreManagers.values().forEach(taskStorageManager -> 
taskStorageManager.initialize());
 
-    // Start store consumers
-    this.storeConsumers.values().forEach(systemConsumer -> 
systemConsumer.start());
+    // Start each store consumer once
+    this.storeConsumers.values().stream().distinct().forEach(systemConsumer -> 
systemConsumer.start());
 
     // Create a thread pool for parallel restores (and stopping of persistent 
stores)
     ExecutorService executorService = 
Executors.newFixedThreadPool(this.parallelRestoreThreadPoolSize,
@@ -657,8 +657,8 @@ public class ContainerStorageManager {
 
     executorService.shutdown();
 
-    // Stop store consumers
-    this.storeConsumers.values().forEach(systemConsumer -> 
systemConsumer.stop());
+    // Stop each store consumer once
+    this.storeConsumers.values().stream().distinct().forEach(systemConsumer -> 
systemConsumer.stop());
 
     // Now re-create persistent stores in read-write mode, leave 
non-persistent stores as-is
     recreatePersistentTaskStoresInReadWriteMode(this.containerModel, 
jobContext, containerContext,

Reply via email to