[ 
https://issues.apache.org/jira/browse/SAMZA-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14546147#comment-14546147
 ] 

Yan Fang commented on SAMZA-676:
--------------------------------

{quote}
GroupByPartition now takes the Config as a constructor param and gets the 
broadcast streams from that. That change was easy since the 
GroupByPartitionFactory already has the config.
{quote}

Thank you [~twbecker] for pointing this out. I think Navina and I both 
overlooked this one. 

{quote}
This way we don't break the API as such, but more like we are now making the 
grouper more configurable (We can extend the current default grouper a new 
broadcast grouper or something similar - and we set the broadcast grouper to 
the default). The grouper fetches this config, and with this config, it assigns 
the same task to all the tasks. First one with partition 0 to all of them, and 
second one through four to all of them.
{quote}

[~naveenatceg], agreed on this idea. (guess +1 from [~nickpan47] too). As 
mentioned in the above comment, since the GrouperFactory already has the Config 
parameter, I don't think we need to change the Grouper API at all. Then we do 
not need to change the JobCoordinator either. The Grouper class takes care of 
assigning the broadcast stream to all the tasks.
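
As a rough illustration of the idea above (the grouper reads the broadcast 
streams from the Config and adds them to every task), here is a minimal sketch. 
It does not use the real Samza classes: SSPs are plain "system.stream#partition" 
strings, the config is a plain Map, and the config key 
"example.broadcast.ssps" is a hypothetical name made up for this example.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Sketch only: a partition-based grouper that also hands every broadcast SSP
// to every task. All names here are illustrative, not Samza APIs.
class BroadcastGrouperSketch {
    // Hypothetical config key holding a comma-separated list of broadcast SSPs.
    static final String BROADCAST_KEY = "example.broadcast.ssps";

    // SSPs are written as "system.stream#partition", e.g. "kafka.clicks#0".
    static Map<String, Set<String>> group(Set<String> ssps, Map<String, String> config) {
        Set<String> broadcast = new LinkedHashSet<>();
        for (String s : config.getOrDefault(BROADCAST_KEY, "").split(",")) {
            if (!s.trim().isEmpty()) {
                broadcast.add(s.trim());
            }
        }

        // Regular SSPs: one task per partition id, as in a group-by-partition scheme.
        Map<String, Set<String>> tasks = new TreeMap<>();
        for (String ssp : ssps) {
            if (broadcast.contains(ssp)) {
                continue; // handled below
            }
            String partition = ssp.substring(ssp.lastIndexOf('#') + 1);
            tasks.computeIfAbsent("Partition " + partition, k -> new LinkedHashSet<>()).add(ssp);
        }

        // Broadcast SSPs: assigned to all tasks.
        for (Set<String> assigned : tasks.values()) {
            assigned.addAll(broadcast);
        }
        return tasks;
    }
}
```

With inputs kafka.clicks#0, kafka.clicks#1 and a broadcast SSP kafka.rules#0, 
both "Partition 0" and "Partition 1" end up with kafka.rules#0 in addition to 
their own partition. Nothing outside the grouper has to change in this sketch.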

{quote}
If a container has more than 1 task and you fetch a message from a broadcast 
stream partition, will you be invoking each of the tasks in order ? Just need a 
clarification.
{quote}

{quote}
I have another question. In a system which consumes from a broadcast stream, 
how will we calibrate the throughput of a job (messages processed per second) ? 
The same message is handled more than once in different tasks.
{quote}

Those two questions are related, and both need further discussion, because 
there are currently two ideas (similar ones are mentioned in SAMZA-353's design 
doc): 

1) "Alter the SamzaContainer to consume from multiple positions within a single 
SystemStreamPartition." (from the SAMZA-353 design doc). An alternative is to 
treat the broadcast stream in different tasks as different SSPs. In Kafka (from 
my experiment), one consumer can fetch the same topic from different offsets, 
and these are treated as two different FetchRequests. I just need to verify 
this with [~guozhang].

2) "Impose an ordering on offsets, and always start consuming from the lowest 
offset that's required for all TaskNames within a single container for a given 
SystemStreamPartition. The container can then filter any input messages that a 
given StreamTask has already processed in cases where another task in the 
container might be farther behind." (from the SAMZA-353 design doc).

Method 1 is more intuitive and doable. But as I mentioned, we may need to 
change the Consumer API (the poll() method) to let the consumer return messages 
from the same SSP but at different offsets.

The good thing about method 2 is that it does not need to change existing APIs. 
It has a few things to consider:
1) It assumes that all offsets are ordered. I think this is not a big concern. 
When a system does not have ordered offsets, it simply does not support 
broadcast streams, and this does not influence other implementations.

2) Another concern: assume the broadcast stream is at offset 100 in task 1 and 
at offset 50 in task 2. When we process, we start from offset 50 and have task 1 
ignore the msgs from offset 50-100. What is the priority of the streams? Do we 
process only the broadcast stream from 50-100 first, or in a RoundRobin order 
with all other partitions? Do we prioritize task 2 and process it until the 
broadcast stream reaches offset 100, and only then apply the 
one-broadcast-msg-for-two-tasks logic?
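
To make method 2 concrete, here is a minimal sketch of the "start from the 
lowest offset and filter" part only. It assumes offsets are ordered longs and 
that each task tracks the next offset it still needs; the class and method names 
are hypothetical, not Samza APIs. It does not answer the prioritization 
questions above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch only: one shared read position per broadcast SSP, with per-task filtering.
class LowestOffsetFilterSketch {
    // The container starts fetching at the lowest offset any task still needs.
    // Throws NoSuchElementException if no task is registered.
    static long startingOffset(Map<String, Long> nextOffsetByTask) {
        return Collections.min(nextOffsetByTask.values());
    }

    // A message at `offset` is delivered only to tasks that have not processed
    // it yet, i.e. whose next needed offset is <= offset. Tasks that are farther
    // ahead are skipped (the message is filtered out for them).
    static List<String> tasksToDeliver(long offset, Map<String, Long> nextOffsetByTask) {
        List<String> out = new ArrayList<>();
        // TreeMap copy gives a stable, sorted task order in the result.
        for (Map.Entry<String, Long> e : new TreeMap<>(nextOffsetByTask).entrySet()) {
            if (e.getValue() <= offset) {
                out.add(e.getKey());
            }
        }
        return out;
    }
}
```

With task1 needing offset 100 next and task2 needing offset 50, the container 
starts at 50; a message at offset 75 goes to task2 only, and from offset 100 
onward one fetched message goes to both tasks.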

{quote}
Instead of changing SystemConsumer API, why not change the implementation of 
SystemConsumers s.t. we allow M:1 mapping between SystemConsumer instance to 
Task instance? That should satisfy the multi-subscriber use case as well. 
SystemConsumers.register() method has to be changed but that is internal API.
{quote}

[~nickpan47], do you mean SystemConsumer : Task instances = M : 1 (currently 
SystemConsumer : Task instances = 1 : M)? Or do you mean we allow the consumer 
to register the same SSP from different tasks as different SSPs? If the latter, 
yes, this is doable. We do not need to change the register() API. However, the 
problem is in the poll() method, which only returns the SSP -> List map and 
carries no information about which task each SSP belongs to.
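
To illustrate the gap: a poll-style result keyed only by SSP says nothing about 
tasks, so the caller would have to keep its own side table. The sketch below 
shows one possible shape of that bookkeeping. It uses plain strings for SSPs, 
task names and messages, and the class and its methods are hypothetical; it is 
not the real SystemConsumers code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Sketch only: remembers which tasks registered each SSP, then expands a
// poll-style result (SSP -> messages) into a per-task view (task -> messages).
class PollFanOutSketch {
    // Side table kept by the caller: SSP -> names of the tasks that registered it.
    private final Map<String, Set<String>> tasksBySsp = new HashMap<>();

    void register(String taskName, String ssp) {
        tasksBySsp.computeIfAbsent(ssp, k -> new TreeSet<>()).add(taskName);
    }

    Map<String, List<String>> fanOut(Map<String, List<String>> polled) {
        Map<String, List<String>> byTask = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : polled.entrySet()) {
            Set<String> tasks = tasksBySsp.getOrDefault(e.getKey(), Collections.emptySet());
            for (String task : tasks) {
                // Every registered task receives every message of this SSP.
                byTask.computeIfAbsent(task, k -> new ArrayList<>()).addAll(e.getValue());
            }
        }
        return byTask;
    }
}
```

Note that this only works if all tasks are at the same position in the 
broadcast SSP. As soon as tasks are at different offsets, the side table would 
also need per-task offsets, which is exactly the extra bookkeeping under 
discussion.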

{quote}
The MessageChooser.choose API only returns the IncomingMessageEnvelope, which 
makes it difficult to identify which task this message should go to w/o knowing 
from which consumer this message is coming from. 
{quote}

[~navina], this is what I mean by changing the Chooser API. I did not put it in 
the design doc because I had not realized we need to do this as well. I will 
update it later. 

{quote}
The only option w/o breaking the MessageChooser API seems to require a hell-lot 
of bookkeeping in the SystemConsumers to remember the lastest offset each 
SystemConsumer is at and decorate/deliver the incoming message w/ the 
SystemConsumer that expects the exact offset. Yet, there could be multiple 
deliveries if all consumers are at the exact offset and moving at exact the 
same speed.
{quote}

I think this is related to your comment (1), where you suggest multiple 
consumers; this needs further discussion. But yes, you are right: the 
SystemConsumer would need to keep bookkeeping information about the incoming 
messages, such as the latest offset and taskName, and currently I do not see 
any place to hold this information. 

{quote}
So, I would propose the add a MulticastMessageChoose API and make it 
configurable s.t. it is required when broadcast topic is configured.
{quote}

What kind of new methods will this need?


> Implement Broadcast Stream
> --------------------------
>
>                 Key: SAMZA-676
>                 URL: https://issues.apache.org/jira/browse/SAMZA-676
>             Project: Samza
>          Issue Type: Improvement
>          Components: container
>            Reporter: Yan Fang
>            Assignee: Yan Fang
>         Attachments: BroadcastStreamDesign.md, BroadcastStreamDesign.pdf
>
>
> There is a lot of discussion in SAMZA-353 about assigning the same SSP to 
> multiple taskNames. This ticket is a subset of that discussion and focuses 
> only on the broadcast stream implementation. 
> The goal is to assign one SSP to all the taskNames. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
