mynameborat commented on a change in pull request #912: SEP-19 : Refactoring sideInputs from SamzaContainer to ContainerStorageManager
URL: https://github.com/apache/samza/pull/912#discussion_r259945715
##########
File path:
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
##########
@@ -174,14 +174,25 @@ object SamzaContainer extends Logging {
.flatMap(_.getSystemStreamPartitions.asScala)
.toSet
+ val sideInputStoresToSystemStreams = config.getStoreNames
+ .map { storeName => (storeName, config.getSideInputs(storeName)) }
+ .filter { case (storeName, sideInputs) => sideInputs.nonEmpty }
+ .map { case (storeName, sideInputs) => (storeName, sideInputs.map(StreamUtil.getSystemStreamFromNameOrId(config, _))) }
+ .toMap
+
+ val sideInputSystemStreams = sideInputStoresToSystemStreams.values.flatMap(sideInputs => sideInputs.toStream).toSet
+
+ info("Got side input store system streams: %s" format sideInputSystemStreams)
+
val inputSystemStreams = inputSystemStreamPartitions
Review comment:
Ideally, the input SSPs should no longer include side inputs, since side inputs are no longer processed by the run loop.
You can either clean up the side-input wiring in the high-level API in this PR or do it in a follow-up PR. We also need to update the configuration docs to reflect the new behavior, where users of low-level applications no longer need to add side inputs to task.inputs.
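A minimal sketch of the suggested cleanup, under simplifying assumptions: the `SystemStream` case class below is a hypothetical stand-in for Samza's own `SystemStream`, and a plain `Map` from store name to side-input streams replaces `config.getSideInputs`. Once the side-input streams are collected as in the diff, subtracting them from the task inputs keeps them out of the run loop.

```scala
// Hypothetical stand-in for Samza's SystemStream (not the real class).
final case class SystemStream(system: String, stream: String)

object SideInputFiltering {
  // Mirrors the diff: keep only stores that declare side inputs,
  // then flatten their streams into a single set.
  def sideInputSystemStreams(storeSideInputs: Map[String, Seq[SystemStream]]): Set[SystemStream] =
    storeSideInputs
      .filter { case (_, sideInputs) => sideInputs.nonEmpty }
      .values
      .flatten
      .toSet

  // Hypothetical helper: the streams the run loop should consume,
  // i.e. every task input except the side inputs.
  def runLoopInputs(inputs: Set[SystemStream], sideInputs: Set[SystemStream]): Set[SystemStream] =
    inputs -- sideInputs

  def main(args: Array[String]): Unit = {
    val profiles = SystemStream("kafka", "profile-updates")
    val pageViews = SystemStream("kafka", "page-views")
    val stores = Map(
      "profile-store" -> Seq(profiles),
      "counts-store" -> Seq.empty[SystemStream])
    val sideInputs = sideInputSystemStreams(stores)
    println(runLoopInputs(Set(pageViews, profiles), sideInputs))
  }
}
```

Using a set difference keeps the run-loop inputs correct even if a side-input stream is still listed in the task inputs, which would matter during a transition where existing low-level configs keep side inputs in task.inputs.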