mynameborat commented on a change in pull request #912: SEP-19 : Refactoring sideInputs from SamzaContainer to ContainerStorageManager
URL: https://github.com/apache/samza/pull/912#discussion_r260974797
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 ##########
 @@ -148,32 +205,114 @@ public ContainerStorageManager(ContainerModel containerModel, StreamMetadataCach
 
     this.maxChangeLogStreamPartitions = maxChangeLogStreamPartitions;
     this.streamMetadataCache = streamMetadataCache;
+    this.systemAdmins = systemAdmins;
 
     // create taskStores for all tasks in the containerModel and each store in storageEngineFactories
-    this.taskStores = createTaskStores(containerModel, jobContext, containerContext, storageEngineFactories, changelogSystemStreams,
-        serdes, taskInstanceMetrics, taskInstanceCollectors, StorageEngineFactory.StoreMode.BulkLoad);
+    this.taskStores = createTaskStores(containerModel, jobContext, containerContext, storageEngineFactories, serdes, taskInstanceMetrics, taskInstanceCollectors);
 
-    // create system consumers (1 per store system)
-    this.systemConsumers = createStoreConsumers(changelogSystemStreams, systemFactories, config, this.samzaContainerMetrics.registry());
+    // create system consumers (1 per store system in changelogSystemStreams), and index it by storeName
+    Map<String, SystemConsumer> storeSystemConsumers = createConsumers(this.changelogSystemStreams.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+        e -> Collections.singleton(e.getValue()))), systemFactories, config, this.samzaContainerMetrics.registry());
+    this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams, storeSystemConsumers);
 
     // creating task restore managers
     this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock);
+
+    // creating side input store processors, one per store per task
+    taskSideInputProcessors = createSideInputProcessors(new StorageConfig(config), this.containerModel, this.sideInputSystemStreams, this.taskInstanceMetrics);
 
 Review comment:
   Looks like `taskSideInputProcessors` can be a local variable, or be created inside `createSideInputStorageManagers()`.
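
   For illustration only, a rough sketch of what this suggestion might look like. `Processor`, `Manager`, `SideInputWiringSketch`, and the single-argument factory method below are hypothetical stand-ins, not the actual Samza types or signatures. The point is only that the processors are built as a local and handed straight to the manager, so the enclosing class needs no field for them.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; these are NOT the Samza classes.
final class SideInputWiringSketch {

  // Stand-in for a per-store side input processor.
  static final class Processor {
    final String storeName;

    Processor(String storeName) {
      this.storeName = storeName;
    }
  }

  // Stand-in for a storage manager that consumes the processors.
  static final class Manager {
    final Map<String, Processor> processors;

    Manager(Map<String, Processor> processors) {
      this.processors = processors;
    }
  }

  // The only side-input state the enclosing class keeps after construction.
  private final Manager sideInputStorageManager;

  SideInputWiringSketch(Iterable<String> storeNames) {
    this.sideInputStorageManager = createSideInputStorageManager(storeNames);
  }

  // The processors live in a local variable and are passed directly to the
  // manager, so no processor field is required on the enclosing class.
  static Manager createSideInputStorageManager(Iterable<String> storeNames) {
    Map<String, Processor> processors = new HashMap<>();
    for (String storeName : storeNames) {
      processors.put(storeName, new Processor(storeName));
    }
    return new Manager(processors);
  }

  Manager manager() {
    return sideInputStorageManager;
  }
}
```

   Whether this fits depends on whether anything else in the class reads the processors after construction; if it does, the field would still be needed.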
