Hi Dong,

Thanks for the detailed design doc for a long-awaited feature in Samza! Really appreciate it! I did a quick pass and have the following comments:
- minor: "limit the maximum size of partition" ==> "limit the maximum size of each partition"

- "However, Samza currently is not able to handle partition expansion of the input streams" ==> better to point out "for stateful jobs". For stateless jobs, simply bouncing the job today will pick up the new partitions.

- "it is possible (e.g. with Kafka) that messages with a given key exists in both partition 1 an 3. Because GroupByPartition will assign partition 1 and 3 to different tasks, messages with the same key may be handled by different task/container/process and their state will be stored in different changelog partition." The problem statement is not super clear here. The issue with stateful jobs is: after GroupByPartition assigns partition 1 and 3 to different tasks, the new task handling partition 3 does not have the previous state to resume the work. E.g. a page-key based counter would start from 0 in the new task for a specific key, instead of resuming the previous count of 50 held by task 1.

- minor rewording: "the first solution in this doc" ==> "the solution proposed in this doc"

- "Thus additional development work is needed in Kafka to meet this requirement" It would be good to link to a KIP if and when it exists.

- Instead of touching/deprecating the interface SystemStreamPartitionGrouper, I would recommend adding a different implementation class of the interface which, in the constructor of the grouper, takes two parameters: a) the previous task number read from the coordinator stream; b) the configured new-partition to old-partition mapping policy. Then the grouper's interface method stays the same, and the behavior of the grouper is more configurable, which is good for supporting a broader set of use cases in addition to Kafka's built-in partition expansion policies.

- Minor renaming suggestion for the new grouper class names: GroupByPartitionWithFixedTaskNum and GroupBySystemStreamPartitionWithFixedTaskNum

Thanks!
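For readers following the thread, the mapping logic behind the proposed fixed-task-num grouper can be sketched in a few lines. This is a hypothetical standalone illustration, not Samza's actual API: under the SEP-5 operational requirements (the partition count is always doubled on expansion, and producers pick a partition by hash(key) modulo the partition count), every key in a new partition p previously lived in partition p mod originalPartitionCount, so the grouper can keep the task count fixed by routing partition p back to that task.

```python
# Hypothetical sketch of the "fixed task number" grouping discussed in this
# thread; all names are illustrative and are NOT Samza's real classes.

def task_for_partition(partition, original_partition_count):
    """Route a (possibly new) partition to the task that owned its parent
    partition before expansion. Valid only under the SEP-5 operational
    requirements: the partition count is doubled on each expansion and
    producers choose partitions by hash(key) % partition_count."""
    return partition % original_partition_count

def group_by_partition_fixed_task_num(partitions, original_partition_count):
    """Map task index -> set of partitions, keeping the task count fixed at
    original_partition_count no matter how many expansions occurred."""
    groups = {}
    for p in partitions:
        task = task_for_partition(p, original_partition_count)
        groups.setdefault(task, set()).add(p)
    return groups

# After expanding a topic from 2 to 4 partitions, partitions 0 and 2 stay
# with task 0, and partitions 1 and 3 with task 1 -- matching the corrected
# diagram discussed later in the thread.
print(group_by_partition_fixed_task_num(range(4), 2))  # {0: {0, 2}, 1: {1, 3}}
```

The doubling requirement is what makes this safe: because the old count divides the new count, hash(key) % new_count is congruent to hash(key) % old_count, so a key's new partition always maps back to the task that holds its changelog state.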
- Yi

On Wed, May 31, 2017 at 10:33 AM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Navina,
>
> Thanks much for the comment. Please see my response below.
>
> Regarding your biggest gripe with the SEP, I personally think the operational requirements proposed in the SEP are pretty general and could be easily enforced by other systems. The reason is that the modulo operation is pretty standard and the default option when we choose a partition. And usually the underlying system allows the user to select an arbitrary partition number if it supports partition expansion. Do you know of any system that does not meet these two requirements?
>
> Regarding your comment on the Motivation section, I renamed the first section as "Problem and Goal" and specified that "*The goal of this proposal is to enable partition expansion of the input streams*." I also put a sentence at the end of the Motivation section: "*The feature of task expansion is out of the scope of this proposal and will be addressed in a future SEP*". The second paragraph in the Motivation section is mainly used to explain the thinking process that we have gone through, what other alternatives we have considered, and what we plan to do in Samza in the next step.
>
> To answer your question why increasing the partition number will increase the throughput of the Kafka consumer in the container: the Kafka consumer can potentially fetch more data in one FetchResponse when there are more partitions in the FetchRequest. This is because we limit the maximum amount of data that can be fetched for a given partition in the FetchResponse, which is set to 1 MB by default, and there are reasons we cannot arbitrarily bump up this limit.
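Dong's throughput point above reduces to simple arithmetic. As a back-of-the-envelope sketch (the 1 MB figure is the Kafka consumer's default `max.partition.fetch.bytes`; broker-side caps such as `fetch.max.bytes` are ignored here for simplicity):

```python
# Why more partitions can raise consumer fetch throughput: each partition
# contributes at most max.partition.fetch.bytes to a single FetchResponse,
# so the per-response ceiling scales with the number of partitions fetched.

MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024  # Kafka consumer default (1 MB)

def max_bytes_per_fetch(num_partitions):
    # Upper bound on topic data returned by one FetchResponse,
    # ignoring broker-side limits like fetch.max.bytes.
    return num_partitions * MAX_PARTITION_FETCH_BYTES

# Doubling the partition count doubles the per-fetch ceiling.
assert max_bytes_per_fetch(8) == 2 * max_bytes_per_fetch(4)
```

This is only the ceiling, not guaranteed throughput; actual gains depend on how much data is available per partition at fetch time.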
> To answer your question how partition expansion in Kafka impacts the clients: the Kafka consumer is able to automatically detect new partitions of the topic and reassign all (both old and new) partitions across consumers in the consumer group IF you tell the consumer which topic to subscribe to. But the consumer in Samza's container uses another way of subscription. Instead of subscribing to the topic, the consumer in Samza's container subscribes to specific partitions of the topic. In this case, if new partitions have been added, Samza will need to explicitly subscribe to the new partitions of the topic. The "Handle partition expansion while tasks are running" section in the SEP addresses this issue in Samza -- it recalculates the job model and restarts the containers so that consumers can subscribe to the new partitions.
>
> I will ask other devs to take a look at the proposal. I will start the voting thread tomorrow if there is no further concern with the SEP.
>
> Thanks!
> Dong
>
> On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) <nav...@apache.org> wrote:
>
> > Hey Dong,
> >
> > > I have updated the motivation section to clarify this.
> >
> > Thanks for updating the motivation. A couple of notes here:
> >
> > 1.
> >
> > > "The motivation of increasing partition number of Kafka topic includes 1) limit the maximum size of a partition in order to improve broker performance and 2) increase throughput of Kafka consumer in the Samza container."
> >
> > It's unclear to me how increasing the partition number will increase the throughput of the Kafka consumer in the container. Theoretically, you will still be consuming the same amount of data in the container, irrespective of whether it is coming from one partition or more than one expanded partition. Can you please explain what you mean by that?
> >
> > 2.
> > I believe the second paragraph under motivation is simply talking about the scope of the current SEP. It will be easier to read if it spells out what solution is included in this SEP and what is left out as not in scope (for example, whether expansion for stateful jobs is supported or not).
> >
> > > We need to persist the task-to-sspList mapping in the coordinator stream so that the job can derive the original number of partitions of each input stream regardless of how many times the partition has expanded. Does this make sense?
> >
> > Yes. It does!
> >
> > > I am not sure how this is related to the locality though. Can you clarify your question if I haven't answered your question?
> >
> > It's not related. I just meant to give an example of yet another coordinator message that is persisted. Your ssp-to-task mapping is following a similar pattern for persisting. Just wanted to clarify that.
> >
> > > Can you let me know if this, together with the answers in the previous email, addresses all your questions?
> >
> > Yes. I believe you have addressed most of my questions. Thanks for taking the time to do that.
> >
> > > Is there a specific question you have regarding partition expansion in Kafka?
> >
> > I guess my questions are on how partition expansion in Kafka impacts the clients. IIUC, partition expansions are done manually in Kafka based on the bytes-in rate of the partition. Do the existing Kafka clients handle this expansion automatically? If yes, how does it work? If not, are there plans to support it in the future?
> >
> > > Thus user's job should not need to bootstrap key/value store from the changelog topic.
> >
> > Why is this discussion relevant here? The key/value store / changelog topic partition is scoped within the context of a task. Since we are not changing the number of tasks, I don't think it is required to mention it here.
> >
> > > The new method takes the SystemStreamPartition-to-Task assignment from the previous job model which can be read from the coordinator stream.
> >
> > The JobModel is currently not persisted to the coordinator stream. In your design, you talk about writing separate coordinator messages for ssp-to-task assignments. Hence, please correct this statement. It is kind of misleading to the reader.
> >
> > My biggest gripe with this SEP is that it seems like a tailor-made solution that relies on the semantics of the Kafka system and yet, we are trying to masquerade that as operational requirements for other systems interacting with Samza. (Not to say that this is the first time such a choice is being made in the Samza design.) I am not seeing how this can be a "general" solution for all input systems. That's my two cents. I would like to hear alternative points of view on this from other devs.
> >
> > Please make sure you have enough eyes on this SEP. If you do, please start a VOTE thread to approve this SEP.
> >
> > Thanks!
> > Navina
> >
> > On Mon, May 29, 2017 at 12:32 AM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Navina,
> > >
> > > I have updated the wiki based on your suggestion. More specifically, I have made the following changes:
> > >
> > > - Improved the Problem section and the Motivation section to describe why we use the solution in this proposal instead of tackling the problem of task expansion directly.
> > >
> > > - Illustrated the design in a way that doesn't bind to Kafka. Kafka is only used as an example to illustrate why we want to support partition expansion and whether the operational requirement can be supported when Kafka is used as the input system. Note that the proposed solution should work for any input system that meets the operational requirement described in the wiki.
> > >
> > > - Fixed the problem in the figure.
> > > - Added a new class GroupBySystemStreamPartitionFixedTaskNum to the wiki. Together with GroupByPartitionFixedTaskNum, it should ensure that we have a solution to enable partition expansion for all users that are using a pre-defined grouper in Samza. Note that users who use a custom grouper would need to update their implementation.
> > >
> > > Can you let me know if this, together with the answers in the previous email, addresses all your questions? Thanks for taking the time to review the proposal.
> > >
> > > Regards,
> > > Dong
> > >
> > > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Navina,
> > > >
> > > > Thanks much for your comments. Please see my reply inline.
> > > >
> > > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <nav...@apache.org> wrote:
> > > >
> > > >> Thanks for the SEP, Dong. I have a couple of questions to understand your proposal better:
> > > >>
> > > >> * Under motivation, you mention that "_We expect this solution to work similarly with other input system as well._", yet I don't see any discussion on how it will work with other input systems. That is, what kind of contract does Samza expect from other input systems? If we are not planning to provide a generic solution, it might be worth calling it out in the SEP.
> > > >
> > > > I think the contract we expect from other systems is exactly the operational requirement mentioned in the SEP, i.e. partitions should always be doubled and the hash algorithm should modulo the number of partitions. SEP-5 should also allow partition expansion of all input systems that meet these two requirements. I have updated the motivation section to clarify this.
> > > >
> > > >> * I understand the partition mapping logic you have proposed. But I think the example explanation doesn't match the diagram. In the diagram, after expansion, partition-0 and partition-1 are pointing to bucket 0 and partition-3 and partition-4 are pointing to bucket 1. I think the former has to be partition-0 and partition-2 and the latter, partition-1 and partition-3. If I am wrong, please help me understand the logic :)
> > > >
> > > > Good catch. I will update the figure to fix this problem.
> > > >
> > > >> * I don't know how partition expansion in Kafka works. I am familiar with how shard splitting happens in Kinesis - there is a hierarchical relation between the parent and child shards. This way, it also allows the shards to be merged back. IIUC, Kafka only supports partition "expansion", as opposed to "splits". Can you provide some context or a link related to how partition expansion works in Kafka?
> > > >
> > > > I couldn't find any wiki on partition expansion in Kafka. The partition expansion logic in Kafka is very simple -- it just adds new partitions to the existing topic. Is there a specific question you have regarding partition expansion in Kafka?
> > > >
> > > >> * Are you only recommending that expansion can be supported for Samza jobs that use Kafka as input systems **and** configure the SSPGrouper as GroupByPartitionFixedTaskNum? Sounds to me like this only applies for GroupByPartition. Please correct me if I am wrong. What is the expectation for custom SSP Groupers?
> > > >
> > > > The expansion can be supported for Samza jobs if the input system meets the operational requirement mentioned above.
> > > > It doesn't have to use Kafka as the input system.
> > > >
> > > > The current proposal provides a solution for jobs that currently use GroupByPartition. The proposal can be extended to support jobs that use other groupers that are pre-defined in Samza. A custom SSP grouper needs to handle partition expansion similar to how GroupByPartitionFixedTaskNum handles it, and it is the users' responsibility to update their custom grouper implementation.
> > > >
> > > >> * Regarding storing the SSP-to-Task assignment in the coordinator stream: Today, the JobModel encapsulates the data model in Samza, which also includes **TaskModels**. A TaskModel typically shows the task-to-sspList mapping. What is the reason for using a separate coordinator stream message *SetSSPTaskMapping*? Is it because the JobModel itself is not persisted in the coordinator stream today? The reason locality exists outside of the jobmodel is because *locality* information is written by each container, whereas it is consumed only by the leader jobcoordinator/AM. In this case, the writer of the mapping information and the reader is still the leader jobcoordinator/AM. So, I want to understand the motivation for this choice.
> > > >
> > > > Yes, the reason for using a separate coordinator stream message is because the task-to-sspList mapping is not currently persisted in the coordinator stream. We wouldn't need to create this new stream message if the JobModel were persisted. We need to persist the task-to-sspList mapping in the coordinator stream so that the job can derive the original number of partitions of each input stream regardless of how many times the partition has expanded. Does this make sense?
> > > >
> > > > I am not sure how this is related to the locality though. Can you clarify your question if I haven't answered it?
> > > >
> > > > Thanks!
> > > > Dong
> > > >
> > > >> Cheers!
> > > >> Navina
> > > >>
> > > >> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > >>
> > > >> > Hi all,
> > > >> >
> > > >> > We created SEP-5: Enable partition expansion of input streams. Please find the SEP wiki at
> > > >> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
> > > >> >
> > > >> > Your feedback is appreciated!
> > > >> >
> > > >> > Thanks,
> > > >> > Dong