Hi, Don,

Thanks for the detailed design doc for a long-awaited feature in Samza!
Really appreciate it! I did a quick pass and have the following comments:

- minor: "limit the maximum size of partition" ==> "limit the maximum size
of each partition"
- "However, Samza currently is not able to handle partition expansion of
the input streams" ==> better to point out "for stateful jobs". For
stateless jobs, simply bouncing the job can already pick up the new
partitions.
- "it is possible (e.g. with Kafka) that messages with a given key exists
in both partition 1 an 3. Because GroupByPartition will assign partition 1
and 3 to different tasks, messages with the same key may be handled by
different task/container/process and their state will be stored in
different changelog partition." The problem statement is not super clear
here. The issue with stateful jobs is: after GroupByPartition assigns
partitions 1 and 3 to different tasks, the new task handling partition 3
does not have the previous state to resume the work. E.g. a page-key based
counter would start from 0 in the new task for a specific key, instead of
resuming the previous count of 50 held by task 1.
- minor rewording: "the first solution in this doc" ==> "the solution
proposed in this doc"
- "Thus additional development work is needed in Kafka to meet this
requirement" It would be good to link to a KIP if and when it exists
- Instead of touching/deprecating the interface
SystemStreamPartitionGrouper, I would recommend adding a different
implementation class of the interface whose constructor takes two
parameters: a) the previous task number read from the coordinator stream;
b) the configured new-partition to old-partition mapping policy. Then the
grouper's interface method stays the same and the behavior of the grouper
is more configurable, which is good for supporting a broader set of use
cases in addition to Kafka's built-in partition expansion policies.
- Minor renaming suggestion to the new grouper class names:
GroupByPartitionWithFixedTaskNum
and GroupBySystemStreamPartitionWithFixedTaskNum
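
A rough sketch of the grouper suggestion above, for illustration only (this
is not actual Samza code). It uses plain integer partition ids instead of
SystemStreamPartition objects, and the class name FixedTaskNumGrouperSketch,
its group method and the IntBinaryOperator policy type are all made up for
this example. It also assumes one task per original partition, as with
GroupByPartition, so the previous task number equals the original partition
count:

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.IntBinaryOperator;

/**
 * Hypothetical stand-in for a grouper with a fixed task count. A real Samza
 * grouper would implement SystemStreamPartitionGrouper; this sketch only
 * shows the constructor parameters and the mapping step.
 */
public class FixedTaskNumGrouperSketch {
    // a) task count from the previous run (assumed already read from the
    //    coordinator stream by the caller)
    private final int previousTaskCount;
    // b) policy mapping (newPartitionId, previousTaskCount) -> old partition id
    private final IntBinaryOperator newToOldPartition;

    public FixedTaskNumGrouperSketch(int previousTaskCount,
                                     IntBinaryOperator newToOldPartition) {
        if (previousTaskCount <= 0) {
            throw new IllegalArgumentException("previousTaskCount must be positive");
        }
        this.previousTaskCount = previousTaskCount;
        this.newToOldPartition = newToOldPartition;
    }

    /** Groups partition ids into task id -> partition ids, keeping the task count fixed. */
    public Map<Integer, Set<Integer>> group(Set<Integer> partitionIds) {
        Map<Integer, Set<Integer>> tasks = new TreeMap<>();
        for (int t = 0; t < previousTaskCount; t++) {
            tasks.put(t, new TreeSet<>());
        }
        for (int p : partitionIds) {
            // Original partitions keep their task; new ones follow the policy.
            int task = p < previousTaskCount
                    ? p
                    : newToOldPartition.applyAsInt(p, previousTaskCount);
            if (task < 0 || task >= previousTaskCount) {
                throw new IllegalStateException("policy returned unknown task " + task);
            }
            tasks.get(task).add(p);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Modulo policy: matches doubling partitions with hash-mod partitioning.
        FixedTaskNumGrouperSketch grouper =
                new FixedTaskNumGrouperSketch(2, (p, n) -> p % n);
        System.out.println(grouper.group(Set.of(0, 1, 2, 3)));
        // prints {0=[0, 2], 1=[1, 3]}
    }
}
```

With 2 previous tasks and the modulo policy, partition 3 lands on the task
that already owns partition 1, so that task's existing state (e.g. the
page-key counter) is still there when messages for the key move to the new
partition.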

Thanks!

- Yi

On Wed, May 31, 2017 at 10:33 AM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Navina,
>
> Thanks much for the comment. Please see my response below.
>
> Regarding your biggest gripe with the SEP, I personally think the
> operational requirements proposed in the KIP are pretty general and could
> be easily enforced by other systems. The reason is that the modulo
> operation is pretty standard and is the default option when we choose a
> partition. And usually the underlying system allows the user to select an
> arbitrary partition number if it supports partition expansion. Do you know
> of any system that does not meet these two requirements?
>
> Regarding your comment of the Motivation section, I renamed the first
> section as "Problem and Goal" and specified that "*The goal of this
> proposal is to enable partition expansion of the input streams*.". I also
> put a sentence at the end of the Motivation section that "*The feature of
> task expansion is out of the scope of this proposal and will be addressed
> in a future SEP*". The second paragraph in the Motivation section is mainly
> used to explain the thinking process that we have gone through, what other
> alternatives we have considered, and what we plan to do in Samza in the
> next step.
>
> To answer your question why increasing the partition number will increase
> the throughput of the Kafka consumer in the container, the Kafka consumer
> can potentially fetch more data in one FetchResponse with more partitions
> in the FetchRequest. This is because we limit the maximum amount of data
> that can be fetched for a given partition in the FetchResponse. This by
> default is set to 1 MB. And there are reasons that we cannot arbitrarily
> bump up this limit.
>
> To answer your question how partition expansion in Kafka impacts the
> clients, the Kafka consumer is able to automatically detect new partitions
> of the topic and reassign all (both old and new) partitions across
> consumers in the consumer group IF you tell the consumer which topic to
> subscribe to. But the consumer in Samza's container uses another way of
> subscription. Instead of subscribing to the topic, the consumer in Samza's
> container subscribes to the specific partitions of the topic. In this case,
> if new partitions have been added, Samza will need to explicitly subscribe
> to the new partitions of the topic. The "Handle partition expansion while
> tasks are running" section in the SEP addresses this issue in Samza -- it
> recalculates the job model and restarts the container so that the consumer
> can subscribe to the new partitions.
>
> I will ask other devs to take a look at the proposal. I will start the
> voting thread tomorrow if there is no further concern with the SEP.
>
> Thanks!
> Dong
>
>
> On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache)
> <nav...@apache.org> wrote:
>
> > Hey Dong,
> >
> > >  I have updated the motivation section to clarify this.
> >
> > Thanks for updating the motivation. Couple of notes here:
> >
> > 1.
> > > "The motivation of increasing partition number of Kafka topic includes
> > > 1) limit the maximum size of a partition in order to improve broker
> > > performance and 2) increase throughput of Kafka consumer in the Samza
> > > container."
> >
> > It's unclear to me how increasing the partition number will increase the
> > throughput of the Kafka consumer in the container. Theoretically, you
> > will still be consuming the same amount of data in the container,
> > irrespective of whether it is coming from one partition or more than one
> > expanded partition. Can you please explain what you mean by that?
> >
> > 2. I believe the second paragraph under motivation is simply talking
> > about the scope of the current SEP. It will be easier to read if it
> > states which solution is included in this SEP and what is left out of
> > scope (for example, whether expansion for stateful jobs is supported).
> >
> > > We need to persist the task-to-sspList mapping in the
> > > coordinator stream so that the job can derive the original number of
> > > partitions of each input stream regardless of how many times the
> > > partition has expanded. Does this make sense?
> >
> > Yes. It does!
> >
> > > I am not sure how this is related to the locality though. Can you
> > > clarify your question if I haven't answered your question?
> >
> > It's not related. I just meant to give an example of yet another
> > coordinator message that is persisted. Your ssp-to-task mapping is
> > following a similar pattern for persisting. Just wanted to clarify that.
> >
> > > Can you let me know if this, together with the answers in the previous
> > > email, addresses all your questions?
> >
> > Yes. I believe you have addressed most of my questions. Thanks for taking
> > time to do that.
> >
> > > Is there specific question you have regarding partition
> > > expansion in Kafka?
> >
> > I guess my questions are on how partition expansion in Kafka impacts the
> > clients. Iiuc, partition expansions are done manually in Kafka based on
> > the bytes-in rate of the partition. Do the existing Kafka clients handle
> > this expansion automatically? If yes, how does it work? If not, are there
> > plans to support it in the future?
> >
> > > Thus user's job should not need to bootstrap key/value store from the
> > > changelog topic.
> >
> > Why is this discussion relevant here? Key/value store / changelog topic
> > partition is scoped with the context of a task. Since we are not changing
> > the number of tasks, I don't think it is required to mention it here.
> >
> > > The new method takes the SystemStreamPartition-to-Task assignment from
> > > the previous job model which can be read from the coordinator stream.
> >
> > JobModel is currently not persisted to the coordinator stream. In your
> > design, you talk about writing separate coordinator messages for
> > ssp-to-task assignments. Hence, please correct this statement. It is kind
> > of misleading to the reader.
> >
> > My biggest gripe with this SEP is that it seems like a tailor-made
> > solution that relies on the semantics of the Kafka system and yet, we are
> > trying to masquerade that as operational requirements for other systems
> > interacting with Samza. (Not to say that this is the first time such a
> > choice is being made in the Samza design). I am not seeing how this can
> > be a "general" solution for all input systems. That's my two cents. I
> > would like to hear alternative points of view for this from other devs.
> >
> > Please make sure you have enough eyes on this SEP. If you do, please
> > start a VOTE thread to approve this SEP.
> >
> > Thanks!
> > Navina
> >
> >
> > On Mon, May 29, 2017 at 12:32 AM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Navina,
> > >
> > > I have updated the wiki based on your suggestion. More specifically, I
> > > have made the following changes:
> > >
> > > - Improved Problem section and Motivation section to describe why we
> > > use the solution in this proposal instead of tackling the problem of
> > > task expansion directly.
> > >
> > > - Illustrate the design in a way that doesn't bind to Kafka. Kafka is
> > > only used as an example to illustrate why we want to support partition
> > > expansion and whether the operational requirement can be supported
> > > when Kafka is used as the input system. Note that the proposed
> > > solution should work for any input system that meets the operational
> > > requirement described in the wiki.
> > >
> > > - Fixed the problem in the figure.
> > >
> > > - Added a new class GroupBySystemStreamPartitionFixedTaskNum to the
> > > wiki. Together with GroupByPartitionFixedTaskNum, it should ensure
> > > that we have a solution to enable partition expansion for all users
> > > that are using a pre-defined grouper in Samza. Note that those users
> > > who use a custom grouper would need to update their implementation.
> > >
> > > Can you let me know if this, together with the answers in the previous
> > > email, addresses all your questions? Thanks for taking time to review
> > > the proposal.
> > >
> > > Regards,
> > > Dong
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > >
> > > > Hey Navina,
> > > >
> > > > Thanks much for your comments. Please see my reply inline.
> > > >
> > > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <
> > > > nav...@apache.org> wrote:
> > > >
> > > >> Thanks for the SEP, Dong. I have a couple of questions to
> > > >> understand your proposal better:
> > > >>
> > > >> * Under motivation, you mention that "_We expect this solution to
> > > >> work similarly with other input system as well._", yet I don't see
> > > >> any discussion on how it will work with other input systems. That
> > > >> is, what kind of contract does Samza expect from other input
> > > >> systems? If we are not planning to provide a generic solution, it
> > > >> might be worth calling it out in the SEP.
> > > >>
> > > >
> > > > I think the contract we expect from other systems is exactly the
> > > > operational requirement mentioned in the SEP, i.e. partitions should
> > > > always be doubled and the hash algorithm should be modulo the number
> > > > of partitions. SEP-5 should also allow partition expansion of all
> > > > input systems that meet these two requirements. I have updated the
> > > > motivation section to clarify this.
> > > >
> > > >
> > > >>
> > > >> * I understand the partition mapping logic you have proposed. But I
> > > >> think the example explanation doesn't match the diagram. In the
> > > >> diagram, after expansion, partition-0 and partition-1 are pointing
> > > >> to bucket 0 and partition-3 and partition-4 are pointing to bucket
> > > >> 1. I think the former has to be partition-0 and partition-2 and the
> > > >> latter partition-1 and partition-3. If I am wrong, please help me
> > > >> understand the logic :)
> > > >>
> > > >
> > > > Good catch. I will update the figure to fix this problem.
> > > >
> > > >
> > > >>
> > > >> * I don't know how partition expansion in Kafka works. I am familiar
> > > >> with how shard splitting happens in Kinesis - there is a hierarchical
> > > >> relation between the parent and child shards. This way, it will also
> > > >> allow the shards to be merged back. Iiuc, Kafka only supports
> > > >> partition "expansion", as opposed to "splits". Can you provide some
> > > >> context or a link related to how partition expansion works in Kafka?
> > > >>
> > > >
> > > > I couldn't find any wiki on partition expansion in Kafka. The
> > > > partition expansion logic in Kafka is very simple -- it just adds new
> > > > partitions to the existing topic. Is there a specific question you
> > > > have regarding partition expansion in Kafka?
> > > >
> > > >
> > > >>
> > > >> * Are you only recommending that expansion can be supported for
> > > >> Samza jobs that use Kafka as input systems **and** configure the
> > > >> SSPGrouper as GroupByPartitionFixedTaskNum? Sounds to me like this
> > > >> only applies for GroupByPartition. Please correct me if I am wrong.
> > > >> What is the expectation for custom SSP Groupers?
> > > >>
> > > >
> > > > The expansion can be supported for Samza jobs if the input system
> > > > meets the operational requirement mentioned above. It doesn't have to
> > > > use Kafka as input system.
> > > >
> > > > The current proposal provides a solution for jobs that currently use
> > > > GroupByPartition. The proposal can be extended to support jobs that
> > > > use other groupers that are pre-defined in Samza. A custom SSP grouper
> > > > needs to handle partition expansion similar to how
> > > > GroupByPartitionFixedTaskNum handles it, and it is the users'
> > > > responsibility to update their custom grouper implementation.
> > > >
> > > >
> > > >>
> > > >> * Regarding storing SSP-to-Task assignment to coordinator stream:
> > > >> Today, the JobModel encapsulates the data model in Samza, which also
> > > >> includes **TaskModels**. A TaskModel typically shows the
> > > >> task-to-sspList mapping. What is the reason for using a separate
> > > >> coordinator stream message *SetSSPTaskMapping*? Is it because the
> > > >> JobModel itself is not persisted in the coordinator stream today?
> > > >> The reason locality exists outside of the JobModel is because
> > > >> *locality* information is written by each container, whereas it is
> > > >> consumed only by the leader jobcoordinator/AM. In this case, the
> > > >> writer of the mapping information and the reader is still the leader
> > > >> jobcoordinator/AM. So, I want to understand the motivation for this
> > > >> choice.
> > > >>
> > > >
> > > > Yes, the reason for using a separate coordinator stream message is
> > > > because the task-to-sspList mapping is not currently persisted in the
> > > > coordinator stream. We wouldn't need to create this new stream
> > > > message if JobModel is persisted. We need to persist the
> > > > task-to-sspList mapping in the coordinator stream so that the job can
> > > > derive the original number of partitions of each input stream
> > > > regardless of how many times the partition has expanded. Does this
> > > > make sense?
> > > >
> > > > I am not sure how this is related to the locality though. Can you
> > > > clarify your question if I haven't answered your question?
> > > >
> > > > Thanks!
> > > > Dong
> > > >
> > > >
> > > >>
> > > >> Cheers!
> > > >> Navina
> > > >>
> > > > >> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <lindon...@gmail.com>
> > > > >> wrote:
> > > >>
> > > >> > Hi all,
> > > >> >
> > > > >> > We created SEP-5: Enable partition expansion of input streams.
> > > > >> > Please find the SEP wiki at the link
> > > > >> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
> > > > >> > .
> > > > >> >
> > > > >> > Your feedback is appreciated!
> > > >> >
> > > >> > Thanks,
> > > >> > Dong
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>
