mynameborat commented on a change in pull request #912: SEP-19 : Refactoring sideInputs from SamzaContainer to ContainerStorageManager URL: https://github.com/apache/samza/pull/912#discussion_r258224159
########## File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java ########## @@ -112,15 +144,49 @@ public ContainerStorageManager(ContainerModel containerModel, StreamMetadataCache streamMetadataCache, SystemAdmins systemAdmins, Map<String, SystemStream> changelogSystemStreams, + Map<String, List<SystemStream>> sideInputSystemStreams, Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories, Map<String, SystemFactory> systemFactories, Map<String, Serde<Object>> serdes, Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, SamzaContainerMetrics samzaContainerMetrics, JobContext jobContext, ContainerContext containerContext, Map<TaskName, TaskInstanceCollector> taskInstanceCollectors, File loggedStoreBaseDirectory, - File nonLoggedStoreBaseDirectory, int maxChangeLogStreamPartitions, Clock clock) { + File nonLoggedStoreBaseDirectory, int maxChangeLogStreamPartitions, SerdeManager serdeManager, Clock clock) { this.containerModel = containerModel; - this.changelogSystemStreams = changelogSystemStreams; + this.sideInputSystemStreams = new HashMap<>(sideInputSystemStreams); + this.sideInputSSPs = new HashMap<>(); + + // Add all side inputs to the map of sideInputSSPs indexed by taskName + containerModel.getTasks().forEach((taskName, taskModel) -> { + sideInputSystemStreams.keySet().forEach(storeName -> { + Set<SystemStreamPartition> taskSideInputSSPs = taskModel.getSystemStreamPartitions().stream().filter(ssp -> sideInputSystemStreams.get(storeName).contains(ssp.getSystemStream())).collect(Collectors.toSet()); + this.sideInputSSPs.putIfAbsent(taskName, new HashMap<>()); + this.sideInputSSPs.get(taskName).put(storeName, taskSideInputSSPs); + }); + }); + + // Create a map of changeLogSSP to storeName across all tasks, assuming no stores have the same changelogSSP Review comment: nit: "assuming no two stores" ---------------------------------------------------------------- This is an automated message from 
the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services