[ https://issues.apache.org/jira/browse/SAMZA-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14546147#comment-14546147 ]
Yan Fang commented on SAMZA-676:
--------------------------------

{quote}
GroupByPartition now takes the Config as a constructor param and gets the broadcast streams from that. That change was easy since the GroupByPartitionFactory already has the config.
{quote}
Thank you [~twbecker] for pointing this out. I think Navina and I both overlooked this one.

{quote}
This way we don't break the API as such, but more like we are now making the grouper more configurable (We can extend the current default grouper into a new broadcast grouper or something similar - and we set the broadcast grouper to the default). The grouper fetches this config, and with this config, it assigns the same task to all the tasks. First one with partition 0 to all of them, and second one through four to all of them.
{quote}
[~naveenatceg], agreed on this idea (I guess +1 from [~nickpan47] too). As mentioned in the comment above, since the GrouperFactory already has the Config parameter, I don't think we need to change the Grouper API at all, and then we do not need to change the JobCoordinator either. The Grouper class takes care of assigning the broadcast stream to all the tasks.

{quote}
If a container has more than 1 task and you fetch a message from a broadcast stream partition, will you be invoking each of the tasks in order? Just need a clarification.
{quote}
{quote}
I have another question. In a system which consumes from a broadcast stream, how will we calibrate the throughput of a job (messages processed per second)? The same message is handled more than once in different tasks.
{quote}
Those two questions are related, and both need further discussion, because there are two ideas now (a similar thing is mentioned in SAMZA-353's design doc):

1) "Alter the SamzaContainer to consume from multiple positions within a single SystemStreamPartition." (from SAMZA-353 design doc). An alternative is that we treat the broadcast stream in different tasks as different SSPs.
In Kafka (from my experiment), one consumer can fetch the same topic from different offsets and treat them as two different FetchRequests. I just need to verify this with [~guozhang].

2) "Impose an ordering on offsets, and always start consuming from the lowest offset that's required for all TaskNames within a single container for a given SystemStreamPartition. The container can then filter any input messages that a given StreamTask has already processed in cases where another task in the container might be farther behind." (from SAMZA-353 design doc).

Method 1 is more intuitive and doable. But as I mentioned, we may need to change the Consumer API (the poll() method) to let the consumer return messages from the same SSP but at different offsets.

The good thing about method 2 is that it does not need to change existing APIs. It has a few things to consider:

1) It assumes that all offsets are ordered. I think this is not a big concern. When a system does not have ordered offsets, we simply do not support broadcast streams for it, and this does not influence other implementations.

2) Assume the broadcast stream is at offset 100 in task 1 and at offset 50 in task 2. When we process, we start from offset 50 and have task 1 ignore the msgs from offset 50-100. What is the priority of the streams? Do we process only the broadcast stream from 50-100 first, or in a RoundRobin order with all other partitions? Do we prioritize task 2 and process it until the broadcast stream reaches offset 100, and only then apply the one-broadcast-msg-for-two-tasks logic?

{quote}
Instead of changing SystemConsumer API, why not change the implementation of SystemConsumers s.t. we allow M:1 mapping between SystemConsumer instance to Task instance? That should satisfy the multi-subscriber use case as well. SystemConsumers.register() method has to be changed but that is internal API.
{quote}
[~nickpan47], do you mean SystemConsumer : Task instances = M : 1 (currently SystemConsumer : Task instances = 1 : M)? Or do you mean we allow the consumer to register the same SSP from different tasks as different SSPs? If the latter, yes, this is doable. We do not need to change the register() API. However, the problem is in the poll() method, which only returns the SSP -> List<IncomingMessageEnvelope> map and does not carry the information about which task each SSP belongs to.

{quote}
The MessageChooser.choose API only returns the IncomingMessageEnvelope, which makes it difficult to identify which task this message should go to w/o knowing from which consumer this message is coming from.
{quote}
[~navina], this is what I mean by changing the Chooser API. I did not put it in the design doc because I did not realize we need to do this as well. Will update it later.

{quote}
The only option w/o breaking the MessageChooser API seems to require a hell-lot of bookkeeping in the SystemConsumers to remember the latest offset each SystemConsumer is at and decorate/deliver the incoming message w/ the SystemConsumer that expects the exact offset. Yet, there could be multiple deliveries if all consumers are at the exact offset and moving at exactly the same speed.
{quote}
I think this is related to your (1) comment, where you are suggesting multiple consumers. This needs further discussion. But yes, you are right: the SystemConsumer needs to keep bookkeeping information about the incoming messages, such as latest offset/taskName, and currently I do not see any place to hold this information.

{quote}
So, I would propose to add a MulticastMessageChoose API and make it configurable s.t. it is required when broadcast topic is configured.
{quote}
What kind of new methods will this need?
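To make method 2 above more concrete, here is a minimal sketch in Python. It is not Samza code: deliver_broadcast, task_offsets and the integer offsets are hypothetical names and assumptions used only for illustration. It assumes offsets are ordered integers, starts from the lowest offset required by any task in the container, and skips any task that has already processed a given message.

```python
from typing import Dict, List, Tuple


def deliver_broadcast(
    messages: Dict[int, str],
    task_offsets: Dict[str, int],
) -> List[Tuple[str, int, str]]:
    """Replay one broadcast SSP for all tasks in a single container.

    messages: offset -> payload (assumption: offsets are ordered integers).
    task_offsets: task name -> next offset that task still needs.
    Returns the (task, offset, payload) deliveries in the order they are made.
    """
    deliveries: List[Tuple[str, int, str]] = []
    if not task_offsets or not messages:
        return deliveries
    # Start from the lowest offset required by any task in the container.
    start = min(task_offsets.values())
    # Work on a copy so the caller's checkpointed offsets are not mutated.
    next_needed = dict(task_offsets)
    for offset in sorted(o for o in messages if o >= start):
        for task in sorted(next_needed):
            # Filter: a task that is farther ahead ignores this message.
            if next_needed[task] <= offset:
                deliveries.append((task, offset, messages[offset]))
                next_needed[task] = offset + 1
    return deliveries


if __name__ == "__main__":
    # Task 1 is at offset 100, task 2 at offset 50, as in the example above.
    msgs = {offset: f"m{offset}" for offset in range(50, 103)}
    for delivery in deliver_broadcast(msgs, {"task1": 100, "task2": 50}):
        print(delivery)
```

In this sketch, offsets 50-99 go to task 2 only, and from offset 100 on each message is delivered once per task. It does not answer the prioritization question above (whether the catch-up range is processed first or interleaved with other partitions); it only shows the filtering step.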
> Implement Broadcast Stream
> --------------------------
>
>                 Key: SAMZA-676
>                 URL: https://issues.apache.org/jira/browse/SAMZA-676
>             Project: Samza
>          Issue Type: Improvement
>          Components: container
>            Reporter: Yan Fang
>            Assignee: Yan Fang
>         Attachments: BroadcastStreamDesign.md, BroadcastStreamDesign.pdf
>
>
> There is a lot of discussion in SAMZA-353 about assigning the same SSP to
> multiple taskNames. This ticket is a subset of that discussion and focuses
> only on the broadcast stream implementation.
> The goal is to assign one SSP to all the taskNames.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)