mynameborat commented on a change in pull request #912: SEP-19 : Refactoring sideInputs from SamzaContainer to ContainerStorageManager
URL: https://github.com/apache/samza/pull/912#discussion_r259945715
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
 ##########
 @@ -174,14 +174,25 @@ object SamzaContainer extends Logging {
       .flatMap(_.getSystemStreamPartitions.asScala)
       .toSet
 
+    val sideInputStoresToSystemStreams = config.getStoreNames
+      .map { storeName => (storeName, config.getSideInputs(storeName)) }
+      .filter { case (storeName, sideInputs) => sideInputs.nonEmpty }
+      .map { case (storeName, sideInputs) => (storeName, sideInputs.map(StreamUtil.getSystemStreamFromNameOrId(config, _))) }
+      .toMap
+
+    val sideInputSystemStreams = sideInputStoresToSystemStreams.values.flatMap(sideInputs => sideInputs.toStream).toSet
+
+    info("Got side input store system streams: %s" format sideInputSystemStreams)
+
     val inputSystemStreams = inputSystemStreamPartitions
 
 Review comment:
   Ideally, the input SSPs should no longer include side inputs, since side inputs no longer go through the run loop.
   You can clean up the side-input wiring in the high-level API either in this PR or in a follow-up PR. Also, we need to update the configuration docs to reflect the new behavior: users no longer need to set task.inputs for side inputs in low-level applications.
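
   As a rough sketch of the separation suggested above (not the actual Samza code): the `SystemStream` case class, the `parse` helper, and the `runLoopInputs` name below are hypothetical stand-ins, and the store-to-side-input configuration is modelled as a plain `Map` rather than a Samza `Config`. It only illustrates collecting side-input streams per store and keeping them out of the set that feeds the run loop.

```scala
// Hypothetical stand-in for org.apache.samza.system.SystemStream.
final case class SystemStream(system: String, stream: String)

object SideInputWiringSketch {

  // Hypothetical helper: parses an id of the assumed form "system.stream".
  // This is a simplification for illustration, not StreamUtil's behaviour.
  def parse(id: String): SystemStream = {
    val idx = id.indexOf('.')
    require(idx > 0 && idx < id.length - 1, s"expected system.stream, got: $id")
    SystemStream(id.substring(0, idx), id.substring(idx + 1))
  }

  // Keep only stores that declare side inputs, mapped to their system streams.
  def sideInputStoresToSystemStreams(
      storeSideInputs: Map[String, Seq[String]]): Map[String, Set[SystemStream]] =
    storeSideInputs.collect {
      case (store, ids) if ids.nonEmpty => store -> ids.map(parse).toSet
    }

  // Streams left for the run loop once side inputs are handled elsewhere.
  def runLoopInputs(
      taskInputs: Set[SystemStream],
      sideInputs: Set[SystemStream]): Set[SystemStream] =
    taskInputs -- sideInputs
}
```

   With this shape, a stream that appears both in the task inputs and as a side input of some store is dropped from the run-loop set, while stores without side inputs do not appear in the map at all.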

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
