mynameborat commented on a change in pull request #912: SEP-19 : Refactoring 
sideInputs from SamzaContainer to ContainerStorageManager
URL: https://github.com/apache/samza/pull/912#discussion_r258224159
 
 

 ##########
 File path: 
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 ##########
 @@ -112,15 +144,49 @@
 
   public ContainerStorageManager(ContainerModel containerModel, 
StreamMetadataCache streamMetadataCache,
       SystemAdmins systemAdmins, Map<String, SystemStream> 
changelogSystemStreams,
+      Map<String, List<SystemStream>> sideInputSystemStreams,
       Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
       Map<String, SystemFactory> systemFactories, Map<String, Serde<Object>> 
serdes, Config config,
       Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, 
SamzaContainerMetrics samzaContainerMetrics,
       JobContext jobContext, ContainerContext containerContext,
       Map<TaskName, TaskInstanceCollector> taskInstanceCollectors, File 
loggedStoreBaseDirectory,
-      File nonLoggedStoreBaseDirectory, int maxChangeLogStreamPartitions, 
Clock clock) {
+      File nonLoggedStoreBaseDirectory, int maxChangeLogStreamPartitions, 
SerdeManager serdeManager, Clock clock) {
 
     this.containerModel = containerModel;
-    this.changelogSystemStreams = changelogSystemStreams;
+    this.sideInputSystemStreams = new HashMap<>(sideInputSystemStreams);
+    this.sideInputSSPs = new HashMap<>();
+
+    // Add all side inputs to the map of sideInputSSPs indexed by taskName
+    containerModel.getTasks().forEach((taskName, taskModel) -> {
+        sideInputSystemStreams.keySet().forEach(storeName -> {
+            Set<SystemStreamPartition> taskSideInputSSPs = 
taskModel.getSystemStreamPartitions().stream().filter(ssp -> 
sideInputSystemStreams.get(storeName).contains(ssp.getSystemStream())).collect(Collectors.toSet());
+            this.sideInputSSPs.putIfAbsent(taskName, new HashMap<>());
+            this.sideInputSSPs.get(taskName).put(storeName, taskSideInputSSPs);
+          });
+      });
+
+    // Create a map of changeLogSSP to storeName across all tasks, assuming no 
stores have the same changelogSSP
 
 Review comment:
   nit: "assuming no two stores"

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to